Deploying models in Azure Machine Learning

Study Notes

Deployment process: the model and its dependencies are packaged into an image, and that image is deployed to a container on the chosen compute target.

A model can be deployed to:
  • Local compute
    Test/development
  • Azure Machine Learning compute instance
    Test/development
  • Azure Container Instance (ACI)
    Test/development
  • Azure Kubernetes Service (AKS) cluster
    Production
  • Azure Function
  • Internet of Things (IoT) module
Deployment types:
  • Real-time service
  • Batch inference pipeline
Process - Real-time service
  1. Register a trained model
  2. Define an inference configuration.
    Create entry / scoring script.
    Create environment.
    Combine the script and environment in an InferenceConfig
  3. Define a deployment configuration.
    Set (create) compute target.
  4. Deploy the model.
  5. Consume a real-time inferencing service
Process - Batch inference pipeline
  1. Register a trained model
  2. Create a scoring script
  3. Create a pipeline with a ParallelRunStep
  4. Run the pipeline and retrieve the step output
  5. Publish the batch inference pipeline

1. Register a trained model
from azureml.core import Model

classification_model = Model.register(workspace=ws,
...)
Or
run.register_model( model_name='classification_model',
...)
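
A fuller sketch of the two registration options, with illustrative (assumed) arguments such as the model_path values:

# Register a model from a local file (illustrative paths/names)
classification_model = Model.register(workspace=ws,
                                      model_name='classification_model',
                                      model_path='model.pkl',
                                      description='A classification model')

# Or register a model from the outputs of a run (assumes the run saved outputs/model.pkl)
run.register_model(model_name='classification_model',
                   model_path='outputs/model.pkl',
                   description='A classification model')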

#
# Deploy as Real-time service
#


2. Create an entry script (scoring script)
It must include two functions:
  • init()
    Called when the service is initialized.
    Loads the model from the model registry.
  • run(raw_data)
    Called when new data is submitted to the service.
    Generates and returns predictions from the input data.
import json
import joblib
import numpy as np
import os

# Called when the service is loaded
def init():
    global model
    # Get the path to the registered model file and load it
    model_path = os.path.join(os.getenv('AZUREML_MODEL_DIR'), 'model.pkl')
    model = joblib.load(model_path)

# Called when a request is received
def run(raw_data):
    # Get the input data as a numpy array
    data = np.array(json.loads(raw_data)['data'])
    # Get a prediction from the model
    predictions = model.predict(data)
    # Return the predictions as any JSON serializable format
    return predictions.tolist()

3. Create an environment
from azureml.core import Environment

service_env = Environment(name='service-env')
python_packages = ['scikit-learn', 'numpy']
for package in python_packages:
    service_env.python.conda_dependencies.add_pip_package(package)
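
If dependencies are kept in a conda specification file instead, the environment can be created from that file (a sketch; the file path is an assumption):

from azureml.core import Environment

# Create the environment from a conda YAML file (hypothetical path)
service_env = Environment.from_conda_specification(name='service-env',
                                                   file_path='service_files/conda.yml')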

Combine the script and environment in an InferenceConfig
from azureml.core.model import InferenceConfig

classifier_inference_config = InferenceConfig(source_directory='service_files',
                                              entry_script='score.py',
                                              environment=service_env)

Set up (create) the compute target (define a deployment configuration):
from azureml.core.compute import ComputeTarget, AksCompute

cluster_name = 'aks-cluster'
compute_config = AksCompute.provisioning_configuration(location='eastus')
production_cluster = ComputeTarget.create(ws, cluster_name, compute_config)
production_cluster.wait_for_completion(show_output=True)

Then define the deployment configuration for the web service:
from azureml.core.webservice import AksWebservice

classifier_deploy_config = AksWebservice.deploy_configuration(cpu_cores=1,
                                                              memory_gb=1)
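
For an ACI target, the deployment configuration comes from AciWebservice instead, and no compute target needs to be created first (a sketch; the variable name and 1-core/1 GB sizing are assumptions):

from azureml.core.webservice import AciWebservice

# ACI deployment configuration (no separate compute target is created for ACI)
aci_deploy_config = AciWebservice.deploy_configuration(cpu_cores=1,
                                                       memory_gb=1)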

4. Deploy the model
from azureml.core.model import Model

model = ws.models['classification_model']
service = Model.deploy(workspace=ws,
                       name='classifier-service',
                       models=[model],
                       inference_config=classifier_inference_config,
                       deployment_config=classifier_deploy_config,
                       deployment_target=production_cluster)  # not needed (or set to None) for ACI

service.wait_for_deployment(show_output=True)

Once deployment finishes, the service appears under Endpoints in Azure Machine Learning studio.
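
You can also list the deployed services from the SDK to confirm the endpoint exists (assumes ws is the Workspace object):

# List the web services (endpoints) registered in the workspace
for webservice_name in ws.webservices:
    print(webservice_name)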


5. Consume a real-time inferencing service
A common scenario is client code that calls the service and displays the response.
Calling the service through the SDK is useful for testing purposes.
import json

# An array of new data cases
x_new = [[0.1,2.3,4.1,2.0],
[0.2,1.8,3.9,2.1]]

# Convert the array to a serializable list in a JSON document
json_data = json.dumps({"data": x_new})

# Call the web service, passing the input data
response = service.run(input_data = json_data)

# Get the predictions
predictions = json.loads(response)

# Print the predicted class for each case.
for i in range(len(x_new)):
    print(x_new[i], predictions[i])

For production use, client applications call the service through its REST interface.
Get the REST endpoint:
endpoint = service.scoring_uri
print(endpoint)

Pass data to the endpoint via an HTTP POST request:
import requests
import json

# An array of new data cases
x_new = [[0.1,2.3,4.1,2.0],
[0.2,1.8,3.9,2.1]]

# Convert the array to a serializable list in a JSON document
json_data = json.dumps({"data": x_new})

# Set the content type in the request headers
request_headers = { 'Content-Type':'application/json' }

# Call the service
# Call the service
response = requests.post(url=endpoint,
                         data=json_data,
                         headers=request_headers)

# Get the predictions from the JSON response
predictions = json.loads(response.json())

# Print the predicted class for each case.
for i in range(len(x_new)):
    print(x_new[i], predictions[i])

Authentication
There are two kinds of authentication you can use:
  • Key
    Requests are authenticated by specifying the key associated with the service.
  • Token
    Requests are authenticated by providing a JSON Web Token (JWT).
Default authentication:
  • ACI - authentication disabled by default; key-based auth can be enabled; token auth is not supported.
  • AKS - key-based auth enabled by default (primary and secondary keys are generated automatically); token-based auth can also be enabled. Tokens are time-limited, and clients must authenticate via Azure AD (for example with a service principal) to obtain them.
To make an authenticated call to the service's REST endpoint, include the key or token in the request header:
import requests
import json

# An array of new data cases
x_new = [[0.1,2.3,4.1,2.0],
[0.2,1.8,3.9,2.1]]

# Convert the array to a serializable list in a JSON document
json_data = json.dumps({"data": x_new})

# Set the content type in the request headers
request_headers = { "Content-Type":"application/json",
"Authorization":"Bearer " + key_or_token}

# Call the service
# Call the service
response = requests.post(url=endpoint,
                         data=json_data,
                         headers=request_headers)

# Get the predictions from the JSON response
predictions = json.loads(response.json())

# Print the predicted class for each case.
for i in range(len(x_new)):
    print(x_new[i], predictions[i])
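
To obtain the key or token in the first place, the web service object exposes methods for this (a sketch; which call applies depends on the authentication type enabled on the service):

# Key-based authentication (AKS by default, or ACI with auth enabled)
primary_key, secondary_key = service.get_keys()

# Token-based authentication (AKS only); tokens expire and must be refreshed
token, refresh_by = service.get_token()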

Troubleshoot service deployment
  1. Check the service state
    from azureml.core.webservice import AksWebservice

    # Get the deployed service
    service = AksWebservice(name='classifier-service', workspace=ws)

    # Check its state
    print(service.state)
  2. Review service logs
    print(service.get_logs())
  3. Deploy to a local container
    from azureml.core.webservice import LocalWebservice

    deployment_config = LocalWebservice.deploy_configuration(port=8890)
    service = Model.deploy(ws, 'test-svc', [model], inference_config, deployment_config)

    Then test the local service:
    print(service.run(input_data = json_data))
    You can then troubleshoot runtime issues by making changes to the scoring file that is referenced in the inference configuration, and reloading the service without redeploying it (something you can only do with a local service):
    service.reload()
    print(service.run(input_data = json_data))

#
# Deploy as Batch inference pipeline
#


2. Create a scoring script
The script loads the model and uses it to predict values from new data. It must include two functions:
  1. init()
    Called when the pipeline is initialized.
    Same as for the real-time service.
  2. run(mini_batch)
    Called for each batch of data to be processed.
import os
import numpy as np
from azureml.core import Model
import joblib

def init():
    # Runs when the pipeline step is initialized
    global model

    # load the model
    model_path = Model.get_model_path('classification_model')
    model = joblib.load(model_path)

def run(mini_batch):
    # This runs for each batch
    resultList = []

    # process each file in the batch
    for f in mini_batch:
        # Read comma-delimited data into an array
        data = np.genfromtxt(f, delimiter=',')
        # Reshape into a 2-dimensional array for model input
        prediction = model.predict(data.reshape(1, -1))
        # Append prediction to results
        resultList.append("{}: {}".format(os.path.basename(f), prediction[0]))
    return resultList

3. Create a pipeline with a ParallelRunStep
The ParallelRunStep class reads batches of files from a File dataset and writes the processing output to an OutputFileDatasetConfig.
All instances of the step running in parallel collate their results into a single output file named parallel_run_step.txt (when output_action is set to "append_row").

from azureml.pipeline.steps import ParallelRunConfig, ParallelRunStep
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.core import Pipeline

# Get the batch dataset for input
batch_data_set = ws.datasets['batch-data']

# Set the output location
output_dir = OutputFileDatasetConfig(name='inferences')

# Define the parallel run step configuration
parallel_run_config = ParallelRunConfig(
    source_directory='batch_scripts',
    entry_script='batch_scoring_script.py',
    mini_batch_size='5',
    error_threshold=10,
    output_action='append_row',
    environment=batch_env,
    compute_target=aml_cluster,
    node_count=4)

# Create the parallel run step
parallelrun_step = ParallelRunStep(
    name='batch-score',
    parallel_run_config=parallel_run_config,
    inputs=[batch_data_set.as_named_input('batch_data')],
    output=output_dir,
    arguments=[],
    allow_reuse=True
)

# Create the pipeline
pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])

4. Run the pipeline and retrieve the step output
import os
import pandas as pd
from azureml.core import Experiment

# Run the pipeline as an experiment
pipeline_run = Experiment(ws, 'batch_prediction_pipeline').submit(pipeline)
pipeline_run.wait_for_completion(show_output=True)

# Get the outputs from the first (and only) step
prediction_run = next(pipeline_run.get_children())
prediction_output = prediction_run.get_output_data('inferences')
prediction_output.download(local_path='results')

# Find the parallel_run_step.txt file
for root, dirs, files in os.walk('results'):
    for file in files:
        if file.endswith('parallel_run_step.txt'):
            result_file = os.path.join(root, file)

# Load and display the results
df = pd.read_csv(result_file, delimiter=":", header=None)
df.columns = ["File", "Prediction"]
print(df)

5. Publish the batch inference pipeline
Publish it as a REST service:
published_pipeline = pipeline_run.publish_pipeline(name='Batch_Prediction_Pipeline',
                                                   description='Batch pipeline',
                                                   version='1.0')
rest_endpoint = published_pipeline.endpoint
Use it:
import requests

response = requests.post(rest_endpoint,
                         headers=auth_header,
                         json={"ExperimentName": "Batch_Prediction"})
run_id = response.json()["Id"]
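
The auth_header used above is not defined in the snippet; one way to obtain it is interactive authentication through the SDK (a sketch, assuming an interactive login is acceptable):

from azureml.core.authentication import InteractiveLoginAuthentication

# Authenticate interactively and build the Authorization header for the REST call
interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()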
Or schedule it to run periodically:
from azureml.pipeline.core import ScheduleRecurrence, Schedule

weekly = ScheduleRecurrence(frequency='Week', interval=1)
pipeline_schedule = Schedule.create(ws, name='Weekly Predictions',
                                    description='batch inferencing',
                                    pipeline_id=published_pipeline.id,
                                    experiment_name='Batch_Prediction',
                                    recurrence=weekly)


References:
Deploy a model as a real-time service - Training | Microsoft Learn
Creating a batch inference pipeline - Training | Microsoft Learn