sdk/python/responsible-ai/text/responsibleaidashboard-text-classification-financial-news/responsibleaidashboard-text-classification-financial-news.ipynb
{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"# Binary Text Classification scenario with RAI Dashboard\n",
"\n",
"This notebook demonstrates the use of the `responsibleai` API to binary text classification scenario end to end where a [Huggingface model](https://huggingface.co/docs/transformers/tasks/sequence_classification) will be trained on Fabricated Financial News dataset. The model predicts **the category a Financial News article will fall under.** There are 7 Categories of the Financial news:\n",
"\n",
"1. Banking and finance - Debt Market\n",
"2. Business\n",
"3. Cryptocurrency\n",
"4. Financial Regulations\n",
"5. Personal Finance\n",
"6. Real Estate\n",
"7. Stock Market Updates\n",
"\n",
"The Data Dictionary can be accessed through the following link: [Data_dictionary_Finance](link-URL)\n",
"\n",
"The Notebook walks through the API calls necessary to create a widget with model analysis insights, then guides a visual analysis of the model. The Notebook uses [Responsibleai_text Toolbox](https://github.com/microsoft/responsible-ai-toolbox/tree/main/responsibleai_text) to generate the dashboard.\n",
"\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"## **Installation** \n",
"\n",
"If you are **running the notebook for the first time**, you need to follow a few of steps for smooth execution of notebook:\n",
"\n",
"1. Un-comment the 2 cells below.\n",
"2. Run the 2 cells.\n",
"3. After execution of these cells, comment the cells.\n",
"4. Re-start the kernel\n",
"5. Continue with running of all cells.\n",
"\n",
"\n",
"**Reminder** -- Be sure to set your kernel to \"Python 3.10 - SDK v2,\" via the drop-down menu at the right end of the taskbar. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Install Required dependencies"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Make sure it comment the below cell while executing the notebook more than once**"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1689753917169
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"%pip install azure-ai-ml\n",
"%pip install raiutils\n",
"%pip install azureml-rai-utils\n",
"\n",
"%pip install datasets\n",
"%pip install \"pandas<2.0.0\"\n",
"%pip install scikit-learn\n",
"%pip install mltable\n",
"%pip install azure.ai.ml\n",
"%pip install azure.identity\n",
"%pip install mltable\n",
"%pip install transformers\n",
"%pip install torch\n",
"%pip install openpyxl"
]
},
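{
"cell_type": "markdown",
"metadata": {},
"source": [
"After restarting the kernel, you can optionally confirm that the key packages are available. The cell below is a minimal sketch; the package list simply mirrors the installs above."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional sanity check: report the installed version of each package used in this notebook.\n",
"import importlib.metadata as importlib_metadata\n",
"\n",
"packages = [\n",
"    \"azure-ai-ml\",\n",
"    \"azure-identity\",\n",
"    \"raiutils\",\n",
"    \"datasets\",\n",
"    \"pandas\",\n",
"    \"scikit-learn\",\n",
"    \"mltable\",\n",
"    \"transformers\",\n",
"    \"torch\",\n",
"    \"openpyxl\",\n",
"]\n",
"for package in packages:\n",
"    try:\n",
"        print(f\"{package}: {importlib_metadata.version(package)}\")\n",
"    except importlib_metadata.PackageNotFoundError:\n",
"        print(f\"{package}: not installed\")"
]
},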
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"First, it is imperative to define the versions of the Responsible AI (RAI) components accessible within the workspace. These specifications were explicitly indicated during the upload of the components."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1689617338773
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"version_string = \"0.0.20\""
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"Furthermore, it is essential to provide the designation of the compute cluster desired for utilization in AzureML. Subsequently, in this notebook, we will generate the cluster if it does not currently exist."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1689617338942
}
},
"outputs": [],
"source": [
"compute_name = \"raitextcluster\""
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"Lastly, we must stipulate a version for the data and components that will be generated during the execution of this notebook. This version should be exclusive to the workspace, and its actual value holds no significance, as long as it is unique."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1689617339129
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"rai_example_version_string = \"21\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1689617355566
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"import os\n",
"import json\n",
"\n",
"# import datasets\n",
"import pandas as pd\n",
"\n",
"from sklearn import preprocessing\n",
"from sklearn.model_selection import train_test_split\n",
"from sklearn.preprocessing import LabelEncoder\n",
"\n",
"import zipfile\n",
"from io import BytesIO\n",
"import requests"
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"## Accessing the Data\n",
"\n",
"The following section examines the code necessary to read datasets and a model using components in AzureML.\n",
"\n",
"**Note:** It is advisable to keep the data file and the notebook in same folder. In case they are kept in separate folders, update the path to the dataset "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1689617357016
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"# Load the labeled dataset from Excel\n",
"\n",
"# Download the blob data from the provided URL\n",
"data_location = \"https://publictestdatasets.blob.core.windows.net/data/RAI_fabricated_text_classification_data.zip\"\n",
"response = requests.get(data_location)\n",
"blob_content = response.content\n",
"\n",
"with zipfile.ZipFile(BytesIO(blob_content), \"r\") as zip_ref:\n",
" file_list = zip_ref.namelist()\n",
" if len(file_list) > 0:\n",
" # Assume the first file in the zip contains the data\n",
" inner_blob_name = file_list[0]\n",
" inner_blob_content = zip_ref.read(inner_blob_name)\n",
" df = pd.read_excel(BytesIO(inner_blob_content))\n",
"\n",
"# df = pd.read_excel(\"./Text_classification_dataset.xlsx\")\n",
"df = df[[\"Article Description\", \"Category\"]]\n",
"df.columns = [\"text\", \"label\"]\n",
"df"
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"Validating the the category names and the number of categories"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1689617357475
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"list(df[\"label\"].unique())"
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"### Splitting the Data into train and test datasets"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1689617357308
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"train_data, test_data = train_test_split(\n",
" df, test_size=0.20, random_state=0, stratify=df[\"label\"]\n",
")\n",
"test_data"
]
},
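{
"cell_type": "markdown",
"metadata": {},
"source": [
"Because the split above is stratified on the label column, class proportions in the train and test sets should closely match. The optional sketch below compares them side by side."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional check: compare label proportions across the stratified train/test splits.\n",
"split_balance = pd.DataFrame(\n",
"    {\n",
"        \"train\": train_data[\"label\"].value_counts(normalize=True),\n",
"        \"test\": test_data[\"label\"].value_counts(normalize=True),\n",
"    }\n",
").round(3)\n",
"split_balance"
]
},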
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"### Saving test & train files\n",
"With the data now split into 'train' and 'test' DataFrames, we save them as parquet files in preparation for upload into AzureML."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"train_pq_filename = \"Financial_news_train_data.parquet\"\n",
"train_data_folder = \"./data_news_classification/train/\"\n",
"train_data_path = train_data_folder + train_pq_filename\n",
"\n",
"os.makedirs(train_data_folder, exist_ok=True)\n",
"train_data.to_parquet(train_data_path, index=False)\n",
"train_df = pd.read_parquet(train_data_path)\n",
"train_df"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1689617362536
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"test_pq_filename = \"Financial_news_test_data.parquet\"\n",
"test_data_folder = \"./data_news_classification/test/\"\n",
"test_data_path = test_data_folder + test_pq_filename\n",
"\n",
"os.makedirs(test_data_folder, exist_ok=True)\n",
"test_data.to_parquet(test_data_path, index=False)\n",
"test_df = pd.read_parquet(test_data_path)\n",
"test_df"
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"## Get the Data to AzureML\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"We are going to create two Data assets in AzureML, one for the train and another for the test. The first step is to create an `MLClient` to perform interactions with AzureML:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Enter details of your AML workspace\n",
"subscription_id = \"<SUBSCRIPTION_ID>\"\n",
"resource_group = \"<RESOURCE_GROUP>\"\n",
"workspace = \"<AML_WORKSPACE_NAME>\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Handle to the workspace\n",
"from azure.ai.ml import MLClient\n",
"from azure.identity import DefaultAzureCredential\n",
"\n",
"credential = DefaultAzureCredential()\n",
"ml_client = MLClient(\n",
" credential=credential,\n",
" subscription_id=subscription_id,\n",
" resource_group_name=resource_group,\n",
" workspace_name=workspace,\n",
")\n",
"print(ml_client)"
]
},
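{
"cell_type": "markdown",
"metadata": {},
"source": [
"Creating the `MLClient` does not necessarily validate the connection, so the optional sketch below performs a round trip to confirm that the credential and workspace details entered above are valid."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional: fetch the workspace to verify the connection details above.\n",
"workspace_details = ml_client.workspaces.get(ml_client.workspace_name)\n",
"print(workspace_details.name, workspace_details.location, sep=\" | \")"
]
},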
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"### Create a data asset (URI file) to register the Data into workspace\n",
"This is essential, as the dashboard recognizes only registered assets. \n",
"\n",
"Reference:\n",
"https://learn.microsoft.com/en-us/azure/machine-learning/how-to-create-data-assets?tabs=Python-SDK"
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"**Note:** Change the asset name of the below file if the train/test data has changed"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1689617362318
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"from azure.ai.ml.entities import Data\n",
"from azure.ai.ml.constants import AssetTypes\n",
"\n",
"input_train_data = \"Financial_News_train_URI_file\"\n",
"\n",
"try:\n",
" # Try getting data already registered in workspace\n",
" train_data = ml_client.data.get(\n",
" name=input_train_data,\n",
" version=rai_example_version_string,\n",
" )\n",
"\n",
"except Exception as e:\n",
" train_data = Data(\n",
" path=train_data_path,\n",
" type=AssetTypes.URI_FILE,\n",
" description=\"RAI News article train data URI File\",\n",
" name=input_train_data,\n",
" version=rai_example_version_string,\n",
" )\n",
" ml_client.data.create_or_update(train_data)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1689617363347
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"from azure.ai.ml.entities import Data\n",
"from azure.ai.ml.constants import AssetTypes\n",
"\n",
"input_test_data = \"Financial_News_test_URI_file\"\n",
"\n",
"try:\n",
" # Try getting data already registered in workspace\n",
" test_data = ml_client.data.get(\n",
" name=input_test_data,\n",
" version=rai_example_version_string,\n",
" )\n",
"\n",
"except Exception as e:\n",
" test_data = Data(\n",
" path=test_data_path,\n",
" type=AssetTypes.URI_FILE,\n",
" description=\"RAI News article test data URI File\",\n",
" name=input_test_data,\n",
" version=rai_example_version_string,\n",
" )\n",
" ml_client.data.create_or_update(test_data)"
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"### Creating the training script RAI Dashboard"
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"## A model training pipeline\n",
"\n",
"To simplify the model creation process, we're going to use a pipeline. This will have two stages:\n",
"\n",
"1. The actual training component\n",
"2. A model registration component\n",
"\n",
"We have to register the model in AzureML in order for our RAI text insights components to use it. In this notebook we will be training and registering the model in a single pipeline"
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"\n",
"\n",
"We start by creating a directory to hold the component source:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1689617363492
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"import os\n",
"\n",
"os.makedirs(\"Text_classification_component_src\", exist_ok=True)"
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"### The Training Component\n",
"\n",
"This Python notebook contains code for a text classification pipeline that uses the BERT model for fine-tuning and predicting the labels of financial news articles. The pipeline is implemented as a Python script and will be run to create a model, fine-tune it, and register it in Azure ML.\n",
"\n",
"#### Pipeline Steps\n",
"\n",
"1. **Data Loading**: The pipeline starts by loading the financial news dataset from a parquet file.\n",
"\n",
"2. **Data Preprocessing**: The text column containing the news articles' text is used for training. The target column name (containing the labels) is specified as an argument.\n",
"\n",
"3. **Tokenization**: The text data is tokenized using the [BERT tokenizer](https://huggingface.co/docs/transformers/main_classes/tokenizer). The input texts are converted into input encodings and attention masks to feed into the BERT model.\n",
"\n",
"4. **Model Fine-Tuning**: The [BERT model](https://huggingface.co/bert-base-uncased) is loaded, and its last layer is fine-tuned using the training data. The number of epochs and learning rate are configurable.\n",
"\n",
"5. **Prediction Pipeline**: After fine-tuning, a prediction pipeline is built using the transformers pipeline object. This pipeline will allow us to make predictions on new text data. It also saves the trained model to a specified output path using MLFlow. \n",
"\n",
"6. **Model Registration**: The trained model is registered in Azure ML with a specific model name and a suffix based on the current timestamp.\n",
"\n",
"7. **JSON Output**: A JSON file is written, containing information about the registered model, such as its ID and version, and saved in a specified directory.\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"### Huggingface Wrapper for Text Classification Model\n",
"\n",
"The `HuggingfaceWrapper` class is a Python model wrapper designed to work with the [Hugging Face](https://huggingface.co/) library. It enables easy integration of Hugging Face's text classification models into Azure ML pipelines using the `mlflow.pyfunc.PythonModel` base class.\n",
"\n",
"\n",
"#### Purpose\n",
"\n",
"**This Wrapper helps wrapping the model in *Pyfunc* flavour while it is logged via MLFLow.**\n",
"\n",
"The purpose of this wrapper class is to provide a convenient interface to make predictions and inference using Hugging Face's text classification pipelines. It encapsulates the functionality of SHAP (SHapley Additive exPlanations) and ensures that SHAP is installed before using it for explanation purposes.\n",
"\n",
"#### Class Methods\n",
"\n",
"**`__init__(self, pipeline)`**\n",
"\n",
"The constructor initializes the `HuggingfaceWrapper` object with a text classification pipeline provided as an argument. If SHAP is not installed, it raises an `ImportError` and informs the user to install it to use SHAP for explanations.\n",
"\n",
"**`predict(self, context, model_input=None)`**\n",
"\n",
"This method is used to make predictions using the wrapped text classification model. It takes the context and an optional `model_input` argument. If `model_input` is not provided, it uses the `context` parameter for inference. The method returns an array containing the indices of the highest probability predictions for each input.\n",
"\n",
"**`predict_proba(self, dataset)`**\n",
"\n",
"The `predict_proba` method returns the predictions probabilities (scores) for a given dataset using the wrapped model."
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"#### Dependencies\n",
"\n",
"This script requires the following libraries and packages to be installed:\n",
"\n",
"- azureml-core\n",
"- torch\n",
"- transformers\n",
"- mlflow\n",
"- pandas\n",
"- numpy\n",
"- sklearn\n",
"\n",
"Make sure to have these packages installed before running the pipeline script.\n",
"\n",
"Let's proceed with running the pipeline and registering the model in Azure ML."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"%%writefile Text_classification_component_src/training_script.py\n",
"\n",
"import argparse\n",
"import os, pathlib\n",
"import shutil\n",
"import tempfile\n",
"import time\n",
"import pandas as pd\n",
"import pickle\n",
"import json\n",
"import numpy as np\n",
"\n",
"from azureml.core import Run\n",
"\n",
"import torch\n",
"from sklearn.preprocessing import LabelEncoder\n",
"\n",
"import mlflow\n",
"import mlflow.pyfunc\n",
"from transformers import pipeline, AdamW\n",
"from transformers import BertTokenizer, BertForSequenceClassification\n",
"from transformers.models.auto import AutoConfig\n",
"\n",
"from ml_wrappers import wrap_model\n",
"from ml_wrappers.common.warnings_suppressor import shap_warnings_suppressor\n",
"from ml_wrappers.common.constants import ModelTask\n",
"\n",
"import typing\n",
"\n",
"with shap_warnings_suppressor():\n",
" try:\n",
" from shap import models\n",
" shap_installed = True\n",
" except BaseException:\n",
" shap_installed = False\n",
"\n",
"text_column = \"text\"\n",
"\n",
"def parse_args():\n",
" # setup arg parser\n",
" parser = argparse.ArgumentParser()\n",
"\n",
" # add arguments\n",
" parser.add_argument(\"--training_data\", type=str, help=\"Path to training data\")\n",
" parser.add_argument(\"--target_column_name\", type=str, help=\"Name of target column\")\n",
" parser.add_argument(\"--model_info_output_path\", type=str, help=\"Path to write model info JSON\")\n",
" parser.add_argument(\"--model_base_name\", type=str, help=\"Name of the registered model\")\n",
" parser.add_argument(\"--model_name_suffix\", type=int, help=\"Set negative to use epoch_secs\")\n",
" # parse args\n",
" args = parser.parse_args()\n",
"\n",
" # return args\n",
" return args\n",
"\n",
"class HuggingfaceWrapper(mlflow.pyfunc.PythonModel):\n",
" def __init__(self, pipeline):\n",
" self._model = self\n",
"\n",
" if not shap_installed:\n",
" raise ImportError(\"SHAP is not installed. Please install it \" +\n",
" \"to use WrappedTextClassificationModel.\")\n",
" self._wrapped_model = models.TransformersPipeline(pipeline)\n",
"\n",
" def predict(self, context, model_input=None):\n",
" if model_input is None:\n",
" model_input = context # resolve positional inputs if only one arg passed\n",
" \n",
" # get all the probabilities\n",
" scores = self.predict_proba(model_input)\n",
"\n",
" # find index of highest probability for each prediction\n",
" scores_list = scores.tolist() \n",
" indices =[pred.index(max(pred)) for pred in scores_list] \n",
" \n",
" return np.array(indices)\n",
"\n",
" def predict_proba(self, dataset):\n",
" return self._wrapped_model(dataset)\n",
"\n",
"def main(args):\n",
" current_experiment = Run.get_context().experiment\n",
" tracking_uri = current_experiment.workspace.get_mlflow_tracking_uri()\n",
" print(\"tracking_uri: {0}\".format(tracking_uri))\n",
" mlflow.set_tracking_uri(tracking_uri)\n",
" mlflow.set_experiment(current_experiment.name)\n",
"\n",
" model_name = 'bert-base-uncased'\n",
" text_column_name = 'text'\n",
"\n",
" # Read in data\n",
" print(\"Loading financial news dataset\")\n",
" df = pd.read_parquet(args.training_data)\n",
" print(df.head(5))\n",
"\n",
" # Preparing train Data\n",
" label_encoder = LabelEncoder()\n",
" df['encoded_label'] = label_encoder.fit_transform(df[args.target_column_name])\n",
" train_df= df[[text_column_name , 'encoded_label']]\n",
"\n",
" # Generating encodings\n",
" label2id_encodings = {l: i for (i, l) in enumerate(label_encoder.fit(df[args.target_column_name]).classes_)}\n",
" id2label_encodings = {v: k for k, v in label2id_encodings.items()}\n",
"\n",
" # Fine Tuning\n",
" print(\"Loading the model and tokenizer\")\n",
" ## Step 1: Load the pre-trained model and tokenizer\n",
" model = BertForSequenceClassification.from_pretrained(\n",
" model_name,\n",
" num_labels=len(train_df['encoded_label'].unique()),\n",
" id2label=id2label_encodings,\n",
" label2id=label2id_encodings,\n",
" )\n",
" tokenizer = BertTokenizer.from_pretrained(\n",
" model_name,\n",
" )\n",
" \n",
" print(\"Prepare the text data\")\n",
" ## Step 2: Prepare your data\n",
" train_texts = train_df[text_column_name].tolist() # List of training texts\n",
" train_labels = train_df['encoded_label'].tolist() # List of corresponding training labels\n",
"\n",
" ### Tokenize the input texts\n",
" train_encodings = tokenizer(train_texts, truncation=True, padding=True)\n",
"\n",
" ### Convert input encodings and labels to tensors\n",
" train_input_ids = torch.tensor(train_encodings['input_ids'])\n",
" train_attention_masks = torch.tensor(train_encodings['attention_mask'])\n",
" train_labels = torch.tensor(train_labels)\n",
"\n",
" ## Step 3: Fine-tuning last layer BERT\n",
" ### Define training parameters\n",
" num_epochs = 15\n",
" learning_rate = 3e-5\n",
" optimizer = AdamW(model.parameters(), lr=learning_rate)\n",
"\n",
" print(\"Fine-tuning the model\")\n",
" model.train()\n",
" for epoch in range(num_epochs): # Set the desired number of training epochs\n",
" outputs = model(train_input_ids, attention_mask=train_attention_masks, labels=train_labels)\n",
" loss = outputs.loss\n",
" loss.backward()\n",
" optimizer.step()\n",
" optimizer.zero_grad()\n",
"\n",
" ### Switch to Evaluation mode \n",
" _ = model.eval()\n",
"\n",
" # build a pipeline object to do predictions\n",
" print(\"Buildling the prediction pipeline\")\n",
" # Load the trained model using pipeline\n",
" pred = pipeline(\n",
" 'text-classification',\n",
" model=model,\n",
" tokenizer=tokenizer,\n",
" top_k = None,\n",
" device=0 if torch.cuda.is_available() else -1,\n",
" return_all_scores=False # added\n",
" )\n",
"\n",
"\n",
" if args.model_name_suffix < 0:\n",
" suffix = int(time.time())\n",
" else:\n",
" suffix = args.model_name_suffix\n",
" registered_name = \"{0}_{1}\".format(args.model_base_name, suffix)\n",
" print(f\"Registering model as {registered_name}\")\n",
"\n",
" # my_mlflow = PyfuncModel(pred)\n",
" my_mlflow = HuggingfaceWrapper(pred)\n",
" \n",
" if args.model_name_suffix < 0:\n",
" suffix = int(time.time())\n",
" else:\n",
" suffix = args.model_name_suffix\n",
"\n",
" registered_name = \"{0}_{1}\".format(args.model_base_name, suffix)\n",
" print(f\"Registering model as {registered_name}\")\n",
"\n",
" # Saving model with mlflow\n",
" print(\"Saving with mlflow\")\n",
" mlflow.pyfunc.log_model(\n",
" python_model=my_mlflow,\n",
" registered_model_name=registered_name,\n",
" artifact_path=registered_name,\n",
" )\n",
"\n",
" print(\"Writing JSON\")\n",
" dict = {\"id\": \"{0}:1\".format(registered_name)}\n",
" output_path = os.path.join(args.model_info_output_path, \"model_info.json\")\n",
" with open(output_path, \"w\") as of:\n",
" json.dump(dict, fp=of)\n",
"\n",
"\n",
"# run script\n",
"if __name__ == \"__main__\":\n",
" # add space in logs\n",
" print(\"*\" * 60)\n",
" print(\"\\n\\n\")\n",
"\n",
" # parse args\n",
" args = parse_args()\n",
"\n",
" # run main function\n",
" main(args)\n",
"\n",
" # add space in logs\n",
" print(\"*\" * 60)\n",
" print(\"\\n\\n\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"### Creating an Azure ML Training Component \n",
"\n",
"This code cell demonstrates the creation of an Azure Machine Learning (Azure ML) component for training a model on financial news data. The component is defined using a YAML configuration file and is later loaded into the Azure ML workspace for use in the ML pipeline.\n",
"\n",
"#### Component Configuration\n",
"\n",
"The configuration of the component is stored in the YAML format. It specifies the input and output data types, the Python script's location, and the required environment for execution. The inputs to the component include:\n",
"\n",
"- `training_data`: The path to the training data file in parquet format.\n",
"- `target_column_name`: The name of the target column in the training data that contains the labels.\n",
"- `model_base_name`: The base name to be used for the registered model.\n",
"- `model_name_suffix`: An optional integer suffix for the model name. Set to a negative value to use the current timestamp as the suffix.\n",
"\n",
"The output of the component is:\n",
"\n",
"- `model_info_output_path`: The path to the directory where the model info JSON file will be saved.\n",
"\n",
"#### Command Execution\n",
"\n",
"The YAML configuration specifies a Python script, `training_script.py`, that will be executed with the provided inputs and outputs. The script takes the input arguments, performs the model training, and registers the trained model in the Azure ML workspace.\n",
"\n",
"#### Environment\n",
"\n",
"The component specifies the environment required for execution. It uses the environment named `responsibleai-text-ubuntu20.04-py38-cpu` from the Azure ML environment registry.\n",
"\n",
"Let's proceed with creating the Azure ML component and using it in our ML pipeline.\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"**Executing the below cell multiple times require change in names as the training scripts are not editable**"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1689617364099
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"from azure.ai.ml import load_component\n",
"\n",
"yaml_contents = (\n",
" f\"\"\"\n",
"$schema: http://azureml/sdk-2-0/CommandComponent.json\n",
"name: rai_financial_news_training_component\n",
"display_name: Financial News training component pipeline\n",
"version: {rai_example_version_string}\n",
"type: command\n",
"inputs:\n",
" training_data:\n",
" type: path\n",
" target_column_name:\n",
" type: string\n",
" model_base_name:\n",
" type: string\n",
" model_name_suffix: # Set negative to use epoch_secs\n",
" type: integer\n",
" default: -1\n",
"outputs:\n",
" model_info_output_path:\n",
" type: path\n",
"code: ./Text_classification_component_src/\n",
"environment: azureml://registries/azureml/environments/responsibleai-text/versions/13\n",
"\"\"\"\n",
" + r\"\"\"\n",
"command: >-\n",
" python training_script.py\n",
" --training_data ${{{{inputs.training_data}}}}\n",
" --target_column_name ${{{{inputs.target_column_name}}}}\n",
" --model_base_name ${{{{inputs.model_base_name}}}}\n",
" --model_name_suffix ${{{{inputs.model_name_suffix}}}}\n",
" --model_info_output_path ${{{{outputs.model_info_output_path}}}}\n",
"\"\"\"\n",
")\n",
"\n",
"yaml_filename = \"FinacialNewsTrainingComp.yaml\"\n",
"\n",
"with open(yaml_filename, \"w\") as f:\n",
" f.write(yaml_contents.format(yaml_contents))\n",
"\n",
"train_model_component = load_component(source=yaml_filename)"
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"We need a compute target on which to run our jobs. The following checks whether the compute specified above is present; if not, then the compute target is created."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1689617366587
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"from azure.ai.ml.entities import AmlCompute\n",
"\n",
"all_compute_names = [x.name for x in ml_client.compute.list()]\n",
"\n",
"if compute_name in all_compute_names:\n",
" print(f\"Found existing compute: {compute_name}\")\n",
"else:\n",
" my_compute = AmlCompute(\n",
" name=compute_name,\n",
" size=\"STANDARD_DS4_V2\",\n",
" min_instances=0,\n",
" max_instances=4,\n",
" idle_time_before_scale_down=3600,\n",
" )\n",
" ml_client.compute.begin_create_or_update(my_compute)\n",
" print(\"Initiated compute creation\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"## Running a training pipeline\n",
"\n",
"Now that we have our training component, we can run it. We begin by generating a unique name for the mode;"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1689617366729
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"import time\n",
"\n",
"model_base_name = \"Financial_News_classifier\"\n",
"model_name_suffix = \"12492\"\n",
"device = -1"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1689617367120
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"from azure.ai.ml import dsl, Input\n",
"\n",
"target_column_name = \"label\"\n",
"encoded_classes = json.dumps(list(df[\"label\"].unique()))\n",
"\n",
"News_train_pq = Input(\n",
" type=\"uri_file\",\n",
" path=f\"azureml:{input_train_data}:{rai_example_version_string}\",\n",
" mode=\"download\",\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"Next, we define our training pipeline. This has two components. The first is the training component which we defined above. The second is a component to register the model in AzureML:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1689617367303
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"@dsl.pipeline(\n",
" compute=compute_name,\n",
" description=\"RAI computation on Financial News Classification\",\n",
" experiment_name=f\"RAI_Financial_News_classification_Computation_{model_name_suffix}\",\n",
")\n",
"def my_training_pipeline(\n",
" target_column_name, training_data, model_base_name, model_name_suffix\n",
"):\n",
" trained_model = train_model_component(\n",
" target_column_name=target_column_name,\n",
" training_data=training_data,\n",
" model_base_name=model_base_name,\n",
" model_name_suffix=model_name_suffix,\n",
" )\n",
" trained_model.set_limits(timeout=1200)\n",
"\n",
" return {}\n",
"\n",
"\n",
"model_registration_pipeline_job = my_training_pipeline(\n",
" target_column_name, News_train_pq, model_base_name, model_name_suffix\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"With the pipeline definition created, we can submit it to AzureML. We define a helper function to do the submission, which waits for the submitted job to complete:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1689617367453
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"from azure.ai.ml.entities import PipelineJob\n",
"from IPython.core.display import HTML\n",
"from IPython.display import display\n",
"\n",
"\n",
"def submit_and_wait(ml_client, pipeline_job) -> PipelineJob:\n",
" created_job = ml_client.jobs.create_or_update(pipeline_job)\n",
" assert created_job is not None\n",
"\n",
" print(\"Pipeline job can be accessed in the following URL:\")\n",
" display(HTML('<a href=\"{0}\">{0}</a>'.format(created_job.studio_url)))\n",
"\n",
" while created_job.status not in [\n",
" \"Completed\",\n",
" \"Failed\",\n",
" \"Canceled\",\n",
" \"NotResponding\",\n",
" ]:\n",
" time.sleep(30)\n",
" created_job = ml_client.jobs.get(created_job.name)\n",
" print(\"Latest status : {0}\".format(created_job.status))\n",
" assert created_job.status == \"Completed\"\n",
" return created_job"
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"With the training pipeline defined, we can submit it for execution in AzureML. We define a helper function to wait for the job to complete:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1689618345995
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"# This is the actual submission\n",
"training_job = submit_and_wait(ml_client, model_registration_pipeline_job)"
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"## Creating the RAI Text Insights \n",
"\n",
"Now that we have our model, we can generate RAI Text insights for it. We will need the `id` of the registered model, which will be as follows:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1689618346789
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"expected_model_id = f\"{model_base_name}_{model_name_suffix}:1\"\n",
"azureml_model_id = f\"azureml:{expected_model_id}\"\n",
"print(expected_model_id)\n",
"print(azureml_model_id)"
]
},
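{
"cell_type": "markdown",
"metadata": {},
"source": [
"The training pipeline should have registered version 1 of the model under this name. As an optional check before building the insights pipeline, the sketch below looks the model up in the workspace (assuming `model_base_name` and `model_name_suffix` were left unchanged)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional: confirm the model registered by the training pipeline is visible in the workspace.\n",
"registered_model = ml_client.models.get(\n",
"    name=f\"{model_base_name}_{model_name_suffix}\", version=\"1\"\n",
")\n",
"print(registered_model.name, registered_model.version)"
]
},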
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"Next, we load the RAI components, so that we can construct a pipeline:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1689618347226
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"News_test_pq = Input(\n",
" type=\"uri_file\",\n",
" path=f\"azureml:{input_test_data}:{rai_example_version_string}\",\n",
" mode=\"download\",\n",
")\n",
"\n",
"registry_name = \"azureml\"\n",
"credential = DefaultAzureCredential()\n",
"\n",
"ml_client_registry = MLClient(\n",
" credential=credential,\n",
" subscription_id=ml_client.subscription_id,\n",
" resource_group_name=ml_client.resource_group_name,\n",
" registry_name=registry_name,\n",
")\n",
"\n",
"rai_text_insights_component = ml_client_registry.components.get(\n",
" name=\"rai_text_insights\", version=version_string\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"We can now specify our pipeline. Complex objects (such as lists of column names) have to be converted to JSON strings before being passed to the components."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1689618347626
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"import json\n",
"from azure.ai.ml import Input\n",
"from azure.ai.ml.constants import AssetTypes\n",
"\n",
"\n",
"@dsl.pipeline(\n",
" compute=compute_name,\n",
" description=\"RAI computation for Financial News Classification\",\n",
" experiment_name=f\"RAI_Financial_News_classification_Computation_{model_name_suffix}\",\n",
")\n",
"def rai_text_classification_pipeline(\n",
" target_column_name,\n",
" train_data,\n",
" test_data,\n",
" classes,\n",
" use_model_dependency,\n",
"):\n",
" # Initiate the RAIInsights\n",
" rai_text_job = rai_text_insights_component(\n",
" task_type=\"text_classification\",\n",
" model_info=expected_model_id,\n",
" model_input=Input(type=AssetTypes.MLFLOW_MODEL, path=azureml_model_id),\n",
" test_dataset=test_data,\n",
" target_column_name=target_column_name,\n",
" classes=classes,\n",
" use_model_dependency=use_model_dependency,\n",
" )\n",
" rai_text_job.set_limits(timeout=6000)\n",
"\n",
" rai_text_job.outputs.dashboard.mode = \"upload\"\n",
" rai_text_job.outputs.ux_json.mode = \"upload\"\n",
"\n",
" return {\n",
" \"dashboard\": rai_text_job.outputs.dashboard,\n",
" \"ux_json\": rai_text_job.outputs.ux_json,\n",
" }"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1689618348400
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"encoded_classes = json.dumps(list(df[\"label\"].unique()))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1689618348975
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"encoded_classes"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1689618350449
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"target_column_name = \"label\""
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"Next, we define the pipeline object itself, and ensure that the outputs will be available for download:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1689618350867
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"import uuid\n",
"from azure.ai.ml import Output\n",
"\n",
"\n",
"insights_pipeline_job = rai_text_classification_pipeline(\n",
" target_column_name=target_column_name,\n",
" test_data=News_test_pq,\n",
" classes=encoded_classes,\n",
" use_model_dependency=True,\n",
")\n",
"\n",
"rand_path = str(uuid.uuid4())\n",
"insights_pipeline_job.outputs.dashboard = Output(\n",
" path=f\"azureml://datastores/workspaceblobstore/paths/{rand_path}/dashboard/\",\n",
" mode=\"upload\",\n",
" type=\"uri_folder\",\n",
")\n",
"insights_pipeline_job.outputs.ux_json = Output(\n",
" path=f\"azureml://datastores/workspaceblobstore/paths/{rand_path}/ux_json/\",\n",
" mode=\"upload\",\n",
" type=\"uri_folder\",\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"And submit the pipeline to AzureML for execution:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1689333203648
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"insights_job = submit_and_wait(ml_client, insights_pipeline_job)"
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"The dashboard should appear in the AzureML portal in the registered model view. The following cell computes the expected URI:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1689333204418
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"sub_id = ml_client._operation_scope.subscription_id\n",
"rg_name = ml_client._operation_scope.resource_group_name\n",
"ws_name = ml_client.workspace_name\n",
"\n",
"expected_uri = f\"https://ml.azure.com/model/{expected_model_id}/model_analysis?wsid=/subscriptions/{sub_id}/resourcegroups/{rg_name}/workspaces/{ws_name}\"\n",
"\n",
"print(f\"Please visit {expected_uri} to see your analysis\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"Once this is complete, we can go to the Registered Models view in the AzureML portal, and find the model we have just registered. On the 'Model Details' page, there is a \"Responsible AI dashboard\" tab where we can view the insights which we have just uploaded."
]
},
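{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you would also like a local copy of the generated artifacts, the pipeline outputs can be downloaded once `insights_job` has completed. The sketch below assumes the output names defined in the pipeline above; adjust `download_path` as needed."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional: download the dashboard and UX JSON outputs of the completed insights job.\n",
"for output_name in [\"dashboard\", \"ux_json\"]:\n",
"    ml_client.jobs.download(\n",
"        name=insights_job.name,\n",
"        output_name=output_name,\n",
"        download_path=\"./rai_insights_outputs\",\n",
"    )"
]
},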
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Remove the temporary directories\n",
"from pathlib import Path\n",
"import shutil\n",
"\n",
"comp_dir = Path(\"./Text_classification_component_src\")\n",
"list_dir = [comp_dir]\n",
"\n",
"for dir in list_dir:\n",
" if dir.exists() and dir.is_dir():\n",
" shutil.rmtree(dir)\n",
"\n",
"\n",
"list_file = [\"./FinacialNewsTrainingComp.yaml\"]\n",
"\n",
"for file in list_file:\n",
" if os.path.exists(file):\n",
" os.remove(file)"
]
}
],
"metadata": {
"kernel_info": {
"name": "python310-sdkv2"
},
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.10"
},
"microsoft": {
"host": {
"AzureML": {
"notebookHasBeenCompleted": true
}
},
"ms_spell_check": {
"ms_spell_check_language": "en"
}
},
"nteract": {
"version": "nteract-front-end@1.0.0"
}
},
"nbformat": 4,
"nbformat_minor": 2
}