## Text Classification with distributed training (MultiNode) - Emotion Detection using Serverless Compute 

This sample shows how to use `text-classification` components from the `azureml` system registry to fine tune a model to detect emotions using the emotion dataset. [Serverless compute](https://learn.microsoft.com/en-us/azure/machine-learning/how-to-use-serverless-compute?view=azureml-api-2&tabs=python) is used to fine-tune the model. **This notebook illustrates how to use multinode training with the finetune components**. We then deploy the fine tuned model to an online endpoint for real time inference. The model is trained on a tiny sample of the dataset with a small number of epochs to illustrate the fine tuning approach.

### Training data
We will use the [emotion](https://huggingface.co/datasets/dair-ai/emotion) dataset.

### Model
This notebook is curated for `Llama` models for text-classification. Llama models are picked from the `azureml-meta` registry. In this notebook we finetune the Llama-2-7b model; to finetune other variants such as 13b or 70b, you can use this notebook, though you will probably need different SKUs.

### Outline
* Pick a model to fine tune.
* Pick and explore training data.
* Configure the fine tuning job.
* Run the fine tuning job.
* Review training and evaluation metrics. 
* Register the fine tuned model. 
* Deploy the fine tuned model for real time inference.
* Clean up resources. 

### 1. Setup pre-requisites
* Install dependencies
* Connect to AzureML Workspace. Learn more at [set up SDK authentication](https://learn.microsoft.com/en-us/azure/machine-learning/how-to-setup-authentication?tabs=sdk). Replace  `<WORKSPACE_NAME>`, `<RESOURCE_GROUP>` and `<SUBSCRIPTION_ID>` below.
* Connect to `azureml` system registry
* Set an optional experiment name


Install dependencies by running the cell below. This step is required when running in a new environment.

In [None]:
%pip install azure-ai-ml
%pip install azure-identity
%pip install datasets==2.9.0
%pip install mlflow
%pip install azureml-mlflow

In [None]:
from azure.ai.ml import MLClient
from azure.identity import (
    DefaultAzureCredential,
    InteractiveBrowserCredential,
    ClientSecretCredential,
)
from azure.ai.ml.entities import AmlCompute
import time

try:
    credential = DefaultAzureCredential()
    credential.get_token("https://management.azure.com/.default")
except Exception as ex:
    credential = InteractiveBrowserCredential()

try:
    workspace_ml_client = MLClient.from_config(credential=credential)
except Exception:  # no usable config file found, fall back to explicit workspace details
    workspace_ml_client = MLClient(
        credential,
        subscription_id="<SUBSCRIPTION_ID>",
        resource_group_name="<RESOURCE_GROUP>",
        workspace_name="<WORKSPACE_NAME>",
    )

# the models, fine tuning pipelines and environments are available in the AzureML system registry, "azureml"
registry_ml_client = MLClient(credential, registry_name="azureml")
registry_ml_client_meta = MLClient(credential, registry_name="azureml-meta")

experiment_name = "llama-text-classification-emotion-detection"

# generating a unique timestamp that can be used for names and versions that need to be unique
timestamp = str(int(time.time()))

### 2. Pick a foundation model to fine tune

This notebook fine tunes a Llama model for `text-classification`. You can browse the available models in the Model Catalog in AzureML Studio. In this example, we use the `Llama-2-7b` model from the `azureml-meta` registry. If you have opened this notebook for a different model, replace the model name and version accordingly.

Note the model id property of the model. This will be passed as input to the fine tuning job. This is also available as the `Asset ID` field in model details page in AzureML Studio Model Catalog. 
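
As a rough illustration of what the model id contains, the sketch below splits an id into its parts. It assumes the id follows the layout `azureml://registries/<registry>/models/<name>/versions/<version>`; that layout, the helper name `parse_model_asset_id` and the example value are assumptions for illustration, so compare them against the id printed for your own model.

```python
import re

# Assumed layout of a registry model asset ID (verify against your printed id).
_ASSET_ID_PATTERN = re.compile(
    r"^azureml://registries/(?P<registry>[^/]+)"
    r"/models/(?P<name>[^/]+)"
    r"/versions/(?P<version>[^/]+)$"
)


def parse_model_asset_id(asset_id: str) -> dict:
    """Hypothetical helper: split an asset ID into registry, name and version."""
    match = _ASSET_ID_PATTERN.match(asset_id)
    if match is None:
        raise ValueError(f"Unrecognized asset ID: {asset_id!r}")
    return match.groupdict()


# Illustrative value only; the real version number will differ.
example_id = "azureml://registries/azureml-meta/models/Llama-2-7b/versions/1"
parts = parse_model_asset_id(example_id)
print(parts)
```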

In [None]:
model_name = "Llama-2-7b"
foundation_model = registry_ml_client_meta.models.get(model_name, label="latest")
print(
    "\n\nUsing model name: {0}, version: {1}, id: {2} for fine tuning".format(
        foundation_model.name, foundation_model.version, foundation_model.id
    )
)

### 3. Pick the dataset for fine-tuning the model

We use the [emotion](https://huggingface.co/datasets/dair-ai/emotion) dataset. The next few cells show basic data preparation for fine tuning:
* Visualize some data rows
* Replace numerical categories in data with the actual string labels. This mapping is available in the [./emotion-dataset/label.json](./emotion-dataset/label.json). This step is needed if you want string labels such as `anger`, `joy`, etc. returned when scoring the model. If you skip this step, the model will return numerical categories such as 0, 1, 2, etc. and you will have to map them to what the category represents yourself. 
* We want this sample to run quickly, so save smaller `train`, `validation` and `test` files containing 10% of the original. This means the fine tuned model will have lower accuracy, hence it should not be put to real-world use. 
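
The label replacement step can be pictured with plain dictionaries. This is a minimal sketch, not the notebook's pandas code: the inline JSON string stands in for `./emotion-dataset/label.json`, and the mapping values shown are illustrative, so treat the real file as the source of truth.

```python
import json

# Illustrative stand-in for ./emotion-dataset/label.json.
# JSON object keys are always strings, so numeric labels must be converted before lookup.
label_json = (
    '{"id2label": {"0": "sadness", "1": "joy", "2": "love", '
    '"3": "anger", "4": "fear", "5": "surprise"}}'
)
id2label = json.loads(label_json)["id2label"]

rows = [
    {"text": "i feel so blessed to be able to share it with you all", "label": 1},
    {"text": "i feel intimidated nervous and overwhelmed and i shake like a leaf", "label": 4},
]

# Add a label_string field by looking up each numeric label as a string key.
for row in rows:
    row["label_string"] = id2label[str(row["label"])]

print([row["label_string"] for row in rows])
```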

##### Here is an example of what the data should look like

Single text classification requires the training data to include at least 2 fields, one for ‘Sentence1’ and one for ‘Label’, as in this example. Sentence 2 can be left blank in this case. The examples below are from the Emotion dataset.

| Text (Sentence1) | Label (Label) |
| :- | :- |
| i feel so blessed to be able to share it with you all | joy | 
| i feel intimidated nervous and overwhelmed and i shake like a leaf | fear | 

 

Text pair classification, where two sentences are classified together (e.g., sentence entailment), needs the training data to have 3 fields, for ‘Sentence1’, ‘Sentence2’ and ‘Label’, as in this example. The examples below are from the Microsoft Research Paraphrase Corpus dataset.

| Text1 (Sentence 1) | Text2 (Sentence 2) | Label_text (Label) |
| :- | :- | :- |
| Amrozi accused his brother , whom he called " the witness " , of deliberately distorting his evidence . | Referring to him as only " the witness " , Amrozi accused his brother of deliberately distorting his evidence . | equivalent |
| Yucaipa owned Dominick 's before selling the chain to Safeway in 1998 for \$ 2.5 billion . | Yucaipa bought Dominick 's in 1995 for \$ 693 million and sold it to Safeway for \$ 1.8 billion in 1998 . | not equivalent |
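
Both layouts above are stored as JSON Lines, one record per line. The sketch below checks that each record carries the required fields before a job is submitted. The helper `validate_jsonl` is hypothetical, the pair field names `text1`, `text2` and `label_text` simply echo the table headers, and the sample sentences are placeholders.

```python
import json


def validate_jsonl(lines, sentence_keys, label_key):
    """Hypothetical helper: parse JSONL lines and fail on missing or empty fields."""
    records = []
    for line_number, line in enumerate(lines, start=1):
        record = json.loads(line)
        for key in [*sentence_keys, label_key]:
            if not record.get(key):
                raise ValueError(f"line {line_number}: missing or empty field {key!r}")
        records.append(record)
    return records


# Single text classification: one sentence field plus a label field.
single_lines = ['{"text": "placeholder sentence", "label_string": "joy"}']
single_records = validate_jsonl(single_lines, ["text"], "label_string")

# Text pair classification: two sentence fields plus a label field.
pair_lines = [
    '{"text1": "first placeholder", "text2": "second placeholder", "label_text": "equivalent"}'
]
pair_records = validate_jsonl(pair_lines, ["text1", "text2"], "label_text")

print(len(single_records), len(pair_records))
```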

 

In [None]:
# download the dataset using the helper script. This needs datasets library: https://pypi.org/project/datasets/
import os

exit_status = os.system("python ./download-dataset.py --download_dir emotion-dataset")
if exit_status != 0:
    raise Exception("Error downloading dataset")

In [None]:
# load the ./emotion-dataset/train.jsonl file into a pandas dataframe and show the first 5 rows
import pandas as pd

pd.set_option(
    "display.max_colwidth", 0
)  # set the max column width to 0 to display the full text
df = pd.read_json("./emotion-dataset/train.jsonl", lines=True)
df.head()

In [None]:
# load the id2label json element of the ./emotion-dataset/label.json file into pandas table with keys as 'label' column of int64 type and values as 'label_string' column as string type
import json

with open("./emotion-dataset/label.json") as f:
    id2label = json.load(f)
    id2label = id2label["id2label"]
    label_df = pd.DataFrame.from_dict(
        id2label, orient="index", columns=["label_string"]
    )
    label_df["label"] = label_df.index.astype("int64")
    label_df = label_df[["label", "label_string"]]
label_df.head()

In [None]:
# load test.jsonl, train.jsonl and validation.jsonl from the ./emotion-dataset folder into pandas dataframes
test_df = pd.read_json("./emotion-dataset/test.jsonl", lines=True)
train_df = pd.read_json("./emotion-dataset/train.jsonl", lines=True)
validation_df = pd.read_json("./emotion-dataset/validation.jsonl", lines=True)
# join the train, validation and test dataframes with the id2label dataframe to get the label_string column
train_df = train_df.merge(label_df, on="label", how="left")
validation_df = validation_df.merge(label_df, on="label", how="left")
test_df = test_df.merge(label_df, on="label", how="left")
# show the first 5 rows of the train dataframe
train_df.head()

In [None]:
# save 10% of the rows from the train, validation and test dataframes into files with small_ prefix in the ./emotion-dataset folder
frac = 0.1  # fraction of rows to keep; set to 1 to use the full dataset
train_df.sample(frac=frac).to_json(
    "./emotion-dataset/small_train.jsonl", orient="records", lines=True
)
validation_df.sample(frac=frac).to_json(
    "./emotion-dataset/small_validation.jsonl", orient="records", lines=True
)
test_df.sample(frac=frac).to_json(
    "./emotion-dataset/small_test.jsonl", orient="records", lines=True
)

### 4. Submit the fine tuning job using the model and data as inputs
 
Create the job that uses the `text-classification` pipeline component. [Learn more](https://github.com/Azure/azureml-assets/blob/main/training/finetune_acft_hf_nlp/components/pipeline_components/text_classification/README.md) about all the parameters supported for fine tuning.

Define finetune parameters

Finetune parameters can be grouped into 2 categories: training parameters and optimization parameters.

Training parameters define the training aspects such as:
1. the optimizer and scheduler to use
2. the metric to optimize during finetuning
3. the number of training steps, the batch size, and so on

Optimization parameters help optimize GPU memory and make effective use of the compute resources. Below are a few of the parameters that belong to this category. _The optimization parameters differ for each model and are packaged with the model to handle these variations._
1. enable DeepSpeed, ORT and LoRA
2. enable mixed precision training
3. enable multi-node training
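
Because the packaged defaults arrive as a string, they have to be parsed before use. The following is a minimal sketch of that parsing, assuming the tag holds a Python dict literal; the tag value and the `overrides` dict are made up for illustration, and merging overrides is one possible design rather than something the components require.

```python
import ast

# Hypothetical tag value; real models carry their own defaults.
tag_value = "{'apply_lora': 'true', 'apply_deepspeed': 'true', 'apply_ort': 'true'}"

# literal_eval only accepts Python literals, so it does not execute arbitrary code.
defaults = ast.literal_eval(tag_value)

# Assumed user overrides; later keys win over the packaged defaults.
overrides = {"apply_ort": "false"}
merged = {**defaults, **overrides}

print(merged)
```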

In [None]:
import ast

# Training parameters
training_parameters = dict(
    num_train_epochs=3,
    per_device_train_batch_size=1,
    per_device_eval_batch_size=1,
    learning_rate=2e-5,
    metric_for_best_model="f1_macro",
)
print(f"The following training parameters are enabled - {training_parameters}")

# Optimization parameters - As these parameters are packaged with the model itself, lets retrieve those parameters
if "model_specific_defaults" in foundation_model.tags:
    optimization_parameters = ast.literal_eval(
        foundation_model.tags["model_specific_defaults"]
    )  # convert string to python dict
else:
    optimization_parameters = dict(
        apply_lora="true", apply_deepspeed="true", apply_ort="true"
    )
print(f"The following optimizations are enabled - {optimization_parameters}")

**Enable MULTINODE training**

You can set the parameter `num_nodes_finetune` to fine-tune on the required number of nodes.
The default value is 1, which is the same as single-node training.
If set to a value greater than 1, the job uses that many nodes.

Things to take care of for multinode training:
1. The parameter `number_of_gpu_to_use_finetuning` should be set to the number of GPUs to use per node. For example, if you are using the ND40rs_v2 SKU, which has 8 GPUs per node, and you set `num_nodes_finetune=2`, you still need to set `number_of_gpu_to_use_finetuning=8` and **not** 16.
2. Make sure you have enough quota and that your compute can scale to the required number of GPUs. The job waits to acquire the requested number of nodes before it starts.
3. Set the output mode to mount instead of upload. The `pytorch_model_folder` output of the finetune component needs to be changed from the default upload mode to mount mode.
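
The per-node versus total distinction in point 1 comes down to simple arithmetic. The sketch below assumes plain data-parallel training, where every GPU processes its own batch; the helper `distributed_layout` and its `gradient_accumulation_steps` argument are illustrative and not part of the finetune component.

```python
def distributed_layout(num_nodes, gpus_per_node, per_device_batch_size,
                       gradient_accumulation_steps=1):
    """Hypothetical helper: derive total GPU count and effective batch size."""
    if num_nodes < 1 or gpus_per_node < 1:
        raise ValueError("num_nodes and gpus_per_node must be at least 1")
    # Total number of worker processes, one per GPU across all nodes.
    world_size = num_nodes * gpus_per_node
    # Samples contributing to one optimizer step under data parallelism.
    effective_batch_size = (
        world_size * per_device_batch_size * gradient_accumulation_steps
    )
    return {
        "number_of_gpu_to_use_finetuning": gpus_per_node,  # per node, not total
        "num_nodes_finetune": num_nodes,
        "world_size": world_size,
        "effective_batch_size": effective_batch_size,
    }


# Two nodes with 8 GPUs each and a per-device batch size of 1.
layout = distributed_layout(num_nodes=2, gpus_per_node=8, per_device_batch_size=1)
print(layout)
```

With these inputs the per-node setting stays at 8 while the job as a whole runs on 16 GPUs.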

In [None]:
# PARAMETER for enabling multinode training
# Running distributed training on 2 nodes
num_nodes = 2

In [None]:
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.entities import CommandComponent, PipelineComponent, Job, Component
from azure.ai.ml import PyTorchDistribution, Input
from azure.ai.ml.constants import InputOutputModes

# fetch the pipeline component
pipeline_component_func = registry_ml_client.components.get(
    name="text_classification_pipeline", label="latest"
)


# define the pipeline job
@pipeline()
def create_pipeline():
    text_classification_pipeline = pipeline_component_func(
        # specify the id of the foundation model picked from the registry in step #2
        mlflow_model_path=foundation_model.id,
        # huggingface_id = 'bert-base-uncased', # if you want to use a huggingface model, uncomment this line and comment the above line
        instance_type_finetune="Standard_NC24s_v3",
        instance_type_model_import="Standard_E4s_v3",
        instance_type_preprocess="Standard_E4s_v3",
        instance_type_model_evalutation="Standard_NC24s_v3",
        # map the dataset splits to parameters
        train_file_path=Input(
            type="uri_file", path="./emotion-dataset/small_train.jsonl"
        ),
        validation_file_path=Input(
            type="uri_file", path="./emotion-dataset/small_validation.jsonl"
        ),
        test_file_path=Input(
            type="uri_file", path="./emotion-dataset/small_test.jsonl"
        ),
        evaluation_config=Input(
            type="uri_file", path="./text-classification-config.json"
        ),
        # The following parameters map to the dataset fields
        sentence1_key="text",
        label_key="label_string",
        # Training settings
        number_of_gpu_to_use_finetuning=4,  # set to the number of GPUs available per node, not the total across nodes
        num_nodes_finetune=num_nodes,  # set to number of nodes to be used for distributed training
        **training_parameters,
        **optimization_parameters
    )

    # Set output mode to mount instead of upload, multinode setting does not work with upload mode
    text_classification_pipeline.outputs[
        "pytorch_model_folder"
    ].mode = InputOutputModes.RW_MOUNT
    text_classification_pipeline.outputs[
        "mlflow_model_folder"
    ].mode = InputOutputModes.RW_MOUNT

    return {
        # map the output of the fine tuning job to the output of pipeline job so that we can easily register the fine tuned model
        # registering the model is required to deploy the model to an online or batch endpoint
        "trained_model": text_classification_pipeline.outputs.mlflow_model_folder
    }


pipeline_object = create_pipeline()

# don't use cached results from previous jobs
pipeline_object.settings.force_rerun = True

# set continue on step failure to False
pipeline_object.settings.continue_on_step_failure = False

Submit the job

In [None]:
# submit the pipeline job
pipeline_job = workspace_ml_client.jobs.create_or_update(
    pipeline_object, experiment_name=experiment_name
)
# wait for the pipeline job to complete
workspace_ml_client.jobs.stream(pipeline_job.name)

### 5. Review training and evaluation metrics
Viewing the job in AzureML studio is the best way to analyze logs, metrics and outputs of jobs. You can create custom charts and compare metrics across different jobs. See https://learn.microsoft.com/en-us/azure/machine-learning/how-to-log-view-metrics?tabs=interactive#view-jobsruns-information-in-the-studio to learn more.

However, you may also need to access and review metrics programmatically. For this we use MLflow, which is the recommended client for logging and querying metrics.

In [None]:
import mlflow, json

mlflow_tracking_uri = workspace_ml_client.workspaces.get(
    workspace_ml_client.workspace_name
).mlflow_tracking_uri
mlflow.set_tracking_uri(mlflow_tracking_uri)
# build the filter string: tags.mlflow.rootRunId='<pipeline job name>'
# (named filter_string to avoid shadowing the built-in filter function)
filter_string = "tags.mlflow.rootRunId='" + pipeline_job.name + "'"
runs = mlflow.search_runs(
    experiment_names=[experiment_name],
    filter_string=filter_string,
    output_format="list",
)
training_run = None
evaluation_run = None
# get the training and evaluation runs.
# using a hacky way till 'Bug 2320997: not able to show eval metrics in FT notebooks - mlflow client now showing display names' is fixed
for run in runs:
    # check if run.data.metrics.epoch exists
    if "epoch" in run.data.metrics:
        training_run = run
    # else, check if run.data.metrics.accuracy exists
    elif "accuracy" in run.data.metrics:
        evaluation_run = run

In [None]:
if training_run:
    print("Training metrics:\n\n")
    print(json.dumps(training_run.data.metrics, indent=2))
else:
    print("No Training job found")

In [None]:
if evaluation_run:
    print("Evaluation metrics:\n\n")
    print(json.dumps(evaluation_run.data.metrics, indent=2))
else:
    print("No Evaluation job found")

### 6. Register the fine tuned model with the workspace

We will register the model from the output of the fine tuning job. This will track lineage between the fine tuned model and the fine tuning job. The fine tuning job, further, tracks lineage to the foundation model, data and training code.

In [None]:
from azure.ai.ml.entities import Model
from azure.ai.ml.constants import AssetTypes

# check if the `trained_model` output is available
print("pipeline job outputs: ", workspace_ml_client.jobs.get(pipeline_job.name).outputs)

# build the model path from the `trained_model` output of the pipeline job
model_path_from_job = "azureml://jobs/{0}/outputs/{1}".format(
    pipeline_job.name, "trained_model"
)

finetuned_model_name = model_name + "-emotion-detection"
finetuned_model_name = finetuned_model_name.replace("/", "-")
print("path to register model: ", model_path_from_job)
prepare_to_register_model = Model(
    path=model_path_from_job,
    type=AssetTypes.MLFLOW_MODEL,
    name=finetuned_model_name,
    version=timestamp,  # use timestamp as version to avoid version conflict
    description=model_name + " fine tuned model for emotion detection",
)
print("prepare to register model: \n", prepare_to_register_model)
# register the model from pipeline job output
registered_model = workspace_ml_client.models.create_or_update(
    prepare_to_register_model
)
print("registered model: \n", registered_model)

### 7. Deploy the fine tuned model to an online endpoint
Online endpoints give a durable REST API that can be used to integrate with applications that need to use the model.
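
To make the REST integration concrete, here is a minimal sketch of how an application could build a scoring request using only the standard library. It assumes key-based authentication, with the key sent as a bearer token, and takes the scoring URI and key from the endpoint's details in AzureML studio. The helper `build_scoring_request`, the placeholder URL and the placeholder key are illustrative.

```python
import json
import urllib.request


def build_scoring_request(scoring_uri, api_key, payload, deployment_name=None):
    """Hypothetical helper: build (but do not send) a POST request for scoring."""
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
    }
    # Optional header that routes the call to a specific deployment.
    if deployment_name:
        headers["azureml-model-deployment"] = deployment_name
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(scoring_uri, data=body, headers=headers, method="POST")


# Placeholder values; replace them with the real scoring URI and key.
request = build_scoring_request(
    "https://example.invalid/score",
    "<API_KEY>",
    {"input_data": {"columns": ["input_string"], "index": [0], "data": [["some text"]]}},
    deployment_name="demo",
)
print(request.get_method(), request.full_url)
# urllib.request.urlopen(request) would send it; omitted because the URL is a placeholder.
```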

In [None]:
import time, sys
from azure.ai.ml.entities import (
    ManagedOnlineEndpoint,
    ManagedOnlineDeployment,
    ProbeSettings,
)

# Create online endpoint - endpoint names need to be unique in a region, hence using timestamp to create unique endpoint name

online_endpoint_name = "emotion-" + timestamp
# create an online endpoint
endpoint = ManagedOnlineEndpoint(
    name=online_endpoint_name,
    description="Online endpoint for "
    + registered_model.name
    + ", fine tuned model for emotion detection",
    auth_mode="key",
)
workspace_ml_client.begin_create_or_update(endpoint).wait()

You can find the list of SKUs supported for deployment here: [Managed online endpoints SKU list](https://learn.microsoft.com/en-us/azure/machine-learning/reference-managed-online-endpoints-vm-sku-list)

In [None]:
# create a deployment
demo_deployment = ManagedOnlineDeployment(
    name="demo",
    endpoint_name=online_endpoint_name,
    model=registered_model.id,
    instance_type="Standard_E64s_v3",
    instance_count=1,
    liveness_probe=ProbeSettings(initial_delay=600),
)
workspace_ml_client.online_deployments.begin_create_or_update(demo_deployment).wait()
endpoint.traffic = {"demo": 100}
workspace_ml_client.begin_create_or_update(endpoint).result()

### 8. Test the endpoint with sample data

We will fetch some sample data from the test dataset and submit it to the online endpoint for inference. We will then display the scored labels alongside the ground truth labels.
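
The request body sent to the endpoint wraps the texts in an `input_data` object with `index`, `columns` and `data` keys, which is the shape pandas produces with `to_dict("split")`. The sketch below builds the same shape by hand, without pandas, so the structure is easy to see; the two sample texts are placeholders.

```python
import json

# Placeholder texts standing in for rows sampled from the test set.
texts = ["first sample text", "second sample text"]

# Hand-built equivalent of a one-column dataframe converted with to_dict("split").
payload = {
    "input_data": {
        "index": list(range(len(texts))),
        "columns": ["input_string"],
        "data": [[text] for text in texts],
    }
}

print(json.dumps(payload, indent=2))
```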

In [None]:
# read ./emotion-dataset/small_test.jsonl into a pandas dataframe
test_df = pd.read_json("./emotion-dataset/small_test.jsonl", lines=True)
# take 5 random samples
test_df = test_df.sample(n=5)
# rebuild index
test_df.reset_index(drop=True, inplace=True)
# rename the label_string column to ground_truth_label
test_df = test_df.rename(columns={"label_string": "ground_truth_label"})
test_df.head(5)

In [None]:
# create a json object with the key "input_data" whose value is the "split"-oriented dict of the text column (renamed to input_string) of the test dataframe
test_df_copy = test_df[["text"]]
test_df_copy = test_df_copy.rename(columns={"text": "input_string"})
test_json = {"input_data": test_df_copy.to_dict("split")}
# save the json object to a file named sample_score.json in the ./emotion-dataset folder
with open("./emotion-dataset/sample_score.json", "w") as f:
    json.dump(test_json, f)

In [None]:
# score the sample_score.json file using the online endpoint with the azureml endpoint invoke method
response = workspace_ml_client.online_endpoints.invoke(
    endpoint_name=online_endpoint_name,
    deployment_name="demo",
    request_file="./emotion-dataset/sample_score.json",
)
print("raw response: \n", response, "\n")
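# convert the response to a pandas dataframe and rename the label column as scored_label
# (wrap the JSON string in StringIO; newer pandas versions deprecate passing a literal JSON string)
from io import StringIO

response_df = pd.read_json(StringIO(response))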
response_df = response_df.rename(columns={0: "scored_label"})
response_df.head(5)

In [None]:
# merge the test dataframe and the response dataframe on the index
merged_df = pd.merge(test_df, response_df, left_index=True, right_index=True)
merged_df.head(5)

### 9. Delete the online endpoint
Don't forget to delete the online endpoint; otherwise, the billing meter keeps running for the compute used by the endpoint.

In [None]:
workspace_ml_client.online_endpoints.begin_delete(name=online_endpoint_name).wait()