sdk/python/endpoints/batch/deploy-models/heart-classifier-mlflow/mlflow-for-batch-tabular.ipynb

{ "cells": [ { "attachments": {}, "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "# Use MLflow models in batch deployments\n", "\n", "In this article, learn how to deploy your MLflow model to Azure ML for both batch inference using batch endpoints. Azure Machine Learning supports no-code deployment of models created and logged with MLflow. This means that you don't have to provide a scoring script or an environment.\n", "\n", "For no-code-deployment, Azure Machine Learning\n", "\n", "* Provides a MLflow base image/curated environment that contains the required dependencies to run an Azure Machine Learning Batch job.\n", "* Creates a batch job pipeline with a scoring script for you that can be used to process data using parallelization." ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## 1. Connect to Azure Machine Learning Workspace\n", "\n", "The [workspace](https://docs.microsoft.com/en-us/azure/machine-learning/concept-workspace) is the top-level resource for Azure Machine Learning, providing a centralized place to work with all the artifacts you create when you use Azure Machine Learning. In this section we will connect to the workspace in which the job will be run.\n", "\n", "### 1.1. Import the required libraries" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azure.ai.ml import MLClient, Input\n", "from azure.ai.ml.entities import (\n", " BatchEndpoint,\n", " ModelBatchDeployment,\n", " ModelBatchDeploymentSettings,\n", " Model,\n", " AmlCompute,\n", " Data,\n", " BatchRetrySettings,\n", " CodeConfiguration,\n", " Environment,\n", ")\n", "from azure.ai.ml.constants import AssetTypes, BatchDeploymentOutputAction\n", "from azure.identity import DefaultAzureCredential" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### 1.2. Configure workspace details and get a handle to the workspace\n", "\n", "To connect to a workspace, we need identifier parameters - a subscription, resource group and workspace name. We will use these details in the `MLClient` from `azure.ai.ml` to get a handle to the required Azure Machine Learning workspace. We use the default [default azure authentication](https://docs.microsoft.com/en-us/python/api/azure-identity/azure.identity.defaultazurecredential?view=azure-python) for this tutorial. Check the [configuration notebook](../../jobs/configuration.ipynb) for more details on how to configure credentials and connect to a workspace." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "jupyter": { "outputs_hidden": false, "source_hidden": false }, "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "subscription_id = \"<SUBSCRIPTION_ID>\"\n", "resource_group = \"<RESOURCE_GROUP>\"\n", "workspace = \"<AML_WORKSPACE_NAME>\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ml_client = MLClient(\n", " DefaultAzureCredential(), subscription_id, resource_group, workspace\n", ")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "If you are working in a Azure Machine Learning compute, you can simply:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ml_client = MLClient.from_config(DefaultAzureCredential())" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "## 2. Registering the model\n", "\n", "### 2.1 About the model\n", "\n", "This example shows how you can deploy an MLflow model to a batch endpoint to perform batch predictions. This example uses an MLflow model based on the UCI Heart Disease Data Set. The database contains 76 attributes, but we are using a subset of 14 of them. The model tries to predict the presence of heart disease in a patient. It is integer valued from 0 (no presence) to 1 (presence).\n", "\n", "The model has been trained using an `XGBBoost` classifier and all the required preprocessing has been packaged as a `scikit-learn` pipeline, making this model an end-to-end pipeline that goes from raw data to predictions.\n", "\n", "### 2.2 Registering the model in the workspace\n", "\n", "Let's verify if the model we want to deploy, `heart-classifier`, is registered in the model registry. If not, we will register it from a local version we have in the repository:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "register_model", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "model_name = \"heart-classifier-mlflow\"\n", "model_local_path = \"model\"\n", "\n", "model = ml_client.models.create_or_update(\n", " Model(name=model_name, path=model_local_path, type=AssetTypes.MLFLOW_MODEL)\n", ")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "Let's get the model:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "get_model", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "model = ml_client.models.get(name=model_name, label=\"latest\")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "## 3 Create Batch Endpoint\n", "\n", "Batch endpoints are endpoints that are used batch inferencing on large volumes of data over a period of time. Batch endpoints receive pointers to data and run jobs asynchronously to process the data in parallel on compute clusters. Batch endpoints store outputs to a data store for further analysis.\n", "\n", "To create an online endpoint we will use `BatchEndpoint`. This class allows user to configure the following key aspects:\n", "- `name` - Name of the endpoint. 
{ "attachments": {}, "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "Let's get the model:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "get_model", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "model = ml_client.models.get(name=model_name, label=\"latest\")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "## 3. Create Batch Endpoint\n", "\n", "Batch endpoints are endpoints that are used to run batch inferencing on large volumes of data over a period of time. Batch endpoints receive pointers to data and run jobs asynchronously to process the data in parallel on compute clusters. Batch endpoints store outputs to a data store for further analysis.\n", "\n", "To create a batch endpoint we will use `BatchEndpoint`. This class allows the user to configure the following key aspects:\n", "- `name` - Name of the endpoint. Needs to be unique at the Azure region level\n", "- `auth_mode` - The authentication method for the endpoint. Currently only Azure Active Directory (Azure AD) token-based (`aad_token`) authentication is supported.\n", "- `description` - Description of the endpoint.\n", "\n", "### 3.1 Configure the endpoint\n", "\n", "First, let's create the endpoint that is going to host the batch deployments. To ensure that our endpoint name is unique, let's create a random suffix to append to it.\n", "\n", "> In general, you won't need this technique; you would instead use a more meaningful name. Skip the following cell if that is your case:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "name": "name_endpoint" }, "outputs": [], "source": [ "endpoint_name = \"heart-classifier\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "jupyter": { "outputs_hidden": false, "source_hidden": false }, "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "import random\n", "import string\n", "\n", "# Creating a unique endpoint name by including a random suffix\n", "allowed_chars = string.ascii_lowercase + string.digits\n", "endpoint_suffix = \"\".join(random.choice(allowed_chars) for x in range(5))\n", "endpoint_name = f\"{endpoint_name}-{endpoint_suffix}\"\n", "\n", "print(f\"Endpoint name: {endpoint_name}\")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Let's configure the endpoint:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "name": "configure_endpoint" }, "outputs": [], "source": [ "endpoint = BatchEndpoint(\n", "    name=endpoint_name,\n", "    description=\"A heart condition classifier for batch inference\",\n", ")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### 3.2 Create the endpoint\n", "Using the `MLClient` created earlier, we will now create the endpoint in the workspace. This command will start the endpoint creation and return a confirmation response while the endpoint creation continues." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "create_endpoint", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "ml_client.batch_endpoints.begin_create_or_update(endpoint).result()" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## 4. Create a batch deployment\n", "\n", "A deployment is a set of resources required for hosting the model that does the actual inferencing. We will create a deployment for our endpoint using the `ModelBatchDeployment` class.\n", "\n", "### 4.1 Creating a scoring script to work with the model\n", "\n", "> MLflow models don't require a scoring script." ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "### 4.2 Creating the compute\n", "\n", "Batch deployments can run on any Azure ML compute that already exists in the workspace. That means that multiple batch deployments can share the same compute infrastructure. In this example, we are going to work on an AzureML compute cluster called `batch-cluster`. Let's verify the compute exists on the workspace or create it otherwise."
] }, { "cell_type": "code", "execution_count": null, "metadata": { "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "create_compute", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "compute_name = \"batch-cluster\"\n", "if not any(filter(lambda m: m.name == compute_name, ml_client.compute.list())):\n", " compute_cluster = AmlCompute(\n", " name=compute_name, description=\"amlcompute\", min_instances=0, max_instances=5\n", " )\n", " ml_client.begin_create_or_update(compute_cluster).result()" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### 4.3 Creating the environment\n", "\n", "> MLflow models don't require an environment." ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "### 4.4 Configuring the deployment" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "configure_deployment", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "deployment = ModelBatchDeployment(\n", " name=\"classifier-xgboost\",\n", " description=\"A heart condition classifier based on XGBoost\",\n", " endpoint_name=endpoint.name,\n", " model=model,\n", " compute=compute_name,\n", " settings=ModelBatchDeploymentSettings(\n", " instance_count=2,\n", " max_concurrency_per_instance=2,\n", " mini_batch_size=10,\n", " output_action=BatchDeploymentOutputAction.APPEND_ROW,\n", " output_file_name=\"predictions.csv\",\n", " retry_settings=BatchRetrySettings(max_retries=3, timeout=300),\n", " logging_level=\"info\",\n", " ),\n", ")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### 4.5 Create the deployment\n", "Using the `MLClient` created earlier, we will now create the deployment in the workspace. This command will start the deployment creation and return a confirmation response while the deployment creation continues." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "create_deployment", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "ml_client.batch_deployments.begin_create_or_update(deployment).result()" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "Once created, let's configure this new deployment as the default one:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "set_default_deployment", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "endpoint = ml_client.batch_endpoints.get(endpoint.name)\n", "endpoint.defaults.deployment_name = deployment.name\n", "ml_client.batch_endpoints.begin_create_or_update(endpoint).result()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "jupyter": { "outputs_hidden": false, "source_hidden": false }, "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "print(f\"The default deployment is {endpoint.defaults.deployment_name}\")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "### 4.6 Testing the deployment\n", "\n", "Once the deployment is created, it is ready to recieve jobs.\n", "\n", "#### 4.6.1 Creating a data asset\n", "\n", "Let's first register a data asset so we can run the job against it. This data asset is a folder containing 1000 images from the original ImageNet dataset. We are going to download it first and then create the data asset:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "configure_data_asset", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "data_path = \"data\"\n", "dataset_name = \"heart-dataset-unlabeled\"\n", "\n", "heart_dataset_unlabeled = Data(\n", " path=data_path,\n", " type=AssetTypes.URI_FOLDER,\n", " description=\"An unlabeled dataset for heart classification\",\n", " name=dataset_name,\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "name": "create_data_asset" }, "outputs": [], "source": [ "ml_client.data.create_or_update(heart_dataset_unlabeled)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Let's get a reference of the new data asset:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "get_data_asset", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "heart_dataset_unlabeled = ml_client.data.get(name=dataset_name, label=\"latest\")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "#### 4.6.2 Creating an input for the deployment" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "configure_inputs", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "input = Input(type=AssetTypes.URI_FOLDER, path=heart_dataset_unlabeled.id)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "#### 4.6.3 Invoke the deployment\n", "\n", "Using the `MLClient` created earlier, we 
{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "#### 4.6.3 Invoke the deployment\n", "\n", "Using the `MLClient` created earlier, we will get a handle to the endpoint. The endpoint can be invoked using the `invoke` command with the following parameters:\n", "- `endpoint_name` - Name of the endpoint\n", "- `input` - The input data to score" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "start_batch_scoring_job", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "job = ml_client.batch_endpoints.invoke(endpoint_name=endpoint.name, input=input)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "Notice how we are not indicating the deployment name in the invoke operation. That's because the endpoint automatically routes the job to the default deployment. Since our endpoint only has one deployment, that one is the default. You can target a specific deployment by passing the parameter `deployment_name`." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "jupyter": { "outputs_hidden": false, "source_hidden": false }, "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "job = ml_client.batch_endpoints.invoke(\n", "    deployment_name=deployment.name, endpoint_name=endpoint.name, input=input\n", ")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "#### 4.6.4 Get the details of the invoked job\n", "\n", "Let's get the details and logs of the invoked job:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "get_job", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "ml_client.jobs.get(job.name)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "We can wait for the job to finish using the following code:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "name": "stream_job_logs" }, "outputs": [], "source": [ "ml_client.jobs.stream(job.name)" ] },
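{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "`ml_client.jobs.stream` blocks until the job finishes while printing its logs. If you'd rather poll the job yourself, the following is a minimal sketch; the set of terminal statuses and the polling interval are assumptions you may want to adjust:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import time\n", "\n", "# Poll the batch scoring job until it reaches a terminal status (sketch)\n", "terminal_statuses = {\"Completed\", \"Failed\", \"Canceled\"}\n", "while True:\n", "    job = ml_client.jobs.get(job.name)\n", "    print(f\"Job {job.name} is in status: {job.status}\")\n", "    if job.status in terminal_statuses:\n", "        break\n", "    time.sleep(30)" ] },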
{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### 4.7 Exploring the results\n", "\n", "The deployment creates a child job that executes the scoring. We can get the details of it using the following code:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "jupyter": { "outputs_hidden": false, "source_hidden": false }, "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "scoring_job = list(ml_client.jobs.list(parent_job_name=job.name))[0]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(\"Job name:\", scoring_job.name)\n", "print(\"Job status:\", scoring_job.status)\n", "print(\n", "    \"Job duration:\",\n", "    scoring_job.creation_context.last_modified_at\n", "    - scoring_job.creation_context.created_at,\n", ")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "#### 4.7.1 Download the results\n", "\n", "The outputs generated by the deployment job will be placed in an output named `score`:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "name": "download_outputs" }, "outputs": [], "source": [ "ml_client.jobs.download(name=scoring_job.name, download_path=\".\", output_name=\"score\")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "We can read this CSV file using the `pandas` library:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "read_outputs", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "import pandas as pd\n", "\n", "score = pd.read_csv(\n", "    \"named-outputs/score/predictions.csv\", names=[\"row\", \"prediction\", \"file\"]\n", ")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## 5. Customize deployment with a scoring script\n", "\n", "You can still use a scoring script with MLflow models in batch endpoints:" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### 5.1 Create a scoring script" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile code/batch_driver.py\n", "\n", "import os\n", "import glob\n", "import mlflow\n", "import pandas as pd\n", "\n", "\n", "def init():\n", "    global model\n", "    global model_input_types\n", "    global model_output_names\n", "\n", "    # AZUREML_MODEL_DIR is an environment variable created during deployment\n", "    # It is the path to the model folder\n", "    # Please provide your model's folder name if there's one\n", "    model_path = glob.glob(os.environ[\"AZUREML_MODEL_DIR\"] + \"/*/\")[0]\n", "\n", "    # Load the model and, when the MLflow signature provides them, its input types and output names\n", "    model = mlflow.pyfunc.load_model(model_path)\n", "    model_input_types = None\n", "    model_output_names = None\n", "    if model.metadata.signature.inputs:\n", "        model_input_types = dict(\n", "            zip(\n", "                model.metadata.signature.inputs.input_names(),\n", "                model.metadata.signature.inputs.pandas_types(),\n", "            )\n", "        )\n", "    if model.metadata.signature.outputs:\n", "        if model.metadata.signature.outputs.has_input_names():\n", "            model_output_names = model.metadata.signature.outputs.input_names()\n", "        elif len(model.metadata.signature.outputs.input_names()) == 1:\n", "            model_output_names = [\"prediction\"]\n", "\n", "\n", "def run(mini_batch):\n", "    print(f\"run method start: {__file__}, run({len(mini_batch)} files)\")\n", "\n", "    # Read every CSV file in the mini batch and keep track of the file it came from\n", "    data = pd.concat(\n", "        map(\n", "            lambda fp: pd.read_csv(fp).assign(filename=os.path.basename(fp)), mini_batch\n", "        )\n", "    )\n", "    if model_input_types:\n", "        data = data.astype(model_input_types)\n", "\n", "    pred = model.predict(data)\n", "\n", "    if not isinstance(pred, pd.DataFrame):\n", "        # Name the prediction columns, falling back to generic names when the signature has none\n", "        output_names = model_output_names or [\n", "            \"pred_col\" + str(i) for i in range(pred.shape[1])\n", "        ]\n", "        pred = pd.DataFrame(pred, columns=output_names)\n", "\n", "    return pd.concat([data, pred], axis=1)\n" ] },
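{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "The deployment in the next section references a conda file at `environment/conda.yaml`. This example assumes that file already exists next to the notebook; if it doesn't in your setup, the following sketch writes a minimal one (it assumes an `environment` folder is present, and the package list and versions are assumptions that should match the dependencies your model was logged with):" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile environment/conda.yaml\n", "channels:\n", "  - conda-forge\n", "dependencies:\n", "  - python=3.9\n", "  - pip\n", "  - pip:\n", "      - mlflow\n", "      - azureml-core\n", "      - azureml-dataset-runtime[fuse]\n", "      - xgboost\n", "      - scikit-learn\n", "      - pandas\n", "name: batch-mlflow-xgboost" ] },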
pd.DataFrame:\n", " if not model_output_names:\n", " model_output_names = [\"pred_col\" + str(i) for i in range(pred.shape[1])]\n", " pred = pd.DataFrame(pred, columns=model_output_names)\n", "\n", " return pd.concat([data, pred], axis=1)\n" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### 5.2 Indicate the environment:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "name": "configure_environment_custom" }, "outputs": [], "source": [ "environment = Environment(\n", " name=\"batch-mlflow-xgboost\",\n", " conda_file=\"environment/conda.yaml\",\n", " image=\"mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest\",\n", ")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### 5.3 Configure the deployment" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "name": "configure_deployment_custom" }, "outputs": [], "source": [ "deployment = ModelBatchDeployment(\n", " name=\"classifier-xgboost-custom\",\n", " description=\"A heart condition classifier based on XGBoost with a custom scoring script\",\n", " endpoint_name=endpoint.name,\n", " model=model,\n", " environment=environment,\n", " code_configuration=CodeConfiguration(code=\"code\", scoring_script=\"batch_driver.py\"),\n", " compute=compute_name,\n", " settings=ModelBatchDeploymentSettings(\n", " instance_count=2,\n", " max_concurrency_per_instance=2,\n", " mini_batch_size=10,\n", " output_action=BatchDeploymentOutputAction.APPEND_ROW,\n", " output_file_name=\"predictions.csv\",\n", " retry_settings=BatchRetrySettings(max_retries=3, timeout=300),\n", " logging_level=\"info\",\n", " ),\n", ")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### 5.3 Create the deployment" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "name": "create_deployment_custom" }, "outputs": [], "source": [ "ml_client.batch_deployments.begin_create_or_update(deployment).result()" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Your deployment is ready to use." ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## 6. Clean up resources\n", "\n", "Clean-up the resources created. " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "name": "delete_endpoint" }, "outputs": [], "source": [ "ml_client.batch_endpoints.begin_delete(endpoint_name).result()" ] } ], "metadata": { "kernel_info": { "name": "python310-sdkv2" }, "kernelspec": { "display_name": "env", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.11" }, "microsoft": { "ms_spell_check": { "ms_spell_check_language": "en" } }, "nteract": { "version": "nteract-front-end@1.0.0" } }, "nbformat": 4, "nbformat_minor": 0 }