# Multilabel Text Classification scenario with RAI Dashboard

The [Covid19 Emergency Event Dataset](https://huggingface.co/datasets/joelito/covid19_emergency_event) is a multilabel dataset: a corpus of legislative documents manually annotated for exceptional measures against COVID-19. There are 8 possible labels, and a document can carry several of them at once, up to all 8. Each label represents a specific measure against COVID-19. The events are:

- event1: State of Emergency
- event2: Restrictions of fundamental rights and civil liberties
- event3: Restrictions of daily liberties
- event4: Closures / lockdown
- event5: Suspension of international cooperation and commitments
- event6: Police mobilization
- event7: Army mobilization
- event8: Government oversight
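Because a document can carry several events at once, its targets form a multi-hot vector: one 0/1 flag per event, in `event1` to `event8` order. The minimal sketch below uses a hypothetical document rather than real dataset rows, and the helper names `to_multi_hot` and `from_multi_hot` are illustrative, not part of any library.

```python
EVENTS = [f"event{i}" for i in range(1, 9)]


def to_multi_hot(tagged_events):
    """Return one 0/1 flag per event, in event1..event8 order."""
    tagged = set(tagged_events)
    unknown = tagged - set(EVENTS)
    if unknown:
        raise ValueError(f"Unknown labels: {sorted(unknown)}")
    return [1 if event in tagged else 0 for event in EVENTS]


def from_multi_hot(vector):
    """Inverse of to_multi_hot: recover the tagged event names."""
    return [event for event, flag in zip(EVENTS, vector) if flag]


# A hypothetical document declaring a state of emergency and a lockdown
vector = to_multi_hot(["event1", "event4"])
print(vector)  # [1, 0, 0, 1, 0, 0, 0, 0]
print(from_multi_hot(vector))  # ['event1', 'event4']
```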



Install the `datasets` library to retrieve this dataset from Hugging Face, and pin pandas below version 2.0:

In [None]:
%pip install datasets
%pip install "pandas<2.0.0"

First, we need to specify the version of the RAI components to use. Later in this notebook, the components are fetched at this version from the `azureml` registry.

In [None]:
version_string = "0.0.20"

We also need to give the name of the compute cluster we want to use in AzureML. Later in this notebook, we will create it if it does not already exist:

In [None]:
compute_name = "cpucluster"

Finally, we need to specify a version for the data and components we will create while running this notebook. This should be unique for the workspace, but the specific value doesn't matter:

In [None]:
rai_example_version_string = "23"

## Accessing the Data

We download the test split from Hugging Face, keep only the English-language documents, and save the first `NUM_TEST_SAMPLES` of them as a parquet file with an accompanying `MLTable` file:

In [None]:
import os
import json
import datasets
import pandas as pd

NUM_TEST_SAMPLES = 100

In [None]:
def load_covid19_emergency_event_dataset(split):
    dataset = datasets.load_dataset("joelito/covid19_emergency_event", split=split)
    dataset = pd.DataFrame(
        {
            "language": dataset["language"],
            "text": dataset["text"],
            "event1": dataset["event1"],
            "event2": dataset["event2"],
            "event3": dataset["event3"],
            "event4": dataset["event4"],
            "event5": dataset["event5"],
            "event6": dataset["event6"],
            "event7": dataset["event7"],
            "event8": dataset["event8"],
        }
    )
    dataset = dataset[dataset.language == "en"].reset_index(drop=True)
    dataset = dataset.drop(columns="language")
    return dataset


pd_test_data = load_covid19_emergency_event_dataset("test")
test_data = pd_test_data[:NUM_TEST_SAMPLES]

Now write the data to parquet and create the `MLTable` file that describes it:

In [None]:
pq_filename = "covid_data.parquet"


def create_ml_table_file_contents(pq_filename):
    return (
        "$schema: http://azureml/sdk-2-0/MLTable.json\n"
        "type: mltable\n"
        "paths:\n"
        " - file: ./{0}\n"
        "transformations:\n"
        " - read_parquet\n"
    ).format(pq_filename)


def write_to_parquet(data, path, pq_filename):
    os.makedirs(path, exist_ok=True)
    data.to_parquet(os.path.join(path, pq_filename), index=False)


def create_ml_table_file(path, contents):
    with open(os.path.join(path, "MLTable"), "w") as f:
        f.write(contents)


test_data_path = "multilabel_covid_test_data"

write_to_parquet(test_data, test_data_path, pq_filename)

mltable_file_contents = create_ml_table_file_contents(pq_filename)
create_ml_table_file(test_data_path, mltable_file_contents)
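Before uploading, it can help to confirm that every file the `MLTable` lists actually exists next to it. The following is a rough, line-based sketch (the `missing_files` helper is hypothetical) that only understands the flat `- file:` layout written above; it is not a general MLTable or YAML parser.

```python
import os
import tempfile


def referenced_files(mltable_dir):
    """Paths listed as '- file:' entries in a simple MLTable file."""
    files = []
    with open(os.path.join(mltable_dir, "MLTable")) as f:
        for line in f:
            stripped = line.strip()
            if stripped.startswith("- file:"):
                rel_path = stripped[len("- file:"):].strip()
                files.append(os.path.normpath(os.path.join(mltable_dir, rel_path)))
    return files


def missing_files(mltable_dir):
    """Referenced files that do not exist on disk."""
    return [path for path in referenced_files(mltable_dir) if not os.path.exists(path)]


# Offline demo on a throwaway folder; in the notebook you could instead call
# missing_files(test_data_path) and expect an empty list.
with tempfile.TemporaryDirectory() as demo_dir:
    with open(os.path.join(demo_dir, "MLTable"), "w") as f:
        f.write("type: mltable\npaths:\n - file: ./covid_data.parquet\n")
    before = missing_files(demo_dir)  # parquet file not written yet
    open(os.path.join(demo_dir, "covid_data.parquet"), "wb").close()
    after = missing_files(demo_dir)
print(len(before), len(after))  # 1 0
```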

Load some data for a quick view:

In [None]:
import mltable

tbl = mltable.load(test_data_path)
test_df: pd.DataFrame = tbl.to_pandas_dataframe()

display(test_df)
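For a multilabel dataset, a useful quick check is how often each event occurs and how many events a document carries on average (label cardinality). The sketch below computes both on hypothetical rows shaped like `test_df`, using only the standard library; `label_statistics` is an illustrative helper. In the notebook it could be applied as `label_statistics(test_df.to_dict("records"))`, and with pandas alone, summing the eight event columns of `test_df` gives the per-event counts directly.

```python
from collections import Counter

EVENT_COLUMNS = [f"event{i}" for i in range(1, 9)]


def label_statistics(rows):
    """Per-event counts and mean number of events per row (label cardinality)."""
    per_event = Counter()
    cardinalities = []
    for row in rows:
        # Missing, 0 and False all count as "not tagged"
        tagged = [event for event in EVENT_COLUMNS if row.get(event)]
        per_event.update(tagged)
        cardinalities.append(len(tagged))
    mean_cardinality = sum(cardinalities) / len(cardinalities) if cardinalities else 0.0
    return dict(per_event), mean_cardinality


# Hypothetical rows shaped like test_df
rows = [
    {"text": "first example document", "event1": 1, "event4": 1},
    {"text": "second example document", "event4": 1, "event6": 0},
    {"text": "third example document"},
]
counts, mean_cardinality = label_statistics(rows)
print(counts)            # {'event1': 1, 'event4': 2}
print(mean_cardinality)  # 1.0
```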

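The labels are held in the eight `event` columns. We list their names, and also JSON-encode the list so that it can be passed to the RAI components: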

In [None]:
target_column_name = [
    "event1",
    "event2",
    "event3",
    "event4",
    "event5",
    "event6",
    "event7",
    "event8",
]
encoded_target_column_name = json.dumps(target_column_name)
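The RAI components take the column list as a single string, which is why it is JSON-encoded. The short illustration below shows why `json.dumps` is used rather than `str()`: the JSON string round-trips through a JSON parser, while Python's list repr (single quotes) does not. It assumes the receiving component decodes the string with a JSON parser, and uses a separate `columns` variable so it does not overwrite `target_column_name`.

```python
import json

columns = [f"event{i}" for i in range(1, 9)]  # same list as target_column_name

encoded = json.dumps(columns)
print(encoded)
# ["event1", "event2", "event3", "event4", "event5", "event6", "event7", "event8"]

# A JSON parser on the receiving side recovers the list exactly
assert json.loads(encoded) == columns

# str() is not a substitute: the Python repr uses single quotes, which is not JSON
try:
    json.loads(str(columns))
    repr_is_valid_json = True
except json.JSONDecodeError:
    repr_is_valid_json = False
print(repr_is_valid_json)  # False
```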

Next, we need to upload the test dataset to our workspace. We start by creating an `MLClient` for interactions with AzureML:

In [None]:
# Enter details of your AML workspace
subscription_id = "<SUBSCRIPTION_ID>"
resource_group = "<RESOURCE_GROUP>"
workspace = "<AML_WORKSPACE_NAME>"

In [None]:
# Handle to the workspace
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

try:
    credential = DefaultAzureCredential()
    ml_client = MLClient(
        credential=credential,
        subscription_id=subscription_id,
        resource_group_name=resource_group,
        workspace_name=workspace,
    )
except Exception:
    # If in compute instance we can get the config automatically
    from azureml.core import Workspace

    workspace = Workspace.from_config()
    workspace.write_config()
    ml_client = MLClient.from_config(
        credential=DefaultAzureCredential(exclude_shared_token_cache_credential=True),
        logging_enable=True,
    )

print(ml_client)

We can now upload the data to AzureML:

In [None]:
from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes

input_test_data = "Covid19_Events_Test_MLTable"

try:
    test_data = ml_client.data.get(
        name=input_test_data,
        version=rai_example_version_string,
    )
except Exception:
    test_data = Data(
        path=test_data_path,
        type=AssetTypes.MLTABLE,
        description="RAI Covid 19 Events Multilabel test data",
        name=input_test_data,
        version=rai_example_version_string,
    )
    ml_client.data.create_or_update(test_data)
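The upload cell follows a get-or-create pattern: it looks the asset up by name and version and only creates it on a miss, so rerunning the notebook reuses the existing asset. The sketch below shows the same pattern against a hypothetical in-memory store (`InMemoryAssetStore` and `AssetNotFound` are illustrative, not part of `azure-ai-ml`). It catches a specific not-found error, whereas the cell above catches any `Exception`, which would also send authentication or network failures into the create branch.

```python
class AssetNotFound(LookupError):
    """Raised by the hypothetical store below when an asset is missing."""


class InMemoryAssetStore:
    """Hypothetical stand-in for ml_client.data, for illustration only."""

    def __init__(self):
        self._assets = {}
        self.create_calls = 0

    def get(self, name, version):
        try:
            return self._assets[(name, version)]
        except KeyError:
            raise AssetNotFound(f"{name}:{version}") from None

    def create_or_update(self, asset):
        self.create_calls += 1
        self._assets[(asset["name"], asset["version"])] = asset
        return asset


def get_or_create(store, name, version, make_asset):
    """Return the registered asset, creating it only if the lookup misses."""
    try:
        return store.get(name=name, version=version)
    except AssetNotFound:  # only the not-found case falls through to creation
        return store.create_or_update(make_asset())


def make_test_asset():
    return {"name": "Covid19_Events_Test_MLTable", "version": "23"}


store = InMemoryAssetStore()
for _ in range(2):  # simulate running the upload cell twice
    get_or_create(store, "Covid19_Events_Test_MLTable", "23", make_test_asset)
print(store.create_calls)  # 1
```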

## Creating the Model

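Rather than training a model from scratch, we use a pipeline whose single step downloads a pre-trained multilabel classification model, wraps it as an MLflow model, and registers it in AzureML.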

We create a directory for the training script:

In [None]:
import os

os.makedirs("covid19_events_component_src", exist_ok=True)

Next, we write out the script that downloads the pre-trained model and registers it with MLflow:

In [None]:
%%writefile covid19_events_component_src/training_script.py

import argparse
import logging
import json
import os
import time


import mlflow
import mlflow.pyfunc

import zipfile
from azureml.core import Run

from transformers import AutoModelForSequenceClassification, \
    AutoTokenizer, pipeline

from azureml.rai.utils import PyfuncModel
from raiutils.common.retries import retry_function

try:
    from urllib import urlretrieve
except ImportError:
    from urllib.request import urlretrieve


_logger = logging.getLogger(__file__)
logging.basicConfig(level=logging.INFO)


COVID19_EVENTS_LABELS = ["event1", "event2", "event3", "event4",
                         "event5", "event6", "event7", "event8"]
COVID19_EVENTS_MODEL_NAME = "covid19_events_model"


def parse_args():
    # setup arg parser
    parser = argparse.ArgumentParser()

    # add arguments
    parser.add_argument(
        "--model_output_path", type=str, help="Path to write model info JSON"
    )
    parser.add_argument(
        "--model_base_name", type=str, help="Name of the registered model"
    )
    parser.add_argument(
        "--model_name_suffix", type=int, help="Set negative to use epoch_secs"
    )
    parser.add_argument(
        "--device", type=int, help=(
            "Device for CPU/GPU supports. Setting this to -1 will leverage "
            "CPU, >=0 will run the model on the associated CUDA device id.")
    )

    # parse args
    args = parser.parse_args()

    # return args
    return args


class FetchModel(object):
    def __init__(self):
        pass

    def fetch(self):
        zipfilename = COVID19_EVENTS_MODEL_NAME + '.zip'
        url = ('https://publictestdatasets.blob.core.windows.net/models/' +
               COVID19_EVENTS_MODEL_NAME + '.zip')
        urlretrieve(url, zipfilename)
        with zipfile.ZipFile(zipfilename, 'r') as unzip:
            unzip.extractall(COVID19_EVENTS_MODEL_NAME)


def create_multilabel_pipeline(device):
    fetcher = FetchModel()
    action_name = "Model download"
    err_msg = "Failed to download model"
    max_retries = 4
    retry_delay = 60
    retry_function(fetcher.fetch, action_name, err_msg,
                   max_retries=max_retries,
                   retry_delay=retry_delay)
    labels = COVID19_EVENTS_LABELS
    num_labels = len(labels)
    id2label = {idx: label for idx, label in enumerate(labels)}
    label2id = {label: idx for idx, label in enumerate(labels)}
    model = AutoModelForSequenceClassification.from_pretrained(
        COVID19_EVENTS_MODEL_NAME, num_labels=num_labels,
        problem_type="multi_label_classification",
        id2label=id2label,
        label2id=label2id)

    if device >= 0:
        model = model.cuda()

    tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
    # build a pipeline object to do predictions
    pred = pipeline(
        "text-classification",
        model=model,
        tokenizer=tokenizer,
        device=device,
        return_all_scores=True
    )
    return pred


def main(args):
    current_experiment = Run.get_context().experiment
    tracking_uri = current_experiment.workspace.get_mlflow_tracking_uri()
    _logger.info("tracking_uri: {0}".format(tracking_uri))
    mlflow.set_tracking_uri(tracking_uri)
    mlflow.set_experiment(current_experiment.name)

    _logger.info("Getting device")
    device = args.device

    # build a pipeline object to do predictions
    _logger.info("Building pipeline")

    pred = create_multilabel_pipeline(device)

    if args.model_name_suffix < 0:
        suffix = int(time.time())
    else:
        suffix = args.model_name_suffix
    registered_name = "{0}_{1}".format(args.model_base_name, suffix)
    _logger.info(f"Registering model as {registered_name}")

    my_mlflow = PyfuncModel(pred)

    # Saving model with mlflow
    _logger.info("Saving with mlflow")
    mlflow.pyfunc.log_model(
        python_model=my_mlflow,
        registered_model_name=registered_name,
        artifact_path=registered_name,
    )

    _logger.info("Writing JSON")
    # Avoid shadowing the built-in dict
    model_info = {"id": "{0}:1".format(registered_name)}
    output_path = os.path.join(args.model_output_path, "model_info.json")
    with open(output_path, "w") as of:
        json.dump(model_info, fp=of)


# run script
if __name__ == "__main__":
    # add space in logs
    print("*" * 60)
    print("\n\n")

    # parse args
    args = parse_args()

    # run main function
    main(args)

    # add space in logs
    print("*" * 60)
    print("\n\n")
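The script loads the model with `problem_type="multi_label_classification"`, so each label is scored independently (a sigmoid per label rather than a softmax across labels), and `return_all_scores=True` makes the pipeline return a score for every label. Turning those scores into predicted labels requires a threshold. The sketch below assumes a threshold of 0.5 and uses made-up scores; it does not claim to reproduce what `PyfuncModel` does internally, and `scores_to_labels` is a hypothetical helper.

```python
EVENT_LABELS = [f"event{i}" for i in range(1, 9)]


def scores_to_labels(label_scores, threshold=0.5):
    """Labels whose independent score reaches the threshold.

    label_scores: list of {"label": ..., "score": ...} dicts for ONE document,
    the shape a Hugging Face text-classification pipeline returns per input
    when all scores are requested. threshold=0.5 is an assumption.
    """
    return [item["label"] for item in label_scores if item["score"] >= threshold]


# Made-up scores for one document, not real model output
scores = [0.91, 0.12, 0.40, 0.77, 0.05, 0.51, 0.02, 0.30]
example = [{"label": label, "score": score} for label, score in zip(EVENT_LABELS, scores)]

predicted = scores_to_labels(example)
print(predicted)  # ['event1', 'event4', 'event6']

# Multi-hot form, in event1..event8 order
print([int(label in predicted) for label in EVENT_LABELS])  # [1, 0, 0, 1, 0, 1, 0, 0]
```

Because the labels are independent, lowering the threshold can only add labels, never swap one for another; several labels, or none, may pass it for a given document.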

Now, we can build this into an AzureML component:

In [None]:
from azure.ai.ml import load_component

yaml_contents = f"""
$schema: http://azureml/sdk-2-0/CommandComponent.json
name: rai_covid19_events_training_component
display_name: Covid 19 Events training component for RAI example
version: {rai_example_version_string}
type: command
inputs:
  model_base_name:
    type: string
  model_name_suffix: # Set negative to use epoch_secs
    type: integer
    default: -1
  device: # set to >= 0 to use GPU
    type: integer
    default: 0
outputs:
  model_output_path:
    type: path
code: ./covid19_events_component_src/
environment: azureml://registries/azureml/environments/responsibleai-text/versions/13
command: >-
  python training_script.py
  --model_base_name ${{{{inputs.model_base_name}}}}
  --model_name_suffix ${{{{inputs.model_name_suffix}}}}
  --device ${{{{inputs.device}}}}
  --model_output_path ${{{{outputs.model_output_path}}}}
"""

yaml_filename = "Covid19EventsTextTrainingComp.yaml"

with open(yaml_filename, "w") as f:
    f.write(yaml_contents)

train_component_definition = load_component(source=yaml_filename)

ml_client.components.create_or_update(train_component_definition)
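The component YAML is built with a Python f-string, so the AzureML placeholders need their braces doubled twice over. In an f-string `{{` renders as `{` and `}}` as `}`, so `${{{{inputs.device}}}}` comes out as `${{inputs.device}}`, the form AzureML substitutes at run time, while single-brace fields such as `{rai_example_version_string}` are filled in by Python. A minimal check, using a separate `example_version` variable so the notebook's settings are untouched:

```python
example_version = "23"  # stand-in for rai_example_version_string

snippet = f"""
version: {example_version}
command: >-
  python training_script.py
  --device ${{{{inputs.device}}}}
"""
print(snippet)
# version: 23
# command: >-
#   python training_script.py
#   --device ${{inputs.device}}
```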

We need a compute target on which to run our jobs. The following checks whether the compute specified above is present; if not, then the compute target is created.

In [None]:
from azure.ai.ml.entities import AmlCompute

all_compute_names = [x.name for x in ml_client.compute.list()]

if compute_name in all_compute_names:
    print(f"Found existing compute: {compute_name}")
else:
    my_compute = AmlCompute(
        name=compute_name,
        size="STANDARD_DS3_V2",
        min_instances=0,
        max_instances=4,
        idle_time_before_scale_down=3600,
    )
    ml_client.compute.begin_create_or_update(my_compute)
    print("Initiated compute creation")

## Running a training pipeline

Now that we have our training component, we can run it. We begin by choosing a name for the model, which should be unique in the workspace, and the device to run on (`-1` selects the CPU):

In [None]:
import time

model_base_name = "multilabel_hf_model"
model_name_suffix = "12492"
device = -1
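The registered model name is `{model_base_name}_{model_name_suffix}`, and the training script records its id as version `1`. The sketch below mirrors that rule, including the script's fallback of using epoch seconds for a negative suffix, in a hypothetical `registered_model_name` helper. Note the assumption baked into the `:1`: if a model with the same name was already registered, for example by rerunning with the same suffix, MLflow registers a new version and the id written by the script no longer points to the latest one.

```python
import time


def registered_model_name(model_base_name, model_name_suffix, now=None):
    """Hypothetical mirror of the naming rule in training_script.py.

    A negative suffix is replaced by the current epoch seconds; `now` can be
    passed to make the result deterministic.
    """
    suffix = int(model_name_suffix)  # the notebook passes the suffix as a string
    if suffix < 0:
        suffix = int(time.time() if now is None else now)
    return f"{model_base_name}_{suffix}"


name = registered_model_name("multilabel_hf_model", "12492")
# Assumes this is the first version registered under `name`
model_id = f"{name}:1"
print(model_id)  # multilabel_hf_model_12492:1
print(registered_model_name("multilabel_hf_model", -1, now=1700000000))
# multilabel_hf_model_1700000000
```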

Next, we define our training pipeline. It has a single step: the training component defined above, which also registers the model in AzureML through MLflow:

In [None]:
from azure.ai.ml import dsl, Input

train_model_component = ml_client.components.get(
    name="rai_covid19_events_training_component", version=rai_example_version_string
)


@dsl.pipeline(
    compute=compute_name,
    description="Register Model for RAI Covid 19 Events Multilabel example",
    experiment_name=f"RAI_Covid19_Events_Multilabel_Example_Model_Training_{model_name_suffix}",
)
def my_training_pipeline(model_base_name, model_name_suffix, device):
    trained_model = train_model_component(
        model_base_name=model_base_name,
        model_name_suffix=model_name_suffix,
        device=device,
    )
    trained_model.set_limits(timeout=3600)

    return {}


model_registration_pipeline_job = my_training_pipeline(
    model_base_name, model_name_suffix, device
)

With the training pipeline defined, we can submit it for execution in AzureML. We define a helper function to wait for the job to complete:

In [None]:
from azure.ai.ml.entities import PipelineJob


def submit_and_wait(ml_client, pipeline_job) -> PipelineJob:
    created_job = ml_client.jobs.create_or_update(pipeline_job)
    assert created_job is not None

    while created_job.status not in [
        "Completed",
        "Failed",
        "Canceled",
        "NotResponding",
    ]:
        time.sleep(30)
        created_job = ml_client.jobs.get(created_job.name)
        print("Latest status : {0}".format(created_job.status))
    assert created_job.status == "Completed"
    return created_job


# This is the actual submission
training_job = submit_and_wait(ml_client, model_registration_pipeline_job)
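`submit_and_wait` polls with no upper bound on the total wait. If you prefer a bounded wait, the loop can take a timeout. The sketch below is one way to do it, with the status lookup passed in as a function so it can be exercised without AzureML; `wait_for_status` is a hypothetical helper, and the terminal states are the ones listed in `submit_and_wait`. In this notebook it could be called as `wait_for_status(lambda: ml_client.jobs.get(job_name).status)`, where `job_name` stands for the name of a submitted job.

```python
import time

# Terminal states, as listed in submit_and_wait
TERMINAL_STATES = {"Completed", "Failed", "Canceled", "NotResponding"}


def wait_for_status(get_status, poll_seconds=30, timeout_seconds=4 * 3600,
                    sleep=time.sleep, clock=time.monotonic):
    """Poll get_status() until it returns a terminal state.

    Raises TimeoutError if the job is still running after timeout_seconds.
    `sleep` and `clock` are injectable so the loop can be tested offline.
    """
    deadline = clock() + timeout_seconds
    status = get_status()
    while status not in TERMINAL_STATES:
        if clock() >= deadline:
            raise TimeoutError(
                f"Job still '{status}' after {timeout_seconds} seconds")
        sleep(poll_seconds)
        status = get_status()
        print(f"Latest status : {status}")
    return status


# Offline demo with a scripted sequence of statuses and no real sleeping
scripted = iter(["Queued", "Running", "Completed"])
final = wait_for_status(lambda: next(scripted), sleep=lambda seconds: None)
print(final)  # Completed
```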

## Creating the RAI Text Insights

Now that we have our model, we can generate RAI Text insights for it. We will need the `id` of the registered model, which will be as follows:

In [None]:
expected_model_id = f"{model_base_name}_{model_name_suffix}:1"
azureml_model_id = f"azureml:{expected_model_id}"

Next, we define the test data input and load the RAI text insights component from the `azureml` registry, so that we can construct a pipeline:

In [None]:
covid19_test_mltable = Input(
    type="mltable",
    path=f"{input_test_data}:{rai_example_version_string}",
    mode="download",
)

registry_name = "azureml"
credential = DefaultAzureCredential()

ml_client_registry = MLClient(
    credential=credential,
    subscription_id=ml_client.subscription_id,
    resource_group_name=ml_client.resource_group_name,
    registry_name=registry_name,
)

rai_text_insights_component = ml_client_registry.components.get(
    name="rai_text_insights", version=version_string
)

We can now specify our pipeline. Complex objects (such as lists of column names) have to be converted to JSON strings before being passed to the components.

In [None]:
import json
from azure.ai.ml import Input
from azure.ai.ml.constants import AssetTypes


@dsl.pipeline(
    compute=compute_name,
    description="Example RAI computation on Covid 19 Events Multilabel data",
    experiment_name=f"RAI_Covid19_Events_Multilabel_Example_RAIInsights_Computation_{model_name_suffix}",
)
def rai_covid19_text_classification_pipeline(
    target_column_name,
    test_data,
    classes,
    use_model_dependency,
):
    # Initiate the RAIInsights
    rai_text_job = rai_text_insights_component(
        task_type="multilabel_text_classification",
        model_info=expected_model_id,
        model_input=Input(type=AssetTypes.MLFLOW_MODEL, path=azureml_model_id),
        test_dataset=test_data,
        target_column_name=target_column_name,
        classes=classes,
        use_model_dependency=use_model_dependency,
    )
    rai_text_job.set_limits(timeout=7200)

    rai_text_job.outputs.dashboard.mode = "upload"
    rai_text_job.outputs.ux_json.mode = "upload"

    return {
        "dashboard": rai_text_job.outputs.dashboard,
        "ux_json": rai_text_job.outputs.ux_json,
    }

Next, we define the pipeline object itself, and ensure that the outputs will be available for download:

In [None]:
import uuid
from azure.ai.ml import Output

insights_pipeline_job = rai_covid19_text_classification_pipeline(
    target_column_name=encoded_target_column_name,
    test_data=covid19_test_mltable,
    classes="[]",
    use_model_dependency=True,
)

rand_path = str(uuid.uuid4())
insights_pipeline_job.outputs.dashboard = Output(
    path=f"azureml://datastores/workspaceblobstore/paths/{rand_path}/dashboard/",
    mode="upload",
    type="uri_folder",
)
insights_pipeline_job.outputs.ux_json = Output(
    path=f"azureml://datastores/workspaceblobstore/paths/{rand_path}/ux_json/",
    mode="upload",
    type="uri_folder",
)
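Both outputs are written under a folder named by a fresh `uuid4`, so results from repeated runs presumably do not overwrite each other on the datastore. A small sketch that builds the same pair of URIs (the `rai_output_paths` helper is hypothetical):

```python
import uuid


def rai_output_paths(datastore="workspaceblobstore", run_folder=None):
    """Hypothetical helper: dashboard and ux_json URIs under one run folder."""
    run_folder = run_folder or str(uuid.uuid4())  # fresh folder per call by default
    base = f"azureml://datastores/{datastore}/paths/{run_folder}"
    return {name: f"{base}/{name}/" for name in ("dashboard", "ux_json")}


paths = rai_output_paths(run_folder="example")
print(paths["dashboard"])
# azureml://datastores/workspaceblobstore/paths/example/dashboard/
```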

And submit the pipeline to AzureML for execution:

In [None]:
insights_job = submit_and_wait(ml_client, insights_pipeline_job)

The dashboard should appear in the AzureML portal in the registered model view. The following cell computes the expected URI:

In [None]:
sub_id = ml_client.subscription_id
rg_name = ml_client.resource_group_name
ws_name = ml_client.workspace_name

expected_uri = f"https://ml.azure.com/model/{expected_model_id}/model_analysis?wsid=/subscriptions/{sub_id}/resourcegroups/{rg_name}/workspaces/{ws_name}"

print(f"Please visit {expected_uri} to see your analysis")

## Constructing the pipeline in YAML

It is also possible to specify the pipeline as a YAML file, and submit that using the command line. We will now create a YAML specification of the above pipeline and submit that:

In [None]:
yaml_contents = f"""
$schema: https://azuremlschemas.azureedge.net/latest/pipelineJob.schema.json
experiment_name: AML_RAI_Multilabel_Text_Sample_{rai_example_version_string}
type: pipeline

compute: azureml:{compute_name}

inputs:
  hf_model_info: {expected_model_id}
  my_test_data:
    type: mltable
    path: azureml:{input_test_data}:{rai_example_version_string}
    mode: download

settings:
  default_datastore: azureml:workspaceblobstore
  default_compute: azureml:{compute_name}
  continue_on_step_failure: false

jobs:
  analyse_model:
    type: command
    component: azureml://registries/azureml/components/rai_text_insights/versions/{version_string}
    inputs:
      task_type: multilabel_text_classification
      model_input:
        type: mlflow_model
        path: {azureml_model_id}
      model_info: ${{{{parent.inputs.hf_model_info}}}}
      test_dataset: ${{{{parent.inputs.my_test_data}}}}
      target_column_name: '{encoded_target_column_name}'
      maximum_rows_for_test_dataset: 5000
      classes: '[]'
      enable_explanation: True
      enable_error_analysis: True
"""

yaml_pipeline_filename = "rai_text_example.yaml"

with open(yaml_pipeline_filename, "w") as f:
    f.write(yaml_contents)

The created file can then be submitted using the Azure CLI, which needs the `ml` extension installed:

In [None]:
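import os
import subprocess

cmd_line = [
    "az",
    "ml",
    "job",
    "create",
    "--resource-group",
    rg_name,
    "--workspace-name",
    ws_name,
    "--file",
    yaml_pipeline_filename,
]

# On Linux/macOS, pass the list without a shell: with shell=True only "az"
# would run as the command. On Windows, the shell resolves the az.cmd wrapper.
try:
    cmd = subprocess.run(
        cmd_line, check=True, shell=(os.name == "nt"), capture_output=True, text=True
    )
except subprocess.CalledProcessError as cpe:
    print(f"Error invoking: {cpe.cmd}")
    print(cpe.stdout)
    print(cpe.stderr)
    raise
else:
    print("Azure CLI submission completed")
    print(cmd.stdout)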
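To rerun the submission from a terminal, or to log exactly what was invoked, the argument list can be rendered as a single shell-quoted command line. The sketch below uses a hypothetical helper and `shlex.join` (Python 3.8+). Its quoting targets POSIX shells, so it is not suitable for `cmd.exe`. The resource-group and workspace names in the example are placeholders.

```python
import shlex


def az_job_create_command(resource_group, workspace_name, yaml_file):
    """Hypothetical helper: the az ml job create call as a list and as shell text."""
    args = [
        "az", "ml", "job", "create",
        "--resource-group", resource_group,
        "--workspace-name", workspace_name,
        "--file", yaml_file,
    ]
    # shlex.join quotes each argument for a POSIX shell
    return args, shlex.join(args)


args, text = az_job_create_command("my resource group", "my-ws", "rai_text_example.yaml")
print(text)
# az ml job create --resource-group 'my resource group' --workspace-name my-ws --file rai_text_example.yaml
```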