# How to operationalize a training pipeline with batch endpoints

This example deploys a training pipeline that takes input training data (labeled) and produces a predictive model, along with the evaluation results and the transformations applied during preprocessing. The pipeline will use tabular data from the [UCI Heart Disease Data Set](https://archive.ics.uci.edu/ml/datasets/Heart+Disease) to train an XGBoost model. We use a data preprocessing component to preprocess the data before it is sent to the training component to fit and evaluate the model.

## 1. Connect to Azure Machine Learning Workspace

The [workspace](https://docs.microsoft.com/en-us/azure/machine-learning/concept-workspace) is the top-level resource for Azure Machine Learning, providing a centralized place to work with all the artifacts you create when you use Azure Machine Learning. In this section we will connect to the workspace in which the job will be run.

### 1.1. Import the required libraries

In [None]:
from azure.ai.ml import MLClient, Input
from azure.ai.ml import load_component
from azure.ai.ml.entities import (
    Data,
    BatchEndpoint,
    PipelineComponentBatchDeployment,
    AmlCompute,
    Environment,
)
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.dsl import pipeline
from azure.core.exceptions import ResourceExistsError
from azure.identity import DefaultAzureCredential

### 1.2. Configure workspace details and get a handle to the workspace

To connect to a workspace, we need three identifiers: a subscription ID, a resource group, and a workspace name. We pass these to `MLClient` from `azure.ai.ml` to get a handle to the required Azure Machine Learning workspace. We use [default Azure authentication](https://docs.microsoft.com/en-us/python/api/azure-identity/azure.identity.defaultazurecredential?view=azure-python) for this tutorial. Check the [configuration notebook](../../jobs/configuration.ipynb) for more details on how to configure credentials and connect to a workspace.

In [None]:
subscription_id = "<SUBSCRIPTION_ID>"
resource_group = "<RESOURCE_GROUP>"
workspace = "<AML_WORKSPACE_NAME>"

In [None]:
ml_client = MLClient(
    DefaultAzureCredential(), subscription_id, resource_group, workspace
)
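
Hard-coding the identifiers works for a tutorial, but the same three values can also come from environment variables. The following is a minimal sketch of that idea. The variable names (`AZURE_SUBSCRIPTION_ID`, `AZURE_RESOURCE_GROUP`, `AZUREML_WORKSPACE_NAME`) and the helper `workspace_settings` are assumptions chosen for illustration; they are not part of this notebook.

```python
import os
from typing import Mapping, Optional


def workspace_settings(env: Optional[Mapping[str, str]] = None) -> dict:
    """Collect the three workspace identifiers from environment variables.

    The variable names below are assumptions made for this sketch.
    """
    env = os.environ if env is None else env
    names = {
        "subscription_id": "AZURE_SUBSCRIPTION_ID",
        "resource_group_name": "AZURE_RESOURCE_GROUP",
        "workspace_name": "AZUREML_WORKSPACE_NAME",
    }
    # Fail early and name every variable that is missing or empty.
    missing = [var for var in names.values() if not env.get(var)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return {key: env[var] for key, var in names.items()}
```

The returned dictionary could then be unpacked into the client, for example `MLClient(DefaultAzureCredential(), **workspace_settings())`.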

If you are working on an Azure Machine Learning compute, you can simply load the workspace details from its configuration:

In [None]:
ml_client = MLClient.from_config(DefaultAzureCredential())

## 2. Create the pipeline component

In this section, we'll create all the assets required for our training pipeline. We'll begin by creating an environment that includes necessary libraries to train the model. We'll then create a compute cluster on which the batch deployment will run, and finally, we'll register the input data as a data asset.

### 2.1 Create the environment

The components in this example will use an environment with the `XGBoost` and `scikit-learn` libraries. The `environment/conda.yml` file contains the environment's configuration:

In [None]:
environment = Environment(
    name="xgboost-sklearn-py38",
    description="An environment for models built with XGBoost and Scikit-learn.",
    image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest",
    conda_file="environment/conda.yml",
)

In [None]:
try:
    ml_client.environments.create_or_update(environment)
except ResourceExistsError:
    pass

### 2.2 Create a compute cluster

In [None]:
compute_name = "batch-cluster"
if not any(c.name == compute_name for c in ml_client.compute.list()):
    compute_cluster = AmlCompute(
        name=compute_name,
        description="Batch endpoints compute cluster",
        min_instances=0,
        max_instances=5,
    )
    ml_client.begin_create_or_update(compute_cluster).result()

### 2.3 Register the training data as a data asset

In [None]:
data_path = "data/train"
dataset_name = "heart-dataset-train"

heart_dataset_train = Data(
    path=data_path,
    type=AssetTypes.URI_FOLDER,
    description="A training dataset for heart classification",
    name=dataset_name,
)

Create the data asset:

In [None]:
ml_client.data.create_or_update(heart_dataset_train)

Let's get a reference to the new data asset:

In [None]:
heart_dataset_train = ml_client.data.get(name=dataset_name, label="latest")

### 2.4 Create the pipeline

The pipeline we want to operationalize takes 1 input, the training data, and produces 3 outputs: the trained model, the evaluation results, and the data transformations applied during preprocessing. It is composed of 2 components:

In [None]:
prepare_data = load_component(source="components/prepare/prepare.yml")
train_xgb = load_component(source="components/train_xgb/train_xgb.yml")

Construct the pipeline:

In [None]:
@pipeline()
def uci_heart_classifier_trainer(input_data: Input(type=AssetTypes.URI_FOLDER)):
    prepared_data = prepare_data(data=input_data)
    trained_model = train_xgb(
        data=prepared_data.outputs.prepared_data,
        target_column="target",
        register_best_model=False,
        eval_size=0.3,
    )

    return {
        "model": trained_model.outputs.model,
        "evaluation_results": trained_model.outputs.evaluation_results,
        "transformations_output": prepared_data.outputs.transformations_output,
    }

> In the pipeline, the `transformations` input is missing; therefore, the script will learn the parameters from the input data.
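
The note above describes a common pattern: when no previously learned transformations are supplied, the preprocessing step fits them on the incoming data and emits them as an output. The actual `prepare` component script is not shown in this notebook, so the following is only a minimal sketch of that idea, using a hypothetical standard-scaling step on a single numeric column.

```python
from statistics import mean, pstdev
from typing import Optional, Sequence, Tuple


def scale_column(
    values: Sequence[float], transformations: Optional[dict] = None
) -> Tuple[list, dict]:
    """Standard-scale values, learning the parameters only when none are given."""
    if transformations is None:
        # No transformations input: learn the parameters from the data.
        # A zero standard deviation is replaced by 1.0 to avoid dividing by zero.
        transformations = {"mean": mean(values), "std": pstdev(values) or 1.0}
    scaled = [(v - transformations["mean"]) / transformations["std"] for v in values]
    return scaled, transformations


# Training time: parameters are learned and returned alongside the data.
train_scaled, learned = scale_column([120.0, 140.0, 160.0])

# Later: the learned parameters are reused instead of being refit.
new_scaled, _ = scale_column([140.0], transformations=learned)
```

Returning the learned parameters alongside the data mirrors the `transformations_output` of the pipeline: whatever was learned during training can be applied unchanged to new data.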

### 2.5 Test the pipeline

Let's test the pipeline with some sample data. To do that, we'll create a job using the pipeline and the `batch-cluster` compute cluster created previously.

In [None]:
pipeline_job = uci_heart_classifier_trainer(
    Input(type="uri_folder", path=heart_dataset_train.id)
)

Now, we'll configure some run settings to run the test:

In [None]:
pipeline_job.settings.default_datastore = "workspaceblobstore"
pipeline_job.settings.default_compute = "batch-cluster"

Create the test job:

In [None]:
pipeline_job_run = ml_client.jobs.create_or_update(
    pipeline_job, experiment_name="uci-heart-train-pipeline"
)
pipeline_job_run

## 3. Create Batch Endpoint

Batch endpoints are endpoints that are used for batch inferencing on large volumes of data over a period of time. Batch endpoints receive pointers to data and run jobs asynchronously to process the data in parallel on compute clusters. Batch endpoints store outputs to a data store for further analysis.

### 3.1 Configure the endpoint

First, let's create the endpoint that is going to host the batch deployments. To ensure that our endpoint name is unique, let's create a random suffix to append to it. 

> In general, you won't need this technique because you will use more meaningful names. If that is your case, skip the cell that appends the random suffix.

In [None]:
endpoint_name = "uci-classifier-train"

In [None]:
import random
import string

# Creating a unique endpoint name by including a random suffix
allowed_chars = string.ascii_lowercase + string.digits
endpoint_suffix = "".join(random.choice(allowed_chars) for x in range(5))
endpoint_name = f"{endpoint_name}-{endpoint_suffix}"

print(f"Endpoint name: {endpoint_name}")
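
The suffix logic above can be wrapped in a small helper so that it is reusable for other resource names. This is a sketch only: the function name `with_random_suffix` and its optional `rng` parameter are our own additions, with `rng` allowing a seeded generator when reproducible names are useful, for example in tests.

```python
import random
import string
from typing import Optional


def with_random_suffix(
    base_name: str, length: int = 5, rng: Optional[random.Random] = None
) -> str:
    """Append a random lowercase alphanumeric suffix to base_name."""
    rng = rng or random.Random()
    allowed_chars = string.ascii_lowercase + string.digits
    suffix = "".join(rng.choice(allowed_chars) for _ in range(length))
    return f"{base_name}-{suffix}"


print(with_random_suffix("uci-classifier-train"))
```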

Let's configure the endpoint:

In [None]:
endpoint = BatchEndpoint(
    name=endpoint_name,
    description="An endpoint to perform training of the Heart Disease Data Set prediction task",
)

### 3.2 Create the endpoint
Using the `MLClient` created earlier, we will now create the endpoint in the workspace. `begin_create_or_update` starts the endpoint creation and returns a poller; calling `.result()` on it blocks until the creation completes.

In [None]:
ml_client.batch_endpoints.begin_create_or_update(endpoint).result()

You can see the endpoint as follows:

In [None]:
endpoint = ml_client.batch_endpoints.get(name=endpoint_name)
print(endpoint)

## 4. Deploy the pipeline component

To deploy the pipeline component, we have to create a batch deployment. A deployment is a set of resources required for hosting the asset that does the actual work.

### 4.1 Creating the component

Our pipeline is defined in a function. We are going to create a component out of it. Pipeline components are reusable compute graphs that can be included in batch deployments or used to compose more complex pipelines.

In [None]:
pipeline_component = ml_client.components.create_or_update(
    uci_heart_classifier_trainer().component
)

### 4.2 Configuring the deployment

In [None]:
deployment = PipelineComponentBatchDeployment(
    name="uci-classifier-train-xgb",
    description="A sample deployment that trains an XGBoost model for the UCI dataset.",
    endpoint_name=endpoint.name,
    component=pipeline_component,
    settings={"continue_on_step_failure": False, "default_compute": compute_name},
)

### 4.3 Create the deployment
Using the `MLClient` created earlier, we will now create the deployment in the workspace. `begin_create_or_update` starts the deployment creation and returns a poller; calling `.result()` on it blocks until the creation completes.

In [None]:
ml_client.batch_deployments.begin_create_or_update(deployment).result()

Once created, let's configure this new deployment as the default one:

In [None]:
endpoint = ml_client.batch_endpoints.get(endpoint_name)
endpoint.defaults.deployment_name = deployment.name
ml_client.batch_endpoints.begin_create_or_update(endpoint).result()

We can confirm the default deployment as follows:

In [None]:
print(f"The default deployment is {endpoint.defaults.deployment_name}")

### 4.6 Testing the deployment

Once the deployment is created, it is ready to receive jobs.

#### 4.6.1 Run a batch job 

Define the input data asset:

In [None]:
input_data = Input(type=AssetTypes.URI_FOLDER, path=heart_dataset_train.id)

You can invoke the default deployment as follows:

In [None]:
job = ml_client.batch_endpoints.invoke(
    endpoint_name=endpoint.name, inputs={"input_data": input_data}
)

#### 4.6.2 Get the details of the invoked job

Let's get the details of the invoked job:

In [None]:
ml_client.jobs.get(job.name)

We can wait for the job to finish using the following code:

In [None]:
ml_client.jobs.stream(name=job.name)
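
`jobs.stream` blocks and prints logs until the job ends. When you want to wait without streaming logs, polling the job status is an alternative. The sketch below is generic: `get_status` is any callable that returns the current status string, and the set of terminal statuses (`Completed`, `Failed`, `Canceled`) is an assumption that you should check against the statuses your SDK version reports.

```python
import time
from typing import Callable

# Assumed terminal statuses; verify against your SDK version.
TERMINAL_STATUSES = {"Completed", "Failed", "Canceled"}


def wait_for_job(
    get_status: Callable[[], str],
    poll_seconds: float = 30.0,
    timeout_seconds: float = 3600.0,
) -> str:
    """Poll get_status until it returns a terminal status or the timeout expires."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Job still in status {status!r} after timeout")
        time.sleep(poll_seconds)
```

With the objects from this notebook, a call could look like `wait_for_job(lambda: ml_client.jobs.get(job.name).status)`.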

#### 4.6.3 Access job outputs

Once the job is completed, we can access some of its outputs. This pipeline produces the following outputs for its components:
- `preprocess job`: output is `transformations_output`
- `train job`: outputs are `model` and `evaluation_results`

We can download the outputs as follows:

In [None]:
ml_client.jobs.download(
    name=job.name, download_path=".", output_name="transformations_output"
)
ml_client.jobs.download(name=job.name, download_path=".", output_name="model")
ml_client.jobs.download(
    name=job.name, download_path=".", output_name="evaluation_results"
)
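
The three calls above differ only in the output name, so they can also be expressed as a loop. The helper below is a sketch: `download_outputs` is a hypothetical name, and it accepts any object exposing a `download(name=..., download_path=..., output_name=...)` method, such as `ml_client.jobs`. Giving each output its own folder is our choice, not a requirement.

```python
from pathlib import Path
from typing import Iterable


def download_outputs(
    jobs_client, job_name: str, output_names: Iterable[str], root: str = "."
) -> list:
    """Download each named output of a job into its own folder under root."""
    targets = []
    for output_name in output_names:
        target = Path(root) / output_name
        jobs_client.download(
            name=job_name, download_path=str(target), output_name=output_name
        )
        targets.append(target)
    return targets
```

For this pipeline, the call could be `download_outputs(ml_client.jobs, job.name, ["transformations_output", "model", "evaluation_results"])`.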

In pipelines, each step is executed as a child job. You can use `parent_job_name` to find all the child jobs of a given job:

In [None]:
pipeline_job_steps = {
    step.properties["azureml.moduleName"]: step
    for step in ml_client.jobs.list(parent_job_name=job.name)
}

This dictionary uses the module name as the key and the job as the value, which makes the steps easier to work with:

In [None]:
preprocessing_job = pipeline_job_steps["uci_heart_prepare"]
train_job = pipeline_job_steps["uci_heart_train"]
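
To make the shape of `pipeline_job_steps` concrete without calling the service, the sketch below builds the same kind of dictionary from stand-in objects. The `SimpleNamespace` stubs, their names, and their status values are invented for illustration; real child jobs come from `ml_client.jobs.list`.

```python
from types import SimpleNamespace

# Stand-ins for the child jobs returned by the service (illustrative values only).
child_jobs = [
    SimpleNamespace(
        name="child-1",
        status="Completed",
        properties={"azureml.moduleName": "uci_heart_prepare"},
    ),
    SimpleNamespace(
        name="child-2",
        status="Running",
        properties={"azureml.moduleName": "uci_heart_train"},
    ),
]

# Same comprehension as above: module name -> job object.
steps_by_module = {
    step.properties["azureml.moduleName"]: step for step in child_jobs
}

# Steps that have not completed yet, keyed by module name.
pending = {
    module: step.status
    for module, step in steps_by_module.items()
    if step.status != "Completed"
}
print(pending)
```

Keying by module name rather than by job name keeps the lookups stable across runs, because child job names are generated anew for every invocation.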

Confirm the jobs' statuses using the following:

In [None]:
print(f"Preprocessing job: {preprocessing_job.status}")
print(f"Training job: {train_job.status}")

You can also access the outputs of each of those intermediate steps as we did for the pipeline job.

## 5. Create a new deployment in the endpoint

Endpoints can host multiple deployments at once, while keeping only one deployment as the default. Therefore, you can iterate over your different models, deploy the different models to your endpoint and test them, and finally, switch the default deployment to the model deployment that works best for you.

Let's change the way preprocessing is done in the pipeline to see if we get a model that performs better.

### 5.1 Change the pipeline configuration

In [None]:
@pipeline()
def uci_heart_classifier_onehot(input_data: Input(type=AssetTypes.URI_FOLDER)):
    prepared_data = prepare_data(data=input_data, categorical_encoding="onehot")
    trained_model = train_xgb(
        data=prepared_data.outputs.prepared_data,
        target_column="target",
        register_best_model=False,
        eval_size=0.3,
    )

    return {
        "model": trained_model.outputs.model,
        "evaluation_results": trained_model.outputs.evaluation_results,
        "transformations_output": prepared_data.outputs.transformations_output,
    }

Build the pipeline:

In [None]:
# Note: `_pipeline_builder` is a private attribute of the SDK and may change between versions.
pipeline_component = uci_heart_classifier_onehot._pipeline_builder.build()

### 5.2 Configure a new deployment

Now we can define the deployment:

In [None]:
deployment_onehot = PipelineComponentBatchDeployment(
    name="uci-classifier-train-onehot",
    description="A sample deployment that trains an XGBoost model for the UCI dataset with one hot encoding of categorical variables.",
    endpoint_name=endpoint.name,
    component=pipeline_component,
    settings={"continue_on_step_failure": False, "default_compute": compute_name},
)

### 5.3 Create the deployment

In [None]:
ml_client.batch_deployments.begin_create_or_update(deployment_onehot).result()

### 5.4 Test a non-default deployment

In [None]:
job = ml_client.batch_endpoints.invoke(
    endpoint_name=endpoint.name,
    deployment_name=deployment_onehot.name,
    inputs={"input_data": input_data},
)

### 5.5 Monitor the job

In [None]:
ml_client.jobs.get(name=job.name)

In [None]:
ml_client.jobs.stream(name=job.name)

### 5.6 Configure the new deployment as the default one

In [None]:
endpoint = ml_client.batch_endpoints.get(endpoint.name)
endpoint.defaults.deployment_name = deployment_onehot.name
ml_client.batch_endpoints.begin_create_or_update(endpoint).result()

### 5.7 Delete the old deployment

In [None]:
print(f"The old deployment name is: {deployment.name}")

In [None]:
ml_client.batch_deployments.begin_delete(
    name=deployment.name, endpoint_name=endpoint.name
).result()

## 6. Clean up resources

Clean up the resources created. Deleting the endpoint also deletes its deployments:

In [None]:
ml_client.batch_endpoints.begin_delete(endpoint_name).result()