sdk/python/endpoints/batch/deploy-models/imagenet-classifier/imagenet-classifier-mlflow.ipynb (849 lines of code) (raw):
{
"cells": [
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"# Use batch deployments for image file processing with MLflow"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"The following notebook demonstrates how to use batch endpoints to deploy MLflow models that work with images. Particularly, we are going to deploy a TensorFlow model for the popular ImageNet classification problem. The MLflow models will be able to receive any image input and do all the require pre-post processing to generate the predictions in the shape and format needed."
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"This notebook requires:\n",
"\n",
"- `tensorflow`\n",
"- `tensorflow_hub`\n",
"- `pillow`\n",
"- `azure-ai-ml`\n",
"- `azureml-mlflow`\n",
"- `pandas`\n",
"- `scipy`"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## 1. Connect to Azure Machine Learning Workspace\n",
"\n",
"The [workspace](https://docs.microsoft.com/en-us/azure/machine-learning/concept-workspace) is the top-level resource for Azure Machine Learning, providing a centralized place to work with all the artifacts you create when you use Azure Machine Learning. In this section we will connect to the workspace in which the job will be run.\n",
"\n",
"### 1.1. Import the required libraries"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azure.ai.ml import MLClient, Input\n",
"from azure.ai.ml.entities import (\n",
" BatchEndpoint,\n",
" ModelBatchDeployment,\n",
" ModelBatchDeploymentSettings,\n",
" Model,\n",
" AmlCompute,\n",
" Data,\n",
" BatchRetrySettings,\n",
" CodeConfiguration,\n",
" Environment,\n",
")\n",
"from azure.ai.ml.constants import AssetTypes, BatchDeploymentOutputAction\n",
"from azure.identity import DefaultAzureCredential"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"### 1.2. Configure workspace details and get a handle to the workspace\n",
"\n",
"To connect to a workspace, we need identifier parameters - a subscription, resource group and workspace name. We will use these details in the `MLClient` from `azure.ai.ml` to get a handle to the required Azure Machine Learning workspace. We use the default [default azure authentication](https://docs.microsoft.com/en-us/python/api/azure-identity/azure.identity.defaultazurecredential?view=azure-python) for this tutorial. Check the [configuration notebook](../../jobs/configuration.ipynb) for more details on how to configure credentials and connect to a workspace."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# enter details of your AML workspace\n",
"subscription_id = \"<SUBSCRIPTION_ID>\"\n",
"resource_group = \"<RESOURCE_GROUP>\"\n",
"workspace = \"<AML_WORKSPACE_NAME>\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"ml_client = MLClient(\n",
" DefaultAzureCredential(), subscription_id, resource_group, workspace\n",
")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"If you are working in a Azure Machine Learning compute, you can simply:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ml_client = MLClient.from_config(DefaultAzureCredential())"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"## 2. Using MLflow with images\n",
"\n",
"When working with MLflow models that processes images, it is important to take into account that you won't be providing an scoring script. Hence, any data transformation that needs to be done before actually running the classifier needs to be done inside the model itself. Fortunately, you can design models that can compute these transformations:\n",
"\n",
"### 2.1 Creating an MLflow model for image classification\n",
"\n",
"The following example shows how to create a TensorFlow model that takes images of any size and preprocess them using keras layers."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"import tensorflow_hub as hub\n",
"import tensorflow as tf\n",
"\n",
"model = tf.keras.Sequential(\n",
" [\n",
" tf.keras.layers.Resizing(\n",
" 244, 244, interpolation=\"bilinear\", crop_to_aspect_ratio=False\n",
" ),\n",
" tf.keras.layers.Rescaling(1 / 255.0),\n",
" hub.KerasLayer(\n",
" \"https://tfhub.dev/google/imagenet/resnet_v2_101/classification/5\"\n",
" ),\n",
" tf.keras.layers.Softmax(axis=-1),\n",
" ]\n",
")\n",
"model.build([None, None, None, 3])"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"Let's save this model in a local folder"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"model_local_path = \"model\"\n",
"model.save(model_local_path)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"### 2.2 Adding labels to the model predictions\n",
"\n",
"We are going to include the labels for the predicted class in the directory so we can use them for inference:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"!wget https://azuremlexampledata.blob.core.windows.net/data/imagenet/ImageNetLabels.txt -P model"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"### 2.3 Creating a custom model loader for MLflow\n",
"\n",
"Let's create a custom loader for the MLflow model:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"%%writefile code/score-with-mlflow/module_loader.py\n",
"\n",
"import pandas as pd\n",
"import tensorflow as tf\n",
"\n",
"class TfClassifier():\n",
" def __init__(self, model_path: str, labels_path: str):\n",
" import numpy as np\n",
" from tensorflow.keras.models import load_model\n",
" \n",
" self.model = load_model(model_path)\n",
" self.imagenet_labels = np.array(open(labels_path).read().splitlines())\n",
"\n",
" def predict(self, data):\n",
"\n",
" preds = self.model.predict(data)\n",
"\n",
" pred_prob = tf.reduce_max(preds, axis=-1)\n",
" pred_class = tf.argmax(preds, axis=-1)\n",
" pred_label = [self.imagenet_labels[pred] for pred in pred_class]\n",
"\n",
" return pd.DataFrame({\n",
" \"class\": pred_class, \n",
" \"probability\": pred_prob,\n",
" \"label\": pred_label\n",
" })\n",
"\n",
"def _load_pyfunc(data_path: str):\n",
" import os\n",
"\n",
" model_path = os.path.abspath(data_path)\n",
" labels_path = os.path.join(model_path, \"ImageNetLabels.txt\")\n",
"\n",
" return TfClassifier(model_path, labels_path)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"### 2.4 Adding a model signature for images"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"Indicating a signature for your model"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"import numpy as np\n",
"import mlflow\n",
"from mlflow.models.signature import ModelSignature\n",
"from mlflow.types.schema import Schema, TensorSpec\n",
"\n",
"input_schema = Schema(\n",
" [\n",
" TensorSpec(np.dtype(np.uint8), (-1, -1, -1, 3)),\n",
" ]\n",
")\n",
"signature = ModelSignature(inputs=input_schema)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"Creating the dependencies:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from mlflow.utils.environment import _mlflow_conda_env\n",
"\n",
"custom_env = _mlflow_conda_env(\n",
" additional_conda_deps=None,\n",
" additional_pip_deps=[\"tensorflow\"],\n",
" additional_conda_channels=None,\n",
")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"Logging the model:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"mlflow_model_path = \"mlflow-model\"\n",
"mlflow.pyfunc.save_model(\n",
" mlflow_model_path,\n",
" data_path=\"model\",\n",
" code_path=[\"code/score-with-mlflow/module_loader.py\"],\n",
" loader_module=\"module_loader\",\n",
" conda_env=custom_env,\n",
" signature=signature,\n",
")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"### 2.5 Registering the new model"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"model_name = \"imagenet-classifier-mlflow\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ml_client.models.create_or_update(\n",
" Model(\n",
" name=model_name,\n",
" path=mlflow_model_path,\n",
" type=AssetTypes.MLFLOW_MODEL,\n",
" )\n",
")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"This new model can be used for batch scoring using batch deployments."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"model = ml_client.models.get(name=model_name, label=\"latest\")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"## 3 Create Batch Endpoint\n",
"\n",
"Batch endpoints are endpoints that are used batch inferencing on large volumes of data over a period of time. Batch endpoints receive pointers to data and run jobs asynchronously to process the data in parallel on compute clusters. Batch endpoints store outputs to a data store for further analysis.\n",
"\n",
"To create an online endpoint we will use `BatchEndpoint`. This class allows user to configure the following key aspects:\n",
"- `name` - Name of the endpoint. Needs to be unique at the Azure region level\n",
"- `auth_mode` - The authentication method for the endpoint. Currently only Azure Active Directory (Azure AD) token-based (`aad_token`) authentication is supported. \n",
"- `description`- Description of the endpoint.\n",
"\n",
"### 3.1 Configure the endpoint\n",
"\n",
"First, let's create the endpoint that is going to host the batch deployments. To ensure that our endpoint name is unique, let's create a random suffix to append to it. \n",
"\n",
"> In general, you won't need to use this technique but you will use more meaningful names. Please skip the following cell if your case:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"import random\n",
"import string\n",
"\n",
"# Creating a unique endpoint name by including a random suffix\n",
"allowed_chars = string.ascii_lowercase + string.digits\n",
"endpoint_suffix = \"\".join(random.choice(allowed_chars) for x in range(5))\n",
"endpoint_name = \"imagenet-classifier-\" + endpoint_suffix"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's configure the endpoint:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"endpoint = BatchEndpoint(\n",
" name=endpoint_name,\n",
" description=\"An batch service to perform ImageNet image classification\",\n",
" tags={\"input-type\": \"tabular\"},\n",
")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"### 3.2 Create the endpoint\n",
"Using the `MLClient` created earlier, we will now create the Endpoint in the workspace. This command will start the endpoint creation and return a confirmation response while the endpoint creation continues."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"ml_client.batch_endpoints.begin_create_or_update(endpoint).result()"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## 4. Create a batch deployment\n",
"\n",
"A deployment is a set of resources required for hosting the model that does the actual inferencing."
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"### 4.1 Creating an scoring script to work with the model\n",
"\n",
"> Scoring scripts are not required for MLflow models."
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"### 4.2 Creating the compute\n",
"\n",
"Batch deployments can run on any Azure ML compute that already exists in the workspace. That means that multiple batch deployments can share the same compute infrastructure. In this example, we are going to work on an AzureML compute cluster called `cpu-cluster`. Let's verify the compute exists on the workspace or create it otherwise."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"compute_name = \"cpu-cluster\"\n",
"if not any(filter(lambda m: m.name == compute_name, ml_client.compute.list())):\n",
" compute_cluster = AmlCompute(\n",
" name=compute_name, description=\"amlcompute\", min_instances=0, max_instances=5\n",
" )\n",
" ml_client.begin_create_or_update(compute_cluster).result()"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"### 4.3 Creating the environment\n",
"\n",
"Let's create the environment. In our case, our model runs on `Torch`. Azure Machine Learning already has an environment with the required software installed, so we can reutilize this environment."
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"> Environments are not required for MLflow models"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## 3.1 Configure the deployment\n",
"\n",
"We will create a deployment for our endpoint using the `BatchDeployment` class. This class allows user to configure the following key aspects.\n",
"- `name` - Name of the deployment.\n",
"- `endpoint_name` - Name of the endpoint to create the deployment under.\n",
"- `model` - The model to use for the deployment. This value can be either a reference to an existing versioned model in the workspace or an inline model specification.\n",
"- `compute` - Name of the compute target to execute the batch scoring jobs on\n",
"- `instance_count`- The number of nodes to use for each batch scoring job.\t\t1\n",
"- `max_concurrency_per_instance`- The maximum number of parallel scoring_script runs per instance.\n",
"- `mini_batch_size`\t- The number of files the code_configuration.scoring_script can process in one `run()` call.\n",
"- `retry_settings`- Retry settings for scoring each mini batch.\t\t\n",
" - `max_retries`- The maximum number of retries for a failed or timed-out mini batch (default is 3)\n",
" - `timeout`- The timeout in seconds for scoring a mini batch (default is 30)\n",
"- `output_action`- Indicates how the output should be organized in the output file. Allowed values are `append_row` or `summary_only`. Default is `append_row`\n",
"- `output_file_name`- Name of the batch scoring output file. Default is `predictions.csv`\n",
"- `environment_variables`- Dictionary of environment variable name-value pairs to set for each batch scoring job.\n",
"- `logging_level`- The log verbosity level.\tAllowed values are `warning`, `info`, `debug`. Default is `info`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"mlflow_deployment = ModelBatchDeployment(\n",
" name=\"imagenet-classifier-rnet-mlflow\",\n",
" description=\"A ResNetV2 model architecture for performing ImageNet classification in batch stored in MLflow format\",\n",
" endpoint_name=endpoint.name,\n",
" model=model,\n",
" compute=compute_name,\n",
" settings=ModelBatchDeploymentSettings(\n",
" instance_count=2,\n",
" max_concurrency_per_instance=1,\n",
" mini_batch_size=10,\n",
" output_action=BatchDeploymentOutputAction.APPEND_ROW,\n",
" output_file_name=\"predictions.csv\",\n",
" retry_settings=BatchRetrySettings(max_retries=3, timeout=300),\n",
" logging_level=\"info\",\n",
" ),\n",
")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"### 4.3 Create the deployment\n",
"\n",
"Using the `MLClient` created earlier, we will now create the deployment in the workspace. This command will start the deployment creation and return a confirmation response while the deployment creation continues."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"ml_client.batch_deployments.begin_create_or_update(mlflow_deployment).result()"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's update the default deployment name in the endpoint:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"endpoint = ml_client.batch_endpoints.get(endpoint_name)\n",
"endpoint.defaults.deployment_name = mlflow_deployment.name\n",
"ml_client.batch_endpoints.begin_create_or_update(endpoint).result()"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"We can see the endpoint URL as follows:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(f\"The default deployment is {endpoint.defaults.deployment_name}\")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"At this point, the deployment is ready to perform predictions."
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# 4. Clean up Resources\n",
"Delete endpoint"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"..."
]
}
],
"source": [
"ml_client.batch_endpoints.begin_delete(name=endpoint_name)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3.10 - SDK V2",
"language": "python",
"name": "python310-sdkv2"
},
"language_info": {
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.9"
},
"orig_nbformat": 4
},
"nbformat": 4,
"nbformat_minor": 0
}