# Binary Text Classification scenario with RAI Dashboard as Deployed Endpoint

The [blbooksgenre dataset](https://huggingface.co/datasets/blbooksgenre) labels book titles as fiction or nonfiction.

In this notebook we walk through this binary text classification scenario end to end. The Hugging Face model is registered with MLflow and deployed as a black-box web service endpoint in AzureML.

Install `datasets` to retrieve this dataset from Hugging Face, along with `ml-wrappers` (used later to wrap the endpoint) and a pandas version below 2.0.0:

In [None]:
%pip install datasets
%pip install ml-wrappers
%pip install "pandas<2.0.0"

First, we need to specify the version of the RAI components which are available in the workspace. This was specified when the components were uploaded.

In [None]:
version_string = "0.0.20"

We also need to give the name of the compute cluster we want to use in AzureML. Later in this notebook, we will create it if it does not already exist:

In [None]:
compute_name = "cpucluster"

Finally, we need to specify a version for the data and components we will create while running this notebook. This should be unique for the workspace, but the specific value doesn't matter:

In [None]:
rai_example_version_string = "41"
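
If you rerun the notebook often, one way to keep this value unique is to derive it from the current time. The sketch below is an assumption rather than part of the original notebook, and `make_version_string` is a hypothetical helper:

```python
import time


def make_version_string(now=None):
    """Return a digits-only version string based on Unix time in seconds.

    Hypothetical helper: `now` can be supplied to get a reproducible value.
    """
    timestamp = time.time() if now is None else now
    return str(int(timestamp))


# A fixed timestamp gives a predictable string; omit `now` for a fresh one.
print(make_version_string(now=1700000000.9))
```

Two runs within the same second would still collide, so this is only a convenience for manual reruns.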

## Accessing the Data

We load the data from Hugging Face, keep a small sample of test rows, and later write that sample out as a parquet file with an accompanying `MLTable` file. We start by loading the data and taking the test sample:

In [None]:
import os
import datasets
import pandas as pd

from sklearn import preprocessing
from sklearn.model_selection import train_test_split

NUM_TEST_SAMPLES = 100

In [None]:
def load_dataset(split):
    config_kwargs = {"name": "title_genre_classifiction"}
    dataset = datasets.load_dataset(
        "blbooksgenre", split=split, trust_remote_code=True, **config_kwargs
    )
    return pd.DataFrame({"text": dataset["title"], "label": dataset["label"]})


pd_data = load_dataset("train")

_, pd_test_data = train_test_split(pd_data, test_size=0.2, random_state=0)

test_data = pd_test_data[:NUM_TEST_SAMPLES]

Now write the parquet file and create the `MLTable` file that describes it:

In [None]:
pq_filename = "hf_data.parquet"


def create_ml_table_file_contents(pq_filename):
    return (
        "$schema: http://azureml/sdk-2-0/MLTable.json\n"
        "type: mltable\n"
        "paths:\n"
        " - file: ./{0}\n"
        "transformations:\n"
        " - read_parquet\n"
    ).format(pq_filename)


def write_to_parquet(data, path, pq_filename):
    os.makedirs(path, exist_ok=True)
    data.to_parquet(os.path.join(path, pq_filename), index=False)


def create_ml_table_file(path, contents):
    with open(os.path.join(path, "MLTable"), "w") as f:
        f.write(contents)


test_data_path = "test_data"

write_to_parquet(test_data, test_data_path, pq_filename)

mltable_file_contents = create_ml_table_file_contents(pq_filename)
create_ml_table_file(test_data_path, mltable_file_contents)
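
To make the generated file easier to picture, the sketch below renders the same `MLTable` text for a given parquet filename and prints it. `render_mltable` is a hypothetical stand-in that mirrors `create_ml_table_file_contents` above so the snippet runs on its own:

```python
def render_mltable(pq_filename):
    """Build the MLTable YAML text that points at one parquet file."""
    return (
        "$schema: http://azureml/sdk-2-0/MLTable.json\n"
        "type: mltable\n"
        "paths:\n"
        " - file: ./{0}\n"
        "transformations:\n"
        " - read_parquet\n"
    ).format(pq_filename)


contents = render_mltable("hf_data.parquet")
print(contents)
```

The `paths` entry is relative, so the parquet file and the `MLTable` file need to sit in the same directory, as they do in `test_data` above.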

Load some data for a quick view:

In [None]:
import mltable

tbl = mltable.load(test_data_path)
test_df: pd.DataFrame = tbl.to_pandas_dataframe()

display(test_df)

The label column contains the classes:

In [None]:
target_column_name = "label"

Next, we need to upload the test dataset to our workspace. We start by creating an `MLClient` for interactions with AzureML:

In [None]:
# Enter details of your AML workspace
subscription_id = "<SUBSCRIPTION_ID>"
resource_group = "<RESOURCE_GROUP>"
workspace = "<AML_WORKSPACE_NAME>"

In [None]:
# Handle to the workspace
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

try:
    credential = DefaultAzureCredential()
    ml_client = MLClient(
        credential=credential,
        subscription_id=subscription_id,
        resource_group_name=resource_group,
        workspace_name=workspace,
    )
except Exception:
    # If in compute instance we can get the config automatically
    from azureml.core import Workspace

    workspace = Workspace.from_config()
    workspace.write_config()
    ml_client = MLClient.from_config(
        credential=DefaultAzureCredential(exclude_shared_token_cache_credential=True),
        logging_enable=True,
    )

print(ml_client)

We can now upload the data to AzureML:

In [None]:
from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes

input_test_data = "blbooksgenre_Test_MLTable"

try:
    test_data = ml_client.data.get(
        name=input_test_data,
        version=rai_example_version_string,
    )
except Exception:
    test_data = Data(
        path=test_data_path,
        type=AssetTypes.MLTABLE,
        description="RAI blbooksgenre test data",
        name=input_test_data,
        version=rai_example_version_string,
    )
    ml_client.data.create_or_update(test_data)

## Creating the Model

To simplify the model creation process, we're going to use a pipeline.

We create a directory for the training script:

In [None]:
import os

os.makedirs("blbooksgenre_component_src", exist_ok=True)

Next, we write out our script, which downloads a pretrained model, wraps it in a Hugging Face pipeline, and registers it with MLflow:

In [None]:
%%writefile blbooksgenre_component_src/training_script.py

import argparse
import logging
import json
import os
import time
import numpy as np

import mlflow
import mlflow.pyfunc

import zipfile
from azureml.core import Run

from transformers import AutoModelForSequenceClassification, \
    AutoTokenizer, pipeline

from raiutils.common.retries import retry_function

try:
    from urllib import urlretrieve
except ImportError:
    from urllib.request import urlretrieve


_logger = logging.getLogger(__file__)
logging.basicConfig(level=logging.INFO)


BLBOOKSGENRE_MODEL_NAME = "blbooksgenre_model"
NUM_LABELS = 2


def parse_args():
    # setup arg parser
    parser = argparse.ArgumentParser()

    # add arguments
    parser.add_argument(
        "--model_output_path", type=str, help="Path to write model info JSON"
    )
    parser.add_argument(
        "--model_base_name", type=str, help="Name of the registered model"
    )
    parser.add_argument(
        "--model_name_suffix", type=str, help="Model name suffix"
    )
    parser.add_argument(
        "--device", type=int, help=(
            "Device for CPU/GPU supports. Setting this to -1 will leverage "
            "CPU, >=0 will run the model on the associated CUDA device id.")
    )

    # parse args
    args = parser.parse_args()

    # return args
    return args


class FetchModel(object):
    def __init__(self):
        pass

    def fetch(self):
        zipfilename = BLBOOKSGENRE_MODEL_NAME + '.zip'
        url = ('https://publictestdatasets.blob.core.windows.net/models/' +
               BLBOOKSGENRE_MODEL_NAME + '.zip')
        urlretrieve(url, zipfilename)
        with zipfile.ZipFile(zipfilename, 'r') as unzip:
            unzip.extractall(BLBOOKSGENRE_MODEL_NAME)


def retrieve_blbooksgenre_model():
    fetcher = FetchModel()
    action_name = "Model download"
    err_msg = "Failed to download model"
    max_retries = 4
    retry_delay = 60
    retry_function(fetcher.fetch, action_name, err_msg,
                   max_retries=max_retries,
                   retry_delay=retry_delay)
    model = AutoModelForSequenceClassification.from_pretrained(
        BLBOOKSGENRE_MODEL_NAME, num_labels=NUM_LABELS)
    return model


class HuggingfaceWrapper(mlflow.pyfunc.PythonModel):
    def __init__(self, pipeline):
        self._pipeline = pipeline

    def predict(self, context, model_input):
        if isinstance(model_input, np.ndarray):
            model_input = model_input.tolist()
        result = self._pipeline(model_input)
        return result


def main(args):
    current_experiment = Run.get_context().experiment
    tracking_uri = current_experiment.workspace.get_mlflow_tracking_uri()
    _logger.info("tracking_uri: {0}".format(tracking_uri))
    mlflow.set_tracking_uri(tracking_uri)
    mlflow.set_experiment(current_experiment.name)

    _logger.info("Getting device")
    device = args.device

    _logger.info("Loading model and tokenizer")

    # load the model and tokenizer
    tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
    model = retrieve_blbooksgenre_model()

    if device >= 0:
        model = model.cuda()

    # build a pipeline object to do predictions
    _logger.info("Building model")
    # set to false to only return predicted score in webservice
    return_all_scores = False
    pred = pipeline(
        "text-classification",
        model=model,
        tokenizer=tokenizer,
        device=device,
        return_all_scores=return_all_scores
    )

    suffix = args.model_name_suffix
    registered_name = "{0}_{1}".format(args.model_base_name, suffix)
    _logger.info(f"Registering model as {registered_name}")

    # my_mlflow = PyfuncModel(pred)
    my_mlflow = HuggingfaceWrapper(pred)

    # Saving model with mlflow
    _logger.info("Saving with mlflow")
    mlflow.pyfunc.log_model(
        python_model=my_mlflow,
        registered_model_name=registered_name,
        artifact_path=registered_name,
        pip_requirements=[
            "mlflow",
            "torch>=2.2.2",
            "transformers>=4.17.0,<=4.44.0",
            "numpy<2.0.0",
        ],
    )

    _logger.info("Writing JSON")
    # Avoid shadowing the built-in `dict`
    model_info = {"id": "{0}:1".format(registered_name)}
    output_path = os.path.join(args.model_output_path, "model_info.json")
    with open(output_path, "w") as of:
        json.dump(model_info, fp=of)


# run script
if __name__ == "__main__":
    # add space in logs
    print("*" * 60)
    print("\n\n")

    # parse args
    args = parse_args()

    # run main function
    main(args)

    # add space in logs
    print("*" * 60)
    print("\n\n")
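
To see what the wrapper in this script does without downloading the model, the sketch below swaps the Hugging Face pipeline for a stand-in function. `WrapperSketch` and `fake_pipeline` are hypothetical, and the labelling rule and scores are made up. The only assumption carried over from the real pipeline is the output shape: one dict with `label` and `score` per input text when all scores are not requested.

```python
class WrapperSketch:
    """Mirrors HuggingfaceWrapper.predict without depending on mlflow."""

    def __init__(self, pipeline):
        self._pipeline = pipeline

    def predict(self, context, model_input):
        # Array-like inputs (for example numpy arrays) become plain lists
        if hasattr(model_input, "tolist"):
            model_input = model_input.tolist()
        return self._pipeline(model_input)


def fake_pipeline(texts):
    # Made-up rule and score, used only to produce output of the right shape
    return [
        {
            "label": "LABEL_1" if "history" in text.lower() else "LABEL_0",
            "score": 0.5,
        }
        for text in texts
    ]


result = WrapperSketch(fake_pipeline).predict(
    None, ["A History of London", "Poems"]
)
print(result)
```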

Now, we can build this into an AzureML component:

In [None]:
from azure.ai.ml import load_component

yaml_contents = f"""
$schema: http://azureml/sdk-2-0/CommandComponent.json
name: rai_training_component
display_name: blbooksgenre training component for RAI example
version: {rai_example_version_string}
type: command
inputs:
  model_base_name:
    type: string
  model_name_suffix:
    type: string
  device: # set to >= 0 to use GPU
    type: integer
    default: 0
outputs:
  model_output_path:
    type: path
code: ./blbooksgenre_component_src/
environment: azureml://registries/azureml/environments/responsibleai-text/versions/13
command: >-
  python training_script.py
  --model_base_name ${{{{inputs.model_base_name}}}}
  --model_name_suffix ${{{{inputs.model_name_suffix}}}}
  --device ${{{{inputs.device}}}}
  --model_output_path ${{{{outputs.model_output_path}}}}
"""

yaml_filename = "blbooksgenreTextTrainingComp.yaml"

with open(yaml_filename, "w") as f:
    f.write(yaml_contents)

train_component_definition = load_component(source=yaml_filename)

ml_client.components.create_or_update(train_component_definition)

We need a compute target on which to run our jobs. The following checks whether the compute specified above is present; if not, then the compute target is created.

In [None]:
from azure.ai.ml.entities import AmlCompute

all_compute_names = [x.name for x in ml_client.compute.list()]

if compute_name in all_compute_names:
    print(f"Found existing compute: {compute_name}")
else:
    my_compute = AmlCompute(
        name=compute_name,
        size="STANDARD_DS3_V2",
        min_instances=0,
        max_instances=4,
        idle_time_before_scale_down=3600,
    )
    ml_client.compute.begin_create_or_update(my_compute)
    print("Initiated compute creation")

## Running a training pipeline

Now that we have our training component, we can run it. We begin by generating a unique name for the model:

In [None]:
import random
import string

# Creating a unique model and endpoint name by including a random suffix
allowed_chars = string.ascii_lowercase + string.digits
endpoint_suffix = "".join(random.choice(allowed_chars) for x in range(5))

In [None]:
import time

model_base_name = "blbooksgenre_model"
model_name_suffix = endpoint_suffix
device = -1

Next, we define our training pipeline. It has a single step: the training component defined above, which also registers the model in AzureML through MLflow:

In [None]:
from azure.ai.ml import dsl, Input

train_model_component = ml_client.components.get(
    name="rai_training_component", version=rai_example_version_string
)


@dsl.pipeline(
    compute=compute_name,
    description="Register Model for RAI blbooksgenre example",
    experiment_name=f"RAI_blbooksgenre_Example_Model_Training_{model_name_suffix}",
)
def my_training_pipeline(model_base_name, model_name_suffix, device):
    trained_model = train_component_definition(
        model_base_name=model_base_name,
        model_name_suffix=model_name_suffix,
        device=device,
    )
    trained_model.set_limits(timeout=3600)

    return {}


model_registration_pipeline_job = my_training_pipeline(
    model_base_name, model_name_suffix, device
)

With the training pipeline defined, we can submit it for execution in AzureML. We define a helper function to wait for the job to complete:

In [None]:
from azure.ai.ml.entities import PipelineJob


def submit_and_wait(ml_client, pipeline_job) -> PipelineJob:
    created_job = ml_client.jobs.create_or_update(pipeline_job)
    assert created_job is not None

    while created_job.status not in [
        "Completed",
        "Failed",
        "Canceled",
        "NotResponding",
    ]:
        time.sleep(30)
        created_job = ml_client.jobs.get(created_job.name)
        print("Latest status : {0}".format(created_job.status))
    print("pipeline job outputs: ", created_job.outputs)
    # get job details
    print(created_job)
    # stream the job logs
    ml_client.jobs.stream(name=created_job.name)
    assert created_job.status == "Completed"
    return created_job


# This is the actual submission
training_job = submit_and_wait(ml_client, model_registration_pipeline_job)
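
The polling pattern inside `submit_and_wait` can be tried without an AzureML workspace. The sketch below is a simplified version under stated assumptions: `wait_for_terminal_status` is a hypothetical helper, the job is replaced by a canned sequence of statuses, and the sleep function is injected so that no real waiting happens.

```python
import time

TERMINAL_STATES = {"Completed", "Failed", "Canceled", "NotResponding"}


def wait_for_terminal_status(get_status, poll_seconds=30, sleep=time.sleep):
    """Poll get_status() until it reports one of the terminal states."""
    status = get_status()
    while status not in TERMINAL_STATES:
        sleep(poll_seconds)
        status = get_status()
    return status


# Canned statuses stand in for repeated calls to ml_client.jobs.get(...)
statuses = iter(["Queued", "Running", "Running", "Completed"])
naps = []  # records each requested sleep instead of actually sleeping
final = wait_for_terminal_status(lambda: next(statuses), sleep=naps.append)
print(final, len(naps))
```

Injecting `sleep` keeps the loop easy to exercise; in the notebook itself the default `time.sleep` behaviour is what you want.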

Now we can define the registered model id:

In [None]:
model_name = f"{model_base_name}_{model_name_suffix}"
model_version = 1

### Create Scoring Endpoint 

In [None]:
endpoint_name = "blbooksgenre-classifier-" + endpoint_suffix

print(f"Endpoint name: {endpoint_name}")

In [None]:
import os
import mlflow

mlflow_tracking_key = "MLFLOW_TRACKING_URI"

if mlflow_tracking_key in os.environ:
    target_uri = os.environ[mlflow_tracking_key]
else:
    ws = ml_client.workspaces.get(name=workspace)
    target_uri = ws.mlflow_tracking_uri
    mlflow.set_tracking_uri(target_uri)

In [None]:
from mlflow.deployments import get_deploy_client

deployment_client = get_deploy_client(target_uri)

In [None]:
endpoint = deployment_client.create_endpoint(endpoint_name)

In [None]:
scoring_uri = deployment_client.get_endpoint(endpoint=endpoint_name)["properties"][
    "scoringUri"
]
print(scoring_uri)

### Create a deployment

In [None]:
deployment_name = "default"

In [None]:
deploy_config = {
    "instance_type": "Standard_DS4_v2",
    "instance_count": 1,
    "readiness_probe": {"initial_delay": 60, "timeout": 5},
}

In [None]:
import json

deployment_config_path = "deployment_config.json"
with open(deployment_config_path, "w") as outfile:
    outfile.write(json.dumps(deploy_config))

In [None]:
f"models:/{model_name}/{model_version}"

In [None]:
deployment = deployment_client.create_deployment(
    name=deployment_name,
    endpoint=endpoint_name,
    model_uri=f"models:/{model_name}/{model_version}",
    config={"deploy-config-file": deployment_config_path},
)

In [None]:
traffic_config = {"traffic": {deployment_name: 100}}

In [None]:
traffic_config_path = "traffic_config.json"
with open(traffic_config_path, "w") as outfile:
    outfile.write(json.dumps(traffic_config))

In [None]:
deployment_client.update_endpoint(
    endpoint=endpoint_name,
    config={"endpoint-config-file": traffic_config_path},
)
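
The traffic file above routes all requests to a single deployment. The sketch below writes the same kind of file with a sanity check added. It assumes the percentages across deployments should add up to 100, and `write_traffic_config` is a hypothetical helper, not part of the MLflow deployment client.

```python
import json
import os
import tempfile


def write_traffic_config(path, traffic):
    """Write {"traffic": {...}} to path after checking the percentages."""
    total = sum(traffic.values())
    if total != 100:
        raise ValueError(f"Traffic percentages add up to {total}, not 100")
    with open(path, "w") as outfile:
        json.dump({"traffic": traffic}, outfile)


with tempfile.TemporaryDirectory() as tmp_dir:
    config_path = os.path.join(tmp_dir, "traffic_config.json")
    write_traffic_config(config_path, {"default": 100})
    with open(config_path) as infile:
        loaded = json.load(infile)

print(loaded)
```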

In [None]:
endpoint_secret_key = ml_client.online_endpoints.get_keys(
    name=endpoint_name
).access_token

Now let's wrap the deployed endpoint so that it can later be registered as a model in AzureML:

In [None]:
from ml_wrappers.model import EndpointWrapperModel

extra_headers = {"azureml-model-deployment": "default"}
endpoint_wrapper = EndpointWrapperModel(
    endpoint_secret_key,
    scoring_uri,
    extra_headers,
    transform_output_dict=True,
    class_names=["LABEL_0", "LABEL_1"],
    wrap_input_data_dict=True,
)

In [None]:
endpoint_wrapper.predict(test_df.iloc[0:2]["text"].tolist())
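
For intuition about what travels over the wire, the sketch below builds (but does not send) an HTTP request of the kind such a wrapper would issue. The details are assumptions rather than a description of `EndpointWrapperModel` internals: the texts are wrapped in an `input_data` dictionary, the key is sent as a bearer token, and the URL and key are placeholders. `build_scoring_request` is a hypothetical helper.

```python
import json
import urllib.request


def build_scoring_request(scoring_uri, secret_key, texts, deployment="default"):
    """Prepare a POST request for a scoring endpoint without sending it."""
    payload = json.dumps({"input_data": texts}).encode("utf-8")
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {secret_key}",
        # Routes the call to one named deployment behind the endpoint
        "azureml-model-deployment": deployment,
    }
    return urllib.request.Request(
        scoring_uri, data=payload, headers=headers, method="POST"
    )


# Placeholder URL and key, for illustration only
request = build_scoring_request(
    "https://example.invalid/score", "not-a-real-key", ["A History of London"]
)
print(request.get_method(), request.full_url)
print(json.loads(request.data))
```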

Now we can register the endpoint wrapper using mlflow as another model:

In [None]:
endpoint_model_name = "wrapped_endpoint_" + model_name
expected_model_id = f"{endpoint_model_name}:{model_version}"
azureml_model_id = f"azureml:{expected_model_id}"
mlflow.pyfunc.log_model(
    python_model=endpoint_wrapper,
    registered_model_name=endpoint_model_name,
    artifact_path=endpoint_model_name,
    pip_requirements=["ml-wrappers"],
)

## Creating the RAI Text Insights

Now that we have our model, we can generate RAI Text insights for it.
First, we reference the uploaded test data and load the RAI text insights component from the registry, so that we can construct a pipeline:

In [None]:
blbooksgenre_test_mltable = Input(
    type="mltable",
    path=f"{input_test_data}:{rai_example_version_string}",
    mode="download",
)

registry_name = "azureml"
credential = DefaultAzureCredential()

ml_client_registry = MLClient(
    credential=credential,
    subscription_id=ml_client.subscription_id,
    resource_group_name=ml_client.resource_group_name,
    registry_name=registry_name,
)

rai_text_insights_component = ml_client_registry.components.get(
    name="rai_text_insights", version=version_string
)

We can now specify our pipeline. Complex objects (such as lists of column names) have to be converted to JSON strings before being passed to the components.

In [None]:
import json
from azure.ai.ml import Input
from azure.ai.ml.constants import AssetTypes


@dsl.pipeline(
    compute=compute_name,
    description="Example RAI computation on blbooksgenre data",
    experiment_name=f"RAI_blbooksgenre_Example_RAIInsights_Computation_{model_name_suffix}",
)
def rai_blbooksgenre_text_classification_pipeline(
    target_column_name,
    test_data,
    classes,
    use_model_dependency,
):
    # Initiate the RAIInsights
    rai_text_job = rai_text_insights_component(
        task_type="text_classification",
        model_info=expected_model_id,
        model_input=Input(type=AssetTypes.MLFLOW_MODEL, path=azureml_model_id),
        test_dataset=test_data,
        target_column_name=target_column_name,
        classes=classes,
        use_model_dependency=use_model_dependency,
    )
    rai_text_job.set_limits(timeout=7200)

    rai_text_job.outputs.dashboard.mode = "upload"
    rai_text_job.outputs.ux_json.mode = "upload"

    return {
        "dashboard": rai_text_job.outputs.dashboard,
        "ux_json": rai_text_job.outputs.ux_json,
    }

Next, we define the pipeline object itself, and ensure that the outputs will be available for download:

In [None]:
import uuid
from azure.ai.ml import Output

insights_pipeline_job = rai_blbooksgenre_text_classification_pipeline(
    target_column_name=target_column_name,
    test_data=blbooksgenre_test_mltable,
    classes="[]",
    use_model_dependency=True,
)

rand_path = str(uuid.uuid4())
insights_pipeline_job.outputs.dashboard = Output(
    path=f"azureml://datastores/workspaceblobstore/paths/{rand_path}/dashboard/",
    mode="upload",
    type="uri_folder",
)
insights_pipeline_job.outputs.ux_json = Output(
    path=f"azureml://datastores/workspaceblobstore/paths/{rand_path}/ux_json/",
    mode="upload",
    type="uri_folder",
)
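
The `classes` argument above is passed as the JSON string `"[]"`, in line with the rule that complex objects are serialized before being handed to a component. As a minimal sketch of that conversion, here is how a list of class names could be turned into such a string. The label names are borrowed from the endpoint wrapper earlier in the notebook, and whether the component needs explicit names for this model is not something this notebook establishes.

```python
import json

class_names = ["LABEL_0", "LABEL_1"]

# Serialize the list so it can be passed as a plain string input
classes_arg = json.dumps(class_names)
print(classes_arg)

# The empty list used in the cell above round-trips the same way
print(json.loads("[]"))
```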

And submit the pipeline to AzureML for execution:

In [None]:
insights_job = submit_and_wait(ml_client, insights_pipeline_job)

The dashboard should appear in the AzureML portal in the registered model view. The following cell computes the expected URI:

In [None]:
sub_id = ml_client._operation_scope.subscription_id
rg_name = ml_client._operation_scope.resource_group_name
ws_name = ml_client.workspace_name

expected_uri = f"https://ml.azure.com/model/{expected_model_id}/model_analysis?wsid=/subscriptions/{sub_id}/resourcegroups/{rg_name}/workspaces/{ws_name}"

print(f"Please visit {expected_uri} to see your analysis")