sdk/python/foundation-models/system/inference/image-text-embeddings/image-text-embeddings-batch-endpoint.ipynb (738 lines of code) (raw):

{ "cells": [ { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Image and Text Embeddings Inference using Batch Endpoints\n", "\n", "This sample shows how to deploy `embeddings` type models to a batch endpoint for image and text embeddings inference.\n", "\n", "### Task\n", "`embeddings` takes in images and/or text samples. For each image and text sample, feature embeddings are returned from the model.\n", " \n", "### Model\n", "Models that can perform the `embeddings` task are tagged with `embeddings`. We will use the `OpenAI-CLIP-Image-Text-Embeddings-vit-base-patch32` model in this notebook. If you opened this notebook from a specific model card, remember to replace the specific model name. If you don't find a model that suits your scenario or domain, you can discover and [import models from HuggingFace hub](../../import/import_model_into_registry.ipynb) and then use them for inference. \n", "\n", "### Inference data\n", "We will use the [fridgeObjects](https://automlsamplenotebookdata-adcuc7f7bqhhh8a4.b02.azurefd.net/image-classification/fridgeObjects.zip) dataset.\n", "\n", "\n", "### Outline\n", "1. Setup pre-requisites\n", "2. Pick a model to deploy\n", "3. Prepare data for inference - Using a folder of CSV files\n", "4. Deploy the model to a batch endpoint\n", "5. Test the endpoint - Using a folder of CSV files\n", "6. Clean up resources - delete the endpoint" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### 1. Setup pre-requisites\n", "* Install dependencies\n", "* Connect to AzureML Workspace. Learn more at [set up SDK authentication](https://learn.microsoft.com/en-us/azure/machine-learning/how-to-setup-authentication?tabs=sdk). Replace `<WORKSPACE_NAME>`, `<RESOURCE_GROUP>` and `<SUBSCRIPTION_ID>` below.\n", "* Connect to `azureml` system registry" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azure.ai.ml import MLClient, Input\n", "from azure.ai.ml.entities import AmlCompute\n", "from azure.ai.ml.constants import AssetTypes\n", "from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential\n", "import time\n", "\n", "try:\n", " credential = DefaultAzureCredential()\n", " credential.get_token(\"https://management.azure.com/.default\")\n", "except Exception as ex:\n", " credential = InteractiveBrowserCredential()\n", "\n", "try:\n", " workspace_ml_client = MLClient.from_config(credential)\n", " subscription_id = workspace_ml_client.subscription_id\n", " resource_group = workspace_ml_client.resource_group_name\n", " workspace_name = workspace_ml_client.workspace_name\n", "except Exception as ex:\n", " print(ex)\n", " # Enter details of your AML workspace\n", " subscription_id = \"<SUBSCRIPTION_ID>\"\n", " resource_group = \"<RESOURCE_GROUP>\"\n", " workspace_name = \"<AML_WORKSPACE_NAME>\"\n", "\n", "workspace_ml_client = MLClient(\n", " credential, subscription_id, resource_group, workspace_name\n", ")\n", "\n", "# The models are available in the AzureML system registry, \"azureml\"\n", "registry_ml_client = MLClient(\n", " credential,\n", " subscription_id,\n", " resource_group,\n", " registry_name=\"azureml\",\n", ")\n", "# Generating a unique timestamp that can be used for names and versions that need to be unique\n", "timestamp = str(int(time.time()))" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "#### Create a compute cluster\n", "Use the model card from the AzureML system registry to check the minimum required inferencing SKU, referenced as size below. If you already have a sufficient compute cluster, you can simply define the name in compute_name in the following code block." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from azure.ai.ml.entities import AmlCompute\n", "from azure.core.exceptions import ResourceNotFoundError\n", "\n", "compute_name = \"cpu-cluster\"\n", "\n", "try:\n", " _ = workspace_ml_client.compute.get(compute_name)\n", " print(\"Found existing compute target.\")\n", "except ResourceNotFoundError:\n", " print(\"Creating a new compute target...\")\n", " compute_config = AmlCompute(\n", " name=compute_name,\n", " description=\"An AML compute cluster\",\n", " size=\"Standard_DS3_V2\",\n", " min_instances=0,\n", " max_instances=3,\n", " idle_time_before_scale_down=120,\n", " )\n", " workspace_ml_client.begin_create_or_update(compute_config).result()" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### 2. Pick a model to deploy\n", "\n", "Browse models in the Model Catalog in the AzureML Studio, filtering by the `embeddings` task. In this example, we use the `OpenAI-CLIP-Image-Text-Embeddings-vit-base-patch32` model. If you have opened this notebook for a different model, replace the model name accordingly." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model_name = \"OpenAI-CLIP-Image-Text-Embeddings-vit-base-patch32\"\n", "\n", "foundation_model = registry_ml_client.models.get(name=model_name, label=\"latest\")\n", "print(\n", " f\"\\n\\nUsing model name: {foundation_model.name}, version: {foundation_model.version}, id: {foundation_model.id} for inferencing\"\n", ")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### 3. Prepare data for inference - Using a folder of csv files\n", "\n", "We will use the [fridgeObjects](https://automlsamplenotebookdata-adcuc7f7bqhhh8a4.b02.azurefd.net/image-classification/fridgeObjects.zip) dataset for multi-class classification. The fridge object dataset is stored in a directory. There are four different folders inside:\n", "- /water_bottle\n", "- /milk_bottle\n", "- /carton\n", "- /can\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import urllib\n", "import shutil\n", "from zipfile import ZipFile\n", "\n", "# Change to a different location if you prefer\n", "dataset_parent_dir = \"./data\"\n", "\n", "# create data folder if it doesnt exist.\n", "os.makedirs(dataset_parent_dir, exist_ok=True)\n", "\n", "# Download data\n", "download_url = \"https://automlsamplenotebookdata-adcuc7f7bqhhh8a4.b02.azurefd.net/image-classification/fridgeObjects.zip\"\n", "\n", "# Extract current dataset name from dataset url\n", "dataset_name = os.path.split(download_url)[-1].split(\".\")[0]\n", "# Get dataset path for later use\n", "dataset_dir = os.path.join(dataset_parent_dir, dataset_name)\n", "\n", "if os.path.exists(dataset_dir):\n", " shutil.rmtree(dataset_dir)\n", "\n", "# Get the data zip file path\n", "data_file = os.path.join(dataset_parent_dir, f\"{dataset_name}.zip\")\n", "\n", "# Download the dataset\n", "urllib.request.urlretrieve(download_url, filename=data_file)\n", "\n", "# Extract files\n", "with ZipFile(data_file, \"r\") as zip:\n", " print(\"extracting files...\")\n", " zip.extractall(path=dataset_parent_dir)\n", " print(\"done\")\n", "\n", "# Delete zip file\n", "os.remove(data_file)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "#### 3.1 Prepare image data for batch inference" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import base64\n", "\n", "\n", "def read_image(image_path):\n", " with open(image_path, \"rb\") as f:\n", " data = f.read()\n", " data = base64.encodebytes(data).decode(\"utf-8\")\n", " return data\n", "\n", "\n", "image_list = []\n", "image_path_list = []\n", "\n", "for dir_name in os.listdir(dataset_dir):\n", " dir_path = os.path.join(dataset_dir, dir_name)\n", " for path, _, files in os.walk(dir_path):\n", " for file in files:\n", " image_path = os.path.join(path, file)\n", " image = read_image(image_path)\n", " image_path_list.append(image_path)\n", " image_list.append(image)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "#### 3.2 Prepare a folder of CSV files with base64 images for batch inference input\n", "\n", "We can provide input images to batch inference in a csv file containing a column named \"image\" having either images in base64 format or publicly accessible image URLs. We provide empty string in the same csv file in the column named \"text\".\n", "\n", "The deployment in the `Create batch deployment` section below takes the argument `mini_batch_size`, which is the number of CSV files processed by the model in a single mini_batch. To limit the number of images processed in each mini_batch we split the dataset into multiple csv files." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import base64\n", "import pandas as pd\n", "from pathlib import Path\n", "\n", "image_csv_folder_path = os.path.join(dataset_parent_dir, \"batch_image\")\n", "os.makedirs(image_csv_folder_path, exist_ok=True)\n", "batch_input_file = \"batch_input.csv\"\n", "\n", "data = [[image, \"\"] for image in image_list]\n", "batch_df = pd.DataFrame(data, columns=[\"image\", \"text\"])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Divide this into files of 10 rows each\n", "batch_size_per_predict = 10\n", "for i in range(0, len(batch_df), batch_size_per_predict):\n", " j = i + batch_size_per_predict\n", " batch_df[i:j].to_csv(os.path.join(image_csv_folder_path, str(i) + batch_input_file))\n", "\n", "# Check out the first and last file name created\n", "input_paths = sorted(Path(image_csv_folder_path).iterdir(), key=os.path.getmtime)\n", "input_files = [os.path.basename(path) for path in input_paths]\n", "print(f\"{input_files[0]} to {str(i)}{batch_input_file}.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from IPython.display import Image\n", "\n", "sample_image = os.path.join(dataset_dir, \"milk_bottle\", \"99.jpg\")\n", "Image(filename=sample_image)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### 3.3 Prepare a folder of CSV files with text samples for batch inference input\n", "\n", "We can provide text samples to batch inference in a csv file containing a column named \"text\". We provide empty strings in the same csv file in the column named \"image\".\n", "\n", "The deployment in the `Create batch deployment` section below takes the argument `mini_batch_size`, which is the number of CSV files processed by the model in a single mini_batch. To limit the number of images processed in each mini_batch we split the dataset into multiple csv files." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import base64\n", "import pandas as pd\n", "from pathlib import Path\n", "import random\n", "import string\n", "\n", "text_csv_folder_path = os.path.join(dataset_parent_dir, \"batch_text\")\n", "os.makedirs(text_csv_folder_path, exist_ok=True)\n", "batch_input_file = \"batch_input.csv\"\n", "\n", "data = [\n", " [\"\", \"a photo of a \" + os.path.basename(os.path.dirname(image_path))]\n", " for image_path in image_path_list\n", "]\n", "batch_df = pd.DataFrame(data, columns=[\"image\", \"text\"])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Divide this into files of 10 rows each\n", "batch_size_per_predict = 10\n", "for i in range(0, len(batch_df), batch_size_per_predict):\n", " j = i + batch_size_per_predict\n", " batch_df[i:j].to_csv(os.path.join(text_csv_folder_path, str(i) + batch_input_file))\n", "\n", "# Check out the first and last file name created\n", "input_paths = sorted(Path(text_csv_folder_path).iterdir(), key=os.path.getmtime)\n", "input_files = [os.path.basename(path) for path in input_paths]\n", "print(f\"{input_files[0]} to {str(i)}{batch_input_file}.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### 3.4 Prepare a folder of CSV files with with base64 images and text samples for batch inference input\n", "\n", "We can provide input images to batch inference in a csv file containing a column named \"image\" having either images in base64 format or publicly accessible image URLs. We provide text sample strings in the same csv file in the column named \"text\".\n", "\n", "The deployment in the `Create batch deployment` section below takes the argument `mini_batch_size`, which is the number of CSV files processed by the model in a single mini_batch. To limit the number of images processed in each mini_batch we split the dataset into multiple csv files." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import base64\n", "import pandas as pd\n", "from pathlib import Path\n", "import random\n", "import string\n", "\n", "image_text_csv_folder_path = os.path.join(dataset_parent_dir, \"batch_image_text\")\n", "os.makedirs(image_text_csv_folder_path, exist_ok=True)\n", "batch_input_file = \"batch_input.csv\"\n", "\n", "data = [\n", " [image_list[i], \"a photo of a \" + os.path.basename(os.path.dirname(image_path))]\n", " for i in range(len(image_list))\n", "]\n", "batch_df = pd.DataFrame(data, columns=[\"image\", \"text\"])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Divide this into files of 10 rows each\n", "batch_size_per_predict = 10\n", "for i in range(0, len(batch_df), batch_size_per_predict):\n", " j = i + batch_size_per_predict\n", " batch_df[i:j].to_csv(\n", " os.path.join(image_text_csv_folder_path, str(i) + batch_input_file)\n", " )\n", "\n", "# Check out the first and last file name created\n", "input_paths = sorted(Path(image_text_csv_folder_path).iterdir(), key=os.path.getmtime)\n", "input_files = [os.path.basename(path) for path in input_paths]\n", "print(f\"{input_files[0]} to {str(i)}{batch_input_file}.\")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### 4. Deploy the model to a batch endpoint\n", "Batch endpoints are endpoints that are used to do batch inferencing on large volumes of data over a period of time. The endpoints receive pointers to data and run jobs asynchronously to process the data in parallel on compute clusters. Batch endpoints store outputs to a data store for further analysis. For more information on batch endpoints and deployments see [What are batch endpoints?](https://learn.microsoft.com/en-us/azure/machine-learning/concept-endpoints?view=azureml-api-2#what-are-batch-endpoints).\n", "\n", "* Create a batch endpoint.\n", "* Create a batch deployment.\n", "* Set the deployment as default; doing so allows invoking the endpoint without specifying the deployment's name." ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "#### Create a batch endpoint" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import time, sys\n", "from azure.ai.ml.entities import (\n", " BatchEndpoint,\n", " BatchDeployment,\n", " BatchRetrySettings,\n", " AmlCompute,\n", ")\n", "\n", "# Endpoint names need to be unique in a region, hence using timestamp to create unique endpoint name\n", "endpoint_name = \"clip-embeddings-batch\" + str(timestamp)\n", "# Create a batch endpoint\n", "endpoint = BatchEndpoint(\n", " name=endpoint_name,\n", " description=\"Batch endpoint for \"\n", " + foundation_model.name\n", " + \", for image-text-embeddings task\",\n", ")\n", "workspace_ml_client.begin_create_or_update(endpoint).result()" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "#### Create a batch deployment" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "deployment_name = \"embed-clip-vit-base-patch32\"\n", "\n", "deployment = BatchDeployment(\n", " name=deployment_name,\n", " endpoint_name=endpoint_name,\n", " model=foundation_model.id,\n", " compute=compute_name,\n", " error_threshold=0,\n", " instance_count=1,\n", " logging_level=\"info\",\n", " max_concurrency_per_instance=1,\n", " mini_batch_size=2,\n", " output_file_name=\"predictions.csv\",\n", " retry_settings=BatchRetrySettings(max_retries=3, timeout=600),\n", ")\n", "workspace_ml_client.begin_create_or_update(deployment).result()" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "#### Set the deployment as default" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "endpoint = workspace_ml_client.batch_endpoints.get(endpoint_name)\n", "endpoint.defaults.deployment_name = deployment_name\n", "workspace_ml_client.begin_create_or_update(endpoint).result()\n", "\n", "print(f\"The default deployment is {endpoint.defaults.deployment_name}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 5.1 Test the endpoint - Using a folder of CSV files with base64 images\n", "\n", "Invoke the batch endpoint with the input parameter pointing to the folder of CSV files containing the batch inference input. This creates a pipeline job using the default deployment in the endpoint. Wait for the job to complete.\n", "\n", "Note: If job failed with Out of Memory Error then please try splitting your input into smaller csv files or decreasing `mini_batch_size` for the deployment." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "job = None\n", "input = Input(path=image_csv_folder_path, type=AssetTypes.URI_FOLDER)\n", "num_retries = 3\n", "for i in range(num_retries):\n", " try:\n", " job = workspace_ml_client.batch_endpoints.invoke(\n", " endpoint_name=endpoint.name, input=input\n", " )\n", " break\n", " except Exception as e:\n", " if i == num_retries - 1:\n", " raise e\n", " else:\n", " print(\"Endpoint invocation failed. Retrying after 5 seconds...\")\n", " time.sleep(5)\n", "if job is not None:\n", " workspace_ml_client.jobs.stream(job.name)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "scoring_job = list(workspace_ml_client.jobs.list(parent_job_name=job.name))[0]\n", "\n", "workspace_ml_client.jobs.download(\n", " name=scoring_job.name,\n", " download_path=os.path.join(dataset_parent_dir, \"csv-output\"),\n", " output_name=\"score\",\n", ")\n", "\n", "predictions_file = os.path.join(\n", " dataset_parent_dir, \"csv-output\", \"named-outputs\", \"score\", \"predictions.csv\"\n", ")\n", "\n", "# Load the batch predictions file with no headers into a dataframe and set your column names\n", "score_df = pd.read_csv(\n", " predictions_file,\n", " header=None,\n", " names=[\"row_number_per_file\", \"image_features\", \"file_name\"],\n", ")\n", "score_df.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 5.2 Test the endpoint - Using a folder of CSV files with text samples\n", "\n", "Invoke the batch endpoint with the input parameter pointing to the folder of CSV files containing the batch inference input. This creates a pipeline job using the default deployment in the endpoint. Wait for the job to complete.\n", "\n", "Note: If job failed with Out of Memory Error then please try splitting your input into smaller csv files or decreasing `mini_batch_size` for the deployment." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "job = None\n", "input = Input(path=text_csv_folder_path, type=AssetTypes.URI_FOLDER)\n", "num_retries = 3\n", "for i in range(num_retries):\n", " try:\n", " job = workspace_ml_client.batch_endpoints.invoke(\n", " endpoint_name=endpoint.name, input=input\n", " )\n", " break\n", " except Exception as e:\n", " if i == num_retries - 1:\n", " raise e\n", " else:\n", " print(\"Endpoint invocation failed. Retrying after 5 seconds...\")\n", " time.sleep(5)\n", "if job is not None:\n", " workspace_ml_client.jobs.stream(job.name)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "scoring_job = list(workspace_ml_client.jobs.list(parent_job_name=job.name))[0]\n", "\n", "workspace_ml_client.jobs.download(\n", " name=scoring_job.name,\n", " download_path=os.path.join(dataset_parent_dir, \"csv-output\"),\n", " output_name=\"score\",\n", ")\n", "\n", "predictions_file = os.path.join(\n", " dataset_parent_dir, \"csv-output\", \"named-outputs\", \"score\", \"predictions.csv\"\n", ")\n", "\n", "# Load the batch predictions file with no headers into a dataframe and set your column names\n", "score_df = pd.read_csv(\n", " predictions_file,\n", " header=None,\n", " names=[\"row_number_per_file\", \"text_features\", \"file_name\"],\n", ")\n", "score_df.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 5.3 Test the endpoint - Using a folder of CSV files with base64 images and text samples\n", "\n", "Invoke the batch endpoint with the input parameter pointing to the folder of CSV files containing the batch inference input. This creates a pipeline job using the default deployment in the endpoint. Wait for the job to complete.\n", "\n", "Note: If job failed with Out of Memory Error then please try splitting your input into smaller csv files or decreasing `mini_batch_size` for the deployment." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "job = None\n", "input = Input(path=image_text_csv_folder_path, type=AssetTypes.URI_FOLDER)\n", "num_retries = 3\n", "for i in range(num_retries):\n", " try:\n", " job = workspace_ml_client.batch_endpoints.invoke(\n", " endpoint_name=endpoint.name, input=input\n", " )\n", " break\n", " except Exception as e:\n", " if i == num_retries - 1:\n", " raise e\n", " else:\n", " print(\"Endpoint invocation failed. Retrying after 5 seconds...\")\n", " time.sleep(5)\n", "if job is not None:\n", " workspace_ml_client.jobs.stream(job.name)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "scoring_job = list(workspace_ml_client.jobs.list(parent_job_name=job.name))[0]\n", "\n", "workspace_ml_client.jobs.download(\n", " name=scoring_job.name,\n", " download_path=os.path.join(dataset_parent_dir, \"csv-output\"),\n", " output_name=\"score\",\n", ")\n", "\n", "predictions_file = os.path.join(\n", " dataset_parent_dir, \"csv-output\", \"named-outputs\", \"score\", \"predictions.csv\"\n", ")\n", "\n", "# Load the batch predictions file with no headers into a dataframe and set your column names\n", "score_df = pd.read_csv(\n", " predictions_file,\n", " header=None,\n", " names=[\"row_number_per_file\", \"image_features\", \"text_features\", \"file_name\"],\n", ")\n", "score_df.head()" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### 6. Clean up resources - delete the endpoint\n", "Batch endpoints use compute resources only when jobs are submitted. You can keep the batch endpoint for your reference without worrying about compute bills, or choose to delete the endpoint. If you created your compute cluster to have zero minimum instances and scale down soon after being idle, you won't be charged for an unused compute." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "workspace_ml_client.batch_endpoints.begin_delete(name=endpoint_name).result()" ] } ], "metadata": { "kernelspec": { "display_name": "rc_133", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 2 }