# Using batch deployments for NLP processing

Batch Endpoints can be used for processing tabular data, but also any other file type like text. Those deployments are supported in both MLflow and custom models. In this tutorial we will learn how to deploy a model that can perform text summarization of long sequences of text using a model from HuggingFace.

This notebook requires:

- `azure-ai-ml`
- `mlflow`
- `azureml-mlflow`
- `numpy`
- `pandas`
- `huggingface`
- `Torch`

## 1. Connect to Azure Machine Learning Workspace

The [workspace](https://docs.microsoft.com/en-us/azure/machine-learning/concept-workspace) is the top-level resource for Azure Machine Learning, providing a centralized place to work with all the artifacts you create when you use Azure Machine Learning. In this section we will connect to the workspace in which the job will be run.

### 1.1. Import the required libraries

In [None]:
from azure.ai.ml import MLClient, Input
from azure.ai.ml.entities import (
    BatchEndpoint,
    ModelBatchDeployment,
    ModelBatchDeploymentSettings,
    Model,
    AmlCompute,
    Data,
    BatchRetrySettings,
    CodeConfiguration,
    Environment,
)
from azure.ai.ml.constants import AssetTypes, BatchDeploymentOutputAction
from azure.identity import DefaultAzureCredential

### 1.2. Configure workspace details and get a handle to the workspace

To connect to a workspace, we need identifier parameters - a subscription, resource group and workspace name. We will use these details in the `MLClient` from `azure.ai.ml` to get a handle to the required Azure Machine Learning workspace. We use the default [default azure authentication](https://docs.microsoft.com/en-us/python/api/azure-identity/azure.identity.defaultazurecredential?view=azure-python) for this tutorial. Check the [configuration notebook](../../jobs/configuration.ipynb) for more details on how to configure credentials and connect to a workspace.

In [None]:
subscription_id = "<SUBSCRIPTION_ID>"
resource_group = "<RESOURCE_GROUP>"
workspace = "<AML_WORKSPACE_NAME>"

In [None]:
ml_client = MLClient(
    DefaultAzureCredential(), subscription_id, resource_group, workspace
)

If you are working in a Azure Machine Learning compute, you can simply:

In [None]:
ml_client = MLClient.from_config(DefaultAzureCredential())

## 2. Registering the model

### 2.1 About the model

The model we are going to work with was built using the popular library transformers from HuggingFace along with a pre-trained model from Facebook with the BART architecture. It was introduced in the paper BART: Denoising Sequence-to-Sequence Pre-training for Natural Language Generation. This model has the following constrains that are important to keep in mind for deployment:

- It can work with sequences up to 1024 tokens.
- It is trained for summarization of text in English.
- We are going to use TensorFlow as a backend.

In [None]:
from transformers import pipeline

summarizer = pipeline("summarization", model="facebook/bart-large-cnn", framework="pt")

Let's save this model locally:

In [None]:
model_local_path = "model"
summarizer.save_pretrained(model_local_path)

### 2.2 Registering the model in the workspace

We need to register the model in order to use it with Azure Machine Learning:

In [None]:
model_name = "bart-text-summarization"

In [None]:
if not any(filter(lambda m: m.name == model_name, ml_client.models.list())):
    print(f"Model {model_name} is not registered. Creating...")
    model = ml_client.models.create_or_update(
        Model(name=model_name, path=model_local_path, type=AssetTypes.CUSTOM_MODEL)
    )

Let's get a reference to the model:

In [None]:
model = ml_client.models.get(name=model_name, label="latest")

## 3 Create Batch Endpoint

Batch endpoints are endpoints that are used batch inferencing on large volumes of data over a period of time. Batch endpoints receive pointers to data and run jobs asynchronously to process the data in parallel on compute clusters. Batch endpoints store outputs to a data store for further analysis.

To create an online endpoint we will use `BatchEndpoint`. This class allows user to configure the following key aspects:
- `name` - Name of the endpoint. Needs to be unique at the Azure region level
- `auth_mode` - The authentication method for the endpoint. Currently only Azure Active Directory (Azure AD) token-based (`aad_token`) authentication is supported. 
- `description`- Description of the endpoint.

### 3.1 Configure the endpoint

First, let's create the endpoint that is going to host the batch deployments. Remember that each endpoint can host multiple deployments at any time, however, only one of them is the default one:

In [None]:
import random
import string

# Creating a unique endpoint name by including a random suffix
allowed_chars = string.ascii_lowercase + string.digits
endpoint_suffix = "".join(random.choice(allowed_chars) for x in range(5))
endpoint_name = "text-summarization-" + endpoint_suffix

Let's configure the endpoint:

In [None]:
endpoint = BatchEndpoint(
    name=endpoint_name,
    description="An batch service to perform text sumarization of content in CSV files",
)

### 3.2 Create the endpoint
Using the `MLClient` created earlier, we will now create the Endpoint in the workspace. This command will start the endpoint creation and return a confirmation response while the endpoint creation continues.

In [None]:
ml_client.batch_endpoints.begin_create_or_update(endpoint).result()

## 4. Create a batch deployment

A deployment is a set of resources required for hosting the model that does the actual inferencing. We will create a deployment for our endpoint using the `BatchDeployment` class.

### 4.1 Creating an scoring script to work with the model

In [None]:
%%writefile code/batch_driver.py

import os
import time
import torch
import subprocess
import mlflow
from pprint import pprint
from transformers import AutoTokenizer, BartForConditionalGeneration
from optimum.bettertransformer import BetterTransformer
from datasets import load_dataset

def init():
    global model
    global tokenizer
    global device

    cuda_available = torch.cuda.is_available()
    device = "cuda" if cuda_available else "cpu"

    if cuda_available:
        print(f"[INFO] CUDA version: {torch.version.cuda}")
        print(f"[INFO] ID of current CUDA device: {torch.cuda.current_device()}")
        print("[INFO] nvidia-smi output:")
        pprint(subprocess.run(['nvidia-smi'], stdout=subprocess.PIPE).stdout.decode('utf-8'))
    else:
        print("[WARN] CUDA acceleration is not available. This model takes hours to run on medium size data.")

    # AZUREML_MODEL_DIR is an environment variable created during deployment
    model_path = os.path.join(os.environ["AZUREML_MODEL_DIR"], "model")

    # load the tokenizer
    tokenizer = AutoTokenizer.from_pretrained(
        model_path, truncation=True, max_length=1024
    )
    
    # Load the model
    try:
        model = BartForConditionalGeneration.from_pretrained(model_path, device_map="auto")
    except Exception as e: 
        print(f"[ERROR] Error happened when loading the model on GPU or the default device. Error: {e}")
        print("[INFO] Trying on CPU.")
        model = BartForConditionalGeneration.from_pretrained(model_path)
        device = "cpu"

    # Optimize the model
    if device != "cpu":
        try:        
            model = BetterTransformer.transform(model, keep_original_model=False)
            print("[INFO] BetterTransformer loaded.")
        except Exception as e: 
            print(f"[ERROR] Error when converting to BetterTransformer. An unoptimized version of the model will be used.\n\t> {e}")

    mlflow.log_param("device", device)
    mlflow.log_param("model", type(model).__name__)

def run(mini_batch):
    resultList = []

    print(f"[INFO] Reading new mini-batch of {len(mini_batch)} file(s).")
    ds = load_dataset("csv", data_files={"score": mini_batch})

    start_time = time.perf_counter()
    for idx, text in enumerate(ds["score"]["text"]):
        # perform inference
        inputs = tokenizer.batch_encode_plus(
            [text], truncation=True, padding=True, max_length=1024, return_tensors="pt"
        )
        input_ids = inputs["input_ids"].to(device)
        summary_ids = model.generate(
            input_ids, max_length=130, min_length=30, do_sample=False
        )
        summaries = tokenizer.batch_decode(
            summary_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False
        )

        # Get results:
        resultList.append(summaries[0])
        rps = idx / (time.perf_counter() - start_time + 00000.1)
        print("Rows per second:", rps)
    
    mlflow.log_metric("rows_per_second", rps)
    return resultList

> tokenizer is configured to truncate the lenght of the text as the model has a limit of 1024 tokens.

### 4.2 Creating the compute

Batch deployments can run on any Azure ML compute that already exists in the workspace. That means that multiple batch deployments can share the same compute infrastructure. In this example, we are going to work on an AzureML compute cluster called `gpu-cluster` with GPU hardware acceleration. Let's verify the compute exists on the workspace or create it otherwise.

In [None]:
compute_name = "gpu-cluster"
if not any(filter(lambda m: m.name == compute_name, ml_client.compute.list())):
    compute_cluster = AmlCompute(
        name=compute_name,
        description="GPU cluster compute",
        size="Standard_NV6",
        min_instances=0,
        max_instances=2,
    )
    ml_client.compute.begin_create_or_update(compute_cluster).result()

### 4.3 Creating the environment

Let's create the environment. In our case, our model runs on `TensorFlow`. Azure Machine Learning already has an environment with the required software installed, so we can reutilize this environment.

In [None]:
environment = Environment(
    conda_file="environment/torch200-conda.yaml",
    image="mcr.microsoft.com/azureml/openmpi4.1.0-cuda11.8-cudnn8-ubuntu22.04:latest",
)

 ### 4.4 Configuring the deployment
 
 We will create a deployment for our endpoint using the `BatchDeployment` class. This class allows user to configure the following key aspects.
- `name` - Name of the deployment.
- `endpoint_name` - Name of the endpoint to create the deployment under.
- `model` - The model to use for the deployment. This value can be either a reference to an existing versioned model in the workspace or an inline model specification.
- `environment` - The environment to use for the deployment. This value can be either a reference to an existing versioned environment in the workspace or an inline environment specification.
- `code_path`- Path to the source code directory for scoring the model
- `scoring_script` - Relative path to the scoring file in the source code directory
- `compute` - Name of the compute target to execute the batch scoring jobs on
- `instance_count`- The number of nodes to use for each batch scoring job.		1
- `max_concurrency_per_instance`- The maximum number of parallel scoring_script runs per instance.
- `mini_batch_size`	- The number of files the code_configuration.scoring_script can process in one `run()` call.
- `retry_settings`- Retry settings for scoring each mini batch.		
   - `max_retries`- The maximum number of retries for a failed or timed-out mini batch (default is 3)
   - `timeout`- The timeout in seconds for scoring a mini batch (default is 30)
- `output_action`- Indicates how the output should be organized in the output file. Allowed values are `append_row` or `summary_only`. Default is `append_row`
- `output_file_name`- Name of the batch scoring output file. Default is `predictions.csv`
- `environment_variables`- Dictionary of environment variable name-value pairs to set for each batch scoring job.
- `logging_level`- The log verbosity level.	Allowed values are `warning`, `info`, `debug`. Default is `info`.

In [None]:
deployment = ModelBatchDeployment(
    name="text-summarization-hfbart",
    description="A text summarization deployment implemented with HuggingFace and BART architecture",
    endpoint_name=endpoint.name,
    model=model,
    environment=environment,
    code_configuration=CodeConfiguration(
        code="code",
        scoring_script="batch_driver.py",
    ),
    compute=compute_name,
    settings=ModelBatchDeploymentSettings(
        instance_count=2,
        max_concurrency_per_instance=1,
        mini_batch_size=1,
        output_action=BatchDeploymentOutputAction.APPEND_ROW,
        output_file_name="predictions.csv",
        retry_settings=BatchRetrySettings(max_retries=3, timeout=3000),
        logging_level="info",
    ),
)

### 4.5 Create the deployment
Using the `MLClient` created earlier, we will now create the deployment in the workspace. This command will start the deployment creation and return a confirmation response while the deployment creation continues.

In [None]:
ml_client.batch_deployments.begin_create_or_update(deployment).result()

Let's update the default deployment name in the endpoint:

In [None]:
endpoint = ml_client.batch_endpoints.get(endpoint_name)
endpoint.defaults.deployment_name = deployment.name
ml_client.batch_endpoints.begin_create_or_update(endpoint).result()

We can see the endpoint URL as follows:

In [None]:
print(f"The default deployment is {endpoint.defaults.deployment_name}")

### 4.6 Testing the deployment

Once the deployment is created, it is ready to recieve jobs.

#### 4.6.1 Creating a data asset

Let's first register a data asset so we can run the job against it. This data asset is a folder containing a sample of BillSum, the first dataset for summarization of US Congressional and California state bills. The BillSum dataset consists of three parts: US training bills, US test bills and California test bills. The US bills were collected from the Govinfo service provided by the United States Government Publishing Office (GPO).



In [None]:
data_path = "data"
dataset_name = "billsummary-small"

billsummary_data = Data(
    path=data_path,
    type=AssetTypes.URI_FOLDER,
    description="A sample of the billsum dataset for text summarization, in CSV file format",
    name=dataset_name,
)

In [None]:
billsummary_data = ml_client.data.create_or_update(billsummary_data)

Let's wait for the data asset:

In [None]:
from time import sleep

print(f"Waiting for data asset {dataset_name}", end="")
while not any(filter(lambda m: m.name == dataset_name, ml_client.data.list())):
    sleep(10)
    print(".", end="")

print(" [DONE]")

Let's get a reference of the new data asset:

In [None]:
billsummary_data = ml_client.data.get(name=dataset_name, label="latest")

#### 4.6.2 Creating an input for the deployment

In [None]:
input = Input(type=AssetTypes.URI_FOLDER, path=billsummary_data.id)

#### 4.6.3 Invoke the deployment

Using the `MLClient` created earlier, we will get a handle to the endpoint. The endpoint can be invoked using the `invoke` command with the following parameters:
- `name` - Name of the endpoint
- `input_path` - Path where input data is present

In [None]:
job = ml_client.batch_endpoints.invoke(endpoint_name=endpoint.name, input=input)

#### 4.6.4 Get the details of the invoked job

Let us get details and logs of the invoked job:

In [None]:
ml_client.jobs.get(job.name)

We can wait for the job to finish using the following code:

In [None]:
ml_client.jobs.stream(job.name)

### 4.7 Exploring the results

#### 4.7.1 Download the results

The deployment creates a child job that executes the scoring. We can get the details of it using the following code:

In [None]:
scoring_job = list(ml_client.jobs.list(parent_job_name=job.name))[0]

In [None]:
print("Job name:", scoring_job.name)
print("Job status:", scoring_job.status)
print(
    "Job duration:",
    scoring_job.creation_context.last_modified_at
    - scoring_job.creation_context.created_at,
)

The outputs generated by the deployment job will be placed in an output named `score`:

In [None]:
ml_client.jobs.download(name=scoring_job.name, download_path=".", output_name="score")