Building Azure ML Pipelines using the Azure Machine Learning SDK

The Azure Machine Learning SDK allows data scientists and AI developers to interact with Azure Machine Learning services from any Python environment. With it, you can manage datasets, train models on cloud compute resources, and deploy trained models as web services.

In this article, you will use the Azure ML SDK to build a pipeline that trains, evaluates, and deploys a model.

Log in to Workspace

To log in to the workspace with the Azure ML Python SDK, you first need to authenticate with Azure. When you run this cell for the first time, you are prompted to authenticate by clicking a link and entering a security code on a web page.

This block of code imports the azureml.core package, which is used for interacting with Azure Machine Learning, and loads the workspace from the saved configuration file:

import azureml.core
from azureml.core import Workspace

# Load the workspace from the saved config file
ws = Workspace.from_config()
print('Ready to use Azure ML {} to work with {}'.format(azureml.core.VERSION, ws.name))
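
If your account has access to more than one Azure Active Directory tenant, the interactive prompt may pick the wrong one. As a minimal sketch (the tenant ID below is a placeholder), you can pass an InteractiveLoginAuthentication object to target a specific tenant:

from azureml.core.authentication import InteractiveLoginAuthentication
from azureml.core import Workspace

# Authenticate against a specific tenant (placeholder ID) and load the workspace
interactive_auth = InteractiveLoginAuthentication(tenant_id='<your-tenant-id>')
ws = Workspace.from_config(auth=interactive_auth)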

Default Datastore

Datastores enable Azure ML users to connect data in almost any Azure Storage service to their Azure ML Workspace. The datastore acts as an abstraction layer over the various types of Azure storage. This lab uses the default datastore, which is attached to the storage account created automatically when the Azure ML Workspace is provisioned:

# Set Default Datastore
datastore = ws.get_default_datastore()

print('The default datastore has been saved to a variable.')
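
The default datastore is enough for this lab. If you wanted to attach your own Azure Blob Storage container instead, a sketch along these lines would register it as an additional datastore (the datastore, container, and account names below are placeholders):

from azureml.core import Datastore

# Register an additional Blob Storage container as a datastore (all names are placeholders)
blob_datastore = Datastore.register_azure_blob_container(
    workspace=ws,
    datastore_name='iris_blob_store',
    container_name='iris-data',
    account_name='<storage-account-name>',
    account_key='<storage-account-key>')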

Select Compute

A compute cluster was created at the beginning of this lab. This cluster will be used to process the tasks in each pipeline step:

# Select the compute cluster target
from azureml.core.compute import ComputeTarget

cpu_cluster = ComputeTarget(workspace=ws, name='automl-compute')

print("Found compute cluster!")

Building an Azure ML Pipeline

Azure ML Pipelines split a machine learning workflow into separate steps. This allows multiple users to collaborate on a single workflow, with each person changing just one step, and it can also save costs by assigning cheaper compute resources to the steps that need less power.

In this example, you will break up the traditional model-training workflow and build a pipeline with the following steps to produce an Iris classification model:

  • Ingest Iris data from a URL
  • Preprocess Iris data and split it into test and training samples
  • Train the model using the preprocessed data
  • Evaluate the model and determine the accuracy
  • Deploy the model as a web service

Splitting the machine learning workflow into separate pipeline steps allows the workflow to scale to larger datasets over the model's lifecycle. It also allows multiple team members to manage separate parts of the workflow.

Creating the Source Directories

Each ML pipeline step executes its own Python script to perform the desired actions. The location of each script file, along with any files it depends on, is called a source directory. It's best practice to use a separate folder for each step's source directory because a snapshot of the source directory is taken for each step; keeping the directories separate reduces the size of each snapshot. Any change made to the files in a step's source directory triggers a re-upload of the snapshot, causing that step to be rerun.

The source directory folder structure will look like this:

data_dependency_run_ingest
  └ ingest.py
data_dependency_run_preprocess
  └ preprocess.py
data_dependency_run_train
  └ train.py
data_dependency_run_evaluate
  └ evaluate.py
data_dependency_run_deploy
  └ score.py
  └ deploy.py

Run the cell block below to create the directories:

import os

# Create the source directory for each pipeline step
source_directory_ingest = 'data_dependency_run_ingest'
source_directory_preprocess = 'data_dependency_run_preprocess'
source_directory_train = 'data_dependency_run_train'
source_directory_evaluate = 'data_dependency_run_evaluate'
source_directory_deploy = 'data_dependency_run_deploy'


for directory in [source_directory_ingest, source_directory_preprocess,
                  source_directory_train, source_directory_evaluate,
                  source_directory_deploy]:
    if not os.path.exists(directory):
        os.makedirs(directory)
    
print('The source directories have been created.')

Creating Scripts in the Source Directories

Each pipeline step's script needs to be created and placed in its respective source directory. Read the summary of each script, then run the cell block to create it.

Each script accepts arguments that are used to pass directory information between the steps.

Ingest Script

The ingestion step takes a URL and a data directory as input. It downloads the data from the URL and saves it to a folder on the datastore:

%%writefile $source_directory_ingest/ingest.py

import os
import urllib.request
import argparse

# Define arguments
parser = argparse.ArgumentParser(description='Iris Data Ingestion')
parser.add_argument('--iris_data_dir', type=str, help='Directory to store Iris Data')
parser.add_argument('--urls', type=str, help='Data URL to ingest')
args = parser.parse_args()


# Get arguments from parser
iris_data_dir = args.iris_data_dir
urls = args.urls


if not os.path.exists(iris_data_dir):
    os.makedirs(iris_data_dir)


# Download data from URL
print("Downloading data from URL Arguments")
urllib.request.urlretrieve(urls, "{}/iris.csv".format(iris_data_dir))

Preprocess Script

After the data is ingested and stored in a directory on the datastore, the preprocessing step takes the Iris data from the previous step and splits it into separate train and test sets, the typical pattern for training a machine learning model. The train and test sets are then stored in separate folders on the datastore:

%%writefile $source_directory_preprocess/preprocess.py

import pandas as pd
from sklearn.model_selection import train_test_split
import glob
import os
import argparse
import pickle

# Define arguments
parser = argparse.ArgumentParser(description='Preprocessing')
parser.add_argument('--train_dir', type=str, help='Directory to output the processed training data')
parser.add_argument('--iris_data_dir', type=str, help='Directory to store iris data')
parser.add_argument('--test_dir', type=str, help='Directory to output the processed test data')


args = parser.parse_args()

# Get arguments from parser
iris_data_dir = args.iris_data_dir
train_dir = args.train_dir
test_dir = args.test_dir


# Process data and split into train and test sets
path = iris_data_dir
all_files = glob.glob(os.path.join(path, "*.csv"))

names = ['sepal-length', 'sepal-width', 'petal-length', 'petal-width', 'class']
dataset = pd.concat((pd.read_csv(f, names=names) for f in all_files))

array = dataset.values
X = array[:,0:4]
y = array[:,4]
X_train, X_test, Y_train, Y_test = train_test_split(X, y, test_size=0.20, random_state=1)

# Make train and test directories if they don't exist
if not os.path.exists(train_dir):
    os.makedirs(train_dir)

if not os.path.exists(test_dir):
    os.makedirs(test_dir)

# Output processed data to their respective folders
with open(test_dir + '/X_test.sav', 'wb') as f:
    pickle.dump(X_test, f)
with open(test_dir + '/Y_test.sav', 'wb') as f:
    pickle.dump(Y_test, f)
with open(train_dir + '/X_train.sav', 'wb') as f:
    pickle.dump(X_train, f)
with open(train_dir + '/Y_train.sav', 'wb') as f:
    pickle.dump(Y_train, f)

Train Script

The training step takes the preprocessed data and trains the model to fit the dataset. The training script saves the model to a directory:

%%writefile $source_directory_train/train.py

import os
from sklearn.svm import SVC
import pickle
import argparse


# Define arguments
parser = argparse.ArgumentParser(description='Train')
parser.add_argument('--train_dir', type=str, help='Directory to output the processed training data')
parser.add_argument('--output_dir', type=str, help='Directory to store output raw data')

args = parser.parse_args()

# Get arguments from parser
output_dir = args.output_dir
train_dir = args.train_dir

if not os.path.exists(output_dir):
    os.makedirs(output_dir)


# Load the training data from the training directory
loaded_X_train = pickle.load(open(train_dir + '/X_train.sav', 'rb'))
loaded_Y_train = pickle.load(open(train_dir + '/Y_train.sav', 'rb'))

# Fit the model with training dataset
model = SVC(gamma='auto')
model.fit(loaded_X_train, loaded_Y_train)

# Output model to directory
with open(output_dir + '/model.pt', 'wb') as f:
    pickle.dump(model, f)

Evaluation Script

The next step is to evaluate the model. The evaluation step tests the model against the test data and computes an accuracy score, which the script then writes to a file:

%%writefile $source_directory_evaluate/evaluate.py

import os
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
from sklearn.metrics import accuracy_score
from sklearn.svm import SVC
import pickle
import argparse

# Define arguments
parser = argparse.ArgumentParser(description='Evaluate')
parser.add_argument('--model_dir', type=str, help='Directory of the model')
parser.add_argument('--test_dir', type=str, help='Directory to output the processed test data')
parser.add_argument('--accuracy_dir', type=str, help='Directory to store output raw data')

args = parser.parse_args()

# Get arguments from parser
model_dir = args.model_dir
test_dir = args.test_dir
accuracy_dir = args.accuracy_dir

# load the model and test datasets from their directories
loaded_model = pickle.load(open(model_dir + '/model.pt', 'rb'))
loaded_validx = pickle.load(open(test_dir + '/X_test.sav', 'rb'))
loaded_validy = pickle.load(open(test_dir + '/Y_test.sav', 'rb'))


# Evaluate predictions and output to file
predictions = loaded_model.predict(loaded_validx)
print(accuracy_score(loaded_validy, predictions))
accuracy = accuracy_score(loaded_validy, predictions)

if not os.path.exists(accuracy_dir):
    os.makedirs(accuracy_dir)

with open(accuracy_dir + '/accuracy_file', 'wb') as f:
    pickle.dump(accuracy, f)

Deploy Script

The deploy step registers the newly trained model (when its accuracy improves or the test data changes) and deploys it as a web service endpoint:

%%writefile $source_directory_deploy/deploy.py

import os
from sklearn.metrics import accuracy_score

import pickle
from azureml.core.webservice import Webservice
from azureml.core.model import InferenceConfig
from azureml.core.environment import Environment
from azureml.core import Workspace
from azureml.core.model import Model
from azureml.core.run import Run
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.webservice import AciWebservice
from azureml.exceptions import WebserviceException
import argparse

# Create function for registering the model
def register_model(output_dir, model_name, accuracy, test_dir, workspace):
    '''
    Registers a new model
    '''
    model = Model.register(
        model_path = output_dir + '/model.pt',
        model_name = model_name,
        tags = {
            'accuracy': accuracy, 
            'test_data': test_dir
        },
        description='Iris classification model',
        workspace=workspace)
    return model

# Define arguments
parser = argparse.ArgumentParser(description='Deploy arg parser')
parser.add_argument('--test_dir', type=str, help='Directory where testing data is stored')
parser.add_argument('--model_dir', type=str, help='Directory where the model is stored')
parser.add_argument('--accuracy_dir', type=str, help='File storing the evaluation accuracy')

args = parser.parse_args()

# Get run context
run = Run.get_context()
workspace = run.experiment.workspace


# Get arguments from parser
test_dir = args.test_dir
accuracy_dir = args.accuracy_dir
model_dir = args.model_dir

if not os.path.exists(model_dir):
    os.makedirs(model_dir)


# Create the environment used by the web service
env = Environment('iris-env')

# Register environment to re-use later
env.register(workspace = workspace)

# Define model and service names
service_name = 'iris-classification-service'
model_name = 'iris-classification-pipeline'


# Read Accuracy
accuracy = pickle.load(open(accuracy_dir + '/accuracy_file', 'rb'))

# Set up Environment
myenv = Environment.get(workspace=workspace, name="iris-env", version="1")
cd = CondaDependencies.create(pip_packages=['azureml-dataprep[pandas,fuse]>=1.1.14', 'azureml-defaults==1.38.0', 'Jinja2<3.1'], conda_packages = ['scikit-learn==0.24.2'])
myenv.python.conda_dependencies = cd

# Register model if accuracy is higher or if test dataset has changed
new_model = False
try:
    model = Model(workspace, model_name)
    prev_accuracy = float(model.tags['accuracy'])
    prev_test_dir = model.tags['test_data']
    if prev_test_dir != test_dir or accuracy > prev_accuracy:
        model = register_model(model_dir, model_name, accuracy, test_dir, workspace)
        new_model = True
except WebserviceException:
    print('Model does not exist yet')
    model = register_model(model_dir, model_name, accuracy, test_dir, workspace)
    new_model = True

# Deploy new webservice if new model was registered
if new_model:
    # Create inference config
    inference_config = InferenceConfig(entry_script="score.py", environment=myenv)

    # Deploy model
    aci_config = AciWebservice.deploy_configuration(
        cpu_cores = 2, 
        memory_gb = 4, 
        tags = {'model': 'iris', 'method': 'sklearn'}, 
        description='Iris classifier')

    try:
        service = Webservice(workspace, name=service_name)
        if service:
            service.delete()
    except WebserviceException:
        print('No existing web service found')

    service = Model.deploy(workspace, service_name, [model], inference_config, aci_config)
    service.wait_for_deployment(True)
else:
    service = Webservice(workspace, name=service_name)

# Output scoring url to file
print(service.scoring_uri)
with open(model_dir + '/scoring_uri.txt', 'w+') as f:
    f.write(service.scoring_uri)

Score Script

The scoring script runs inside the deployed web service. The init function loads the registered model, and the run function is invoked each time the web service is called. In this example, run uses the model to make a prediction against the data sent to the endpoint:

%%writefile $source_directory_deploy/score.py
import json
import numpy as np
import os
import pickle
from sklearn.svm import SVC


def init():
    global model
    # Get registered model
    model_path = os.path.join(os.getenv('AZUREML_MODEL_DIR'), 'model.pt')
    model = pickle.load(open(model_path, 'rb'))

def run(raw_data):
    data = np.array(json.loads(raw_data)['data'])
    # make prediction
    prediction = model.predict([data])
    # Output prediction
    return prediction.tolist()

Passing Data Between Pipeline Steps

A pipeline can take input data and produce output data. The data can come from an existing dataset, or it can be the output of a previous pipeline step, represented by a PipelineData object.

The first step in the pipeline will be the ingestion step, which downloads the Iris CSV dataset and stores it in a directory on the default datastore. The Iris CSV directory location then needs to be passed to the preprocessing step so it can perform its tasks.

The default datastore also needs to be referenced in the PipelineData object using a data reference. This reference is a pointer to the datastore path and is used during a run.

Create a PipelineData object for the Iris data directory:

from azureml.pipeline.core import PipelineData
from azureml.data.data_reference import DataReference

# Get datastore reference
datastore_reference = DataReference(datastore, mode='mount')

# Create Pipeline Data
iris_data_dir = PipelineData(
    name='iris_data_dir', 
    pipeline_output_name='iris_data_dir',
    datastore=datastore_reference.datastore,
    output_mode='mount',
    is_directory=True)

print('The iris data PipelineData object has been created.')

The pipeline steps need to pass data between one another. Define the additional PipelineData objects for the rest of the pipeline workflow:

# Create Pipeline Data for remaining steps
train_dir = PipelineData(
    name='train_dir', 
    pipeline_output_name='train_dir',
    datastore=datastore_reference.datastore,
    output_mode='mount',
    is_directory=True)

output_dir = PipelineData(
    name='output_dir', 
    pipeline_output_name='outputdir',
    datastore=datastore_reference.datastore,
    output_mode='mount',
    is_directory=True)

accuracy_dir = PipelineData(
    name='accuracy_dir', 
    pipeline_output_name='accuracydir',
    datastore=datastore_reference.datastore,
    output_mode='mount',
    is_directory=True)

model_dir = PipelineData(
    name='model_dir', 
    pipeline_output_name='modeldir',
    datastore=datastore_reference.datastore,
    output_mode='mount',
    is_directory=True)

test_dir = PipelineData(
    name='test_dir', 
    pipeline_output_name='test_dir',
    datastore=datastore_reference.datastore,
    output_mode='mount',
    is_directory=True)

print('The remaining PipelineData objects have been created.')

Set up RunConfiguration

The RunConfiguration object contains the information needed to submit a training run in the experiment. For this run, the Conda dependencies include the scikit-learn and pandas packages, which are then available during the experiment run:

from azureml.core.runconfig import RunConfiguration, DockerConfiguration
from azureml.core.conda_dependencies import CondaDependencies


# Configure the conda dependencies for the Run
conda_dep = CondaDependencies()
conda_dep.add_conda_package("scikit-learn==0.24.2")
conda_dep.add_conda_package("pandas==0.25.3")
docker_configuration = DockerConfiguration(use_docker=False)
run_config = RunConfiguration(conda_dependencies=conda_dep)
run_config.docker = docker_configuration

print('Run configuration has been created.')

Defining the Pipeline Steps

After the PipelineData objects have been defined, the pipeline steps can be created. There are many built-in pipeline steps available in the Azure ML SDK; for a full list, check out the pipeline step documentation. Here, the PythonScriptStep is used to execute the Python scripts.

A PythonScriptStep specifies the name of the script to run, any arguments to pass to it, and the source directory. The source directory is the local directory created earlier in the lab, where the ingest.py file is located.

A step in the pipeline can take input data and create output data. In this case, the ingestion step takes input from the default datastore and creates output for the Iris data directory. The Iris data directory is then passed into the preprocessing step as an input. This linking of inputs and outputs creates an implicit dependency and automatically tells Azure ML in which order to run the steps. You could use the run_after construct to declare the order explicitly (a brief sketch follows the next code cell), but since there is already a data dependency between the steps, this is not necessary:

import os
from azureml.pipeline.steps import PythonScriptStep

# The URL for the Iris data that will be ingested in the first step of the pipeline
url = "https://gist.githubusercontent.com/cristofima/b4deb0c8435d919d769f0a9a57f740a0/raw/37b1fa4c5ffb2dd4b23d9f958f35de96fc57e71c/iris.csv"
        

# Pipeline Steps
ingestion_step = PythonScriptStep(
    script_name='ingest.py',
    arguments=['--iris_data_dir', iris_data_dir, '--urls', url],
    inputs=[datastore_reference],
    outputs=[iris_data_dir],
    compute_target=cpu_cluster,
    source_directory=source_directory_ingest,
    runconfig=run_config,
    allow_reuse=True
)


preprocess_step = PythonScriptStep(
    script_name='preprocess.py',
    arguments=['--iris_data_dir', iris_data_dir, '--train_dir', train_dir,'--test_dir', test_dir],
    inputs=[iris_data_dir],
    outputs=[train_dir, test_dir],
    compute_target=cpu_cluster,
    source_directory=source_directory_preprocess,
    runconfig=run_config,
    allow_reuse=True
)

print('The ingestion and preprocess pipeline steps have been created.')
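
As noted above, run_after is only needed when two steps have no data dependency linking them. A minimal sketch of declaring the order explicitly (redundant here, since iris_data_dir already links these two steps):

# Explicitly declare step ordering; not required when a data dependency already exists
preprocess_step.run_after(ingestion_step)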

Next, add the training step of the pipeline:

# Create training pipeline step

train_step = PythonScriptStep(
    script_name='train.py',
    arguments=['--train_dir', train_dir, '--output_dir', model_dir],
    inputs=[train_dir],
    outputs=[model_dir],
    compute_target=cpu_cluster,
    source_directory=source_directory_train,
    runconfig=run_config,
    allow_reuse=False
)
print('The training pipeline step has been created.')

Create the remaining pipeline steps for evaluating and deploying the model:

# Create the evaluate and deploy pipeline steps

evaluate_step = PythonScriptStep(
    script_name='evaluate.py',
    arguments=['--model_dir', model_dir,'--test_dir', test_dir, '--accuracy_dir',  accuracy_dir],
    inputs=[test_dir,model_dir],
    outputs=[accuracy_dir],
    compute_target=cpu_cluster,
    source_directory=source_directory_evaluate,
    runconfig=run_config,
    allow_reuse=True
)

deploy_step = PythonScriptStep(
    script_name='deploy.py',
    arguments=['--model_dir', model_dir, '--accuracy_dir',  accuracy_dir,'--test_dir', test_dir],
    inputs=[test_dir,accuracy_dir,model_dir],
    outputs=[output_dir],
    compute_target=cpu_cluster,
    source_directory=source_directory_deploy,
    runconfig=run_config,
    allow_reuse=True
)

print('The evaluate and deploy pipeline steps have been created.')

Run Pipeline

Submit the pipeline to initiate the run; the pipeline may take up to 30 minutes to complete:

from azureml.pipeline.core import Pipeline
from azureml.core import Experiment


# Submit the pipeline
print('Submitting pipeline ...')
pipeline = Pipeline(workspace=ws, steps=[ingestion_step, preprocess_step, train_step, evaluate_step, deploy_step])
pipeline_run = Experiment(ws, 'iris_pipeline').submit(pipeline)

To view the progress of the pipeline, either click the link to the Azure Machine Learning portal generated by the code cell above, or run the azureml widget to view the pipeline's status from within the Jupyter notebook:

# Show run details in the notebook
from azureml.widgets import RunDetails

RunDetails(pipeline_run).show()
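
Alternatively, you can block the notebook until the pipeline finishes and stream the step logs as they are produced:

# Wait for the pipeline run to finish and print log output
pipeline_run.wait_for_completion(show_output=True)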

Test the Endpoint

Once the web service is deployed, it can be tested by sending a series of sepal and petal measurements to the URI. The web service will take the data, run a model prediction against it, and return the classification prediction:

import urllib.request
import urllib.error
import json
from azureml.core.webservice import Webservice


# Iris petal and sepal measurements
rawdata = {"data": [
                6.7, 
                3.0, 
                5.2, 
                2.3
            ]
}

# Get the URL of the web service
service = Webservice(workspace=ws, name='iris-classification-service')
url = service.scoring_uri

# Send data to web service
body = str.encode(json.dumps(rawdata))

headers = {'Content-Type':'application/json', 'Authorization':('Bearer ')}
req = urllib.request.Request(url, body, headers)

try:
    response = urllib.request.urlopen(req)
    result = response.read()
    print(result)

except urllib.error.HTTPError as error: 
    print("The request failed with status code: " + str(error.code))
    print(error.info())
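
If you prefer to stay within the SDK, the same test can be run through the Webservice object's run method instead of a raw HTTP request; this sketch assumes the iris-classification-service deployed above:

# Call the deployed service through the SDK instead of a raw HTTP request
result = service.run(input_data=json.dumps(rawdata))
print(result)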

You can find the full source code for this notebook here.

Thanks for reading

Thank you very much for reading; I hope you found this article interesting and that it will be useful to you in the future. If you have any questions or ideas you need to discuss, it will be a pleasure to collaborate and exchange knowledge.

