sdk/python/foundation-models/system/inference/summarization/summarization-batch-endpoint.ipynb (549 lines of code) (raw):
{
"cells": [
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Summarization Inference using Batch Endpoints\n",
"\n",
"This sample shows how to deploy `summarization` type models to a batch endpoint for inference.\n",
"\n",
"### Task\n",
"`summarization` creates a shorter version of a document or an article that captures all the important information. Along with translation, it is another example of a task that can be formulated as a sequence-to-sequence task. \n",
"`summarization` can be:\n",
"\n",
"* Extractive: extract the most relevant information from a document.\n",
"* Abstractive: generate new text that captures the most relevant information.\n",
"\n",
"### Model\n",
"Models that can perform the `summarization` task are tagged with `task: summarization`. We will use the `sshleifer-distilbart-cnn-12-6` model in this notebook. If you opened this notebook from a specific model card, remember to replace the specific model name. If you don't find a model that suits your scenario or domain, you can discover and [import models from HuggingFace hub](../../import/import_model_into_registry.ipynb) and then use them for inference. \n",
"\n",
"### Inference data\n",
"We will use the [CNN DailyMail](https://huggingface.co/datasets/cnn_dailymail) dataset.\n",
"\n",
"### Outline\n",
"* Set up pre-requisites.\n",
"* Pick a model to deploy.\n",
"* Prepare data for inference. \n",
"* Deploy the model for batch inference.\n",
"* Run a batch inference job.\n",
"* Review inference predictions.\n",
"* Clean up resources."
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"### 1. Set up pre-requisites\n",
"* Install dependencies\n",
"* Connect to AzureML Workspace. Learn more at [set up SDK authentication](https://learn.microsoft.com/en-us/azure/machine-learning/how-to-setup-authentication?tabs=sdk). Replace `<WORKSPACE_NAME>`, `<RESOURCE_GROUP>` and `<SUBSCRIPTION_ID>` below.\n",
"* Connect to `azureml` system registry.\n",
"* Create or update compute."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Import packages used by the following code snippets\n",
"import csv\n",
"import os\n",
"import time\n",
"\n",
"import datasets\n",
"import pandas as pd\n",
"\n",
"from azure.ai.ml import Input, MLClient\n",
"from azure.ai.ml.constants import AssetTypes\n",
"from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential\n",
"from azure.ai.ml.entities import (\n",
" AmlCompute,\n",
" BatchDeployment,\n",
" BatchEndpoint,\n",
" BatchRetrySettings,\n",
" Model,\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"subscription_id = \"<SUBSCRIPTION_ID>\"\n",
"resource_group_name = \"<RESOURCE_GROUP>\"\n",
"workspace_name = \"<WORKSPACE_NAME>\""
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Connect to workspace and registry using ML clients."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"try:\n",
" credential = DefaultAzureCredential()\n",
" credential.get_token(\"https://management.azure.com/.default\")\n",
"except Exception as ex:\n",
" credential = InteractiveBrowserCredential()\n",
"\n",
"workspace_ml_client = MLClient(\n",
" credential,\n",
" subscription_id=subscription_id,\n",
" resource_group_name=resource_group_name,\n",
" workspace_name=workspace_name,\n",
")\n",
"# The models, fine tuning pipelines, and environments are available in the AzureML system registry, \"azureml\"\n",
"registry_ml_client = MLClient(credential, registry_name=\"azureml\")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Create a compute cluster.\n",
"Use the model card from the AzureML system registry to check the minimum required inferencing SKU, referenced as `size` below. If you already have a sufficient compute cluster, you can simply define the name in `compute_name` in the following code block."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"compute_name = \"cpu-cluster\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"compute_cluster = AmlCompute(\n",
" name=compute_name,\n",
" description=\"An AML compute cluster\",\n",
" size=\"Standard_DS3_V2\",\n",
" min_instances=0,\n",
" max_instances=3,\n",
" idle_time_before_scale_down=120,\n",
") # 120 seconds\n",
"\n",
"workspace_ml_client.begin_create_or_update(compute_cluster)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"### 2. Pick a model to deploy\n",
"\n",
"Browse models in the Model Catalog in the AzureML Studio, filtering by the `summarization` task. In this example, we use the `sshleifer-distilbart-cnn-12-6` model. If you have opened this notebook for a different model, replace the model name and version accordingly. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"model_name = \"sshleifer-distilbart-cnn-12-6\"\n",
"model_version = \"4\"\n",
"foundation_model = registry_ml_client.models.get(model_name, model_version)\n",
"print(\n",
" f\"Using model name: {foundation_model.name}, version: {foundation_model.version}, id: {foundation_model.id} for inferencing.\"\n",
")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"### 3. Prepare data for inference\n",
"\n",
"We will test with a small subset from the [CNN_DailyMail](https://huggingface.co/datasets/cnn_dailymail) dataset, saving the sample in the `news-dataset` folder. The next few cells show basic data preparation:\n",
"* Download the data.\n",
"* Visualize some data rows.\n",
"* Save the data.\n",
"\n",
"We want this sample to run quickly, so we are using a smaller dataset containing a fraction of the original."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Define directories and filenames as variables\n",
"dataset_dir = \"news-dataset\"\n",
"test_datafile = \"test_100.csv\"\n",
"\n",
"batch_dir = \"batch\"\n",
"batch_inputs_dir = os.path.join(batch_dir, \"inputs\")\n",
"batch_input_file = \"batch_input.csv\"\n",
"os.makedirs(dataset_dir, exist_ok=True)\n",
"os.makedirs(batch_dir, exist_ok=True)\n",
"os.makedirs(batch_inputs_dir, exist_ok=True)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"#### 3.1 Download the data\n",
"We can use the Huggingface `datasets` module to stream in data."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"testdata = datasets.load_dataset(\"cnn_dailymail\", \"3.0.0\", split=\"test\", streaming=True)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"test_df = pd.DataFrame(data=testdata.take(100))\n",
"test_df.to_csv(os.path.join(\".\", dataset_dir, test_datafile), index=False)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"#### 3.2 Visualize a few rows of data"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"pd.set_option(\n",
" \"display.max_colwidth\", 0\n",
") # Set the max column width to 0 to display the full text\n",
"test_df.head(2)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"#### 3.3 Save the data\n",
"Save the input data to files of smaller batches for testing. The MLflow model's signature specifies the input should be a column named `\"input_string\"`, so rename the column `\"article\"` to `\"input_string\"`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"batch_df = test_df[[\"article\"]].rename(columns={\"article\": \"input_string\"})"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"The early versions of this model require the data to be cropped for successful batch endpoint runs. This will be improved in the future version's preprocessing step, making the next line unnecessary."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"batch_df.input_string = batch_df.input_string.apply(lambda x: x[:3000])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Divide this into files of 10 rows each\n",
"batch_size_per_predict = 10\n",
"for i in range(0, len(batch_df), batch_size_per_predict):\n",
" j = i + batch_size_per_predict\n",
" batch_df[i:j].to_csv(\n",
" os.path.join(batch_inputs_dir, str(i) + batch_input_file), quoting=csv.QUOTE_ALL\n",
" )\n",
"\n",
"# Check out the first and last file name created\n",
"input_files = os.listdir(batch_inputs_dir)\n",
"print(f\"{input_files[0]} to {str(i)}{batch_input_file}.\")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"### 4. Deploy the model to a batch endpoint\n",
"Batch endpoints are endpoints that are used to do batch inferencing on large volumes of data over a period of time. The endpoints receive pointers to data and run jobs asynchronously to process the data in parallel on compute clusters. Batch endpoints store outputs to a data store for further analysis. For more information on batch endpoints and deployments see [What are batch endpoints?](https://learn.microsoft.com/en-us/azure/machine-learning/concept-endpoints?view=azureml-api-2#what-are-batch-endpoints).\n",
"\n",
"* Create a batch endpoint.\n",
"* Create a batch deployment.\n",
"* Set the deployment as default; doing so allows invoking the endpoint without specifying the deployment's name.\n",
"\n",
"#### Create the endpoint."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Endpoint names need to be unique in a region, hence using timestamp to create unique endpoint name\n",
"timestamp = int(time.time())\n",
"endpoint_name = \"summarization-\" + str(timestamp)\n",
"\n",
"endpoint = BatchEndpoint(\n",
" name=endpoint_name,\n",
" description=\"Batch endpoint for \"\n",
" + foundation_model.name\n",
" + \", for summarization task\",\n",
")\n",
"workspace_ml_client.begin_create_or_update(endpoint).result()"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Create the deployment."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"deployment_name = \"demo\"\n",
"\n",
"deployment = BatchDeployment(\n",
" name=deployment_name,\n",
" endpoint_name=endpoint_name,\n",
" model=foundation_model.id,\n",
" compute=compute_name,\n",
" error_threshold=0,\n",
" instance_count=1,\n",
" logging_level=\"info\",\n",
" max_concurrency_per_instance=1,\n",
" mini_batch_size=5,\n",
" output_file_name=\"predictions.csv\",\n",
" retry_settings=BatchRetrySettings(max_retries=3, timeout=300),\n",
")\n",
"workspace_ml_client.begin_create_or_update(deployment).result()"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Set the deployment as default."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"endpoint = workspace_ml_client.batch_endpoints.get(endpoint_name)\n",
"endpoint.defaults.deployment_name = deployment_name\n",
"workspace_ml_client.begin_create_or_update(endpoint).wait()\n",
"\n",
"endpoint = workspace_ml_client.batch_endpoints.get(endpoint_name)\n",
"print(f\"The default deployment is {endpoint.defaults.deployment_name}\")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"### 5. Run a batch inference job\n",
"Invoke the batch endpoint with the input parameter pointing to the folder containing the batch inference input. This creates a pipeline job using the default deployment in the endpoint. Wait for the job to complete."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"input = Input(path=batch_inputs_dir, type=AssetTypes.URI_FOLDER)\n",
"\n",
"job = workspace_ml_client.batch_endpoints.invoke(\n",
" endpoint_name=endpoint.name, input=input\n",
")\n",
"\n",
"workspace_ml_client.jobs.stream(job.name)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"### 6. Review inference predictions\n",
"Download the predictions from the job output and review the predictions using a dataframe."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"scoring_job = list(workspace_ml_client.jobs.list(parent_job_name=job.name))[0]\n",
"\n",
"workspace_ml_client.jobs.download(\n",
" name=scoring_job.name, download_path=batch_dir, output_name=\"score\"\n",
")\n",
"\n",
"predictions_file = os.path.join(batch_dir, \"named-outputs\", \"score\", \"predictions.csv\")\n",
"\n",
"# Load the batch predictions file with no headers into a dataframe and set your column names\n",
"score_df = pd.read_csv(\n",
" predictions_file,\n",
" header=None,\n",
" names=[\"row_number_per_file\", \"prediction\", \"batch_input_file_name\"],\n",
")\n",
"score_df.head()"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"Record the input file name and set the original index value in the `'index'` column for each input file. Join the `test_df` containing ground truth into the input dataframe."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"input_df = []\n",
"for file in input_files:\n",
" input = pd.read_csv(os.path.join(batch_inputs_dir, file), index_col=0)\n",
" input.reset_index(inplace=True)\n",
" input[\"batch_input_file_name\"] = file\n",
" input.reset_index(names=[\"row_number_per_file\"], inplace=True)\n",
" input_df.append(input)\n",
"input_df = pd.concat(input_df)\n",
"input_df.set_index(\"index\", inplace=True)\n",
"input_df = input_df.join(test_df).drop(columns=[\"input_string\"])\n",
"\n",
"input_df.head(2)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"Join the predictions with input data to compare them to ground truth."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df = pd.merge(\n",
" input_df, score_df, how=\"inner\", on=[\"row_number_per_file\", \"batch_input_file_name\"]\n",
")\n",
"\n",
"# Show the first few rows of the results\n",
"df.head(10)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"### 7. Clean up resources\n",
"Batch endpoints use compute resources only when jobs are submitted. You can keep the batch endpoint for your reference without worrying about compute bills, or choose to delete the endpoint. If you created your compute cluster to have zero minimum instances and scale down soon after being idle, you won't be charged for an unused compute."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"workspace_ml_client.batch_endpoints.begin_delete(name=endpoint_name).result()\n",
"workspace_ml_client.compute.begin_delete(name=compute_name).result()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "base",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.12"
},
"vscode": {
"interpreter": {
"hash": "2f394aca7ca06fed1e6064aef884364492d7cdda3614a461e02e6407fc40ba69"
}
}
},
"nbformat": 4,
"nbformat_minor": 2
}