sdk/python/foundation-models/system/inference/text-to-image/image-text-to-image-batch-endpoint.ipynb (491 lines of code) (raw):
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Image to Image Inference using Online Endpoints\n",
"\n",
"This sample shows how to deploy `image to image` type stable diffusion models to a batch endpoint for inference.\n",
"\n",
"### Task\n",
"`image to image` task takes an original image and a text prompt as input. The model generates an image by modifying the original image.\n",
"\n",
" \n",
"### Model\n",
"Models that can perform the `image to image` task are tagged with `image-to-image`. We will use the `stabilityai-stable-diffusion-xl-refiner-1-0` model in this notebook. If you opened this notebook from a specific model card, remember to replace the specific model name.\n",
"\n",
"\n",
"### Outline\n",
"1. Setup pre-requisites\n",
"2. Pick a model to deploy\n",
"3. Prepare data for inference - using a folder of csv files with prompt, image columns\n",
"4. Deploy the model to a batch endpoint\n",
"5. Test the endpoint - using csv files\n",
"6. Clean up resources - delete the endpoint"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 1. Setup pre-requisites\n",
"* Install dependencies\n",
"* Connect to AzureML Workspace. Learn more at [set up SDK authentication](https://learn.microsoft.com/en-us/azure/machine-learning/how-to-setup-authentication?tabs=sdk). Replace `<WORKSPACE_NAME>`, `<RESOURCE_GROUP>` and `<SUBSCRIPTION_ID>` below.\n",
"* Connect to `azureml` system registry"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azure.ai.ml import MLClient, Input\n",
"from azure.ai.ml.entities import AmlCompute\n",
"from azure.ai.ml.constants import AssetTypes\n",
"from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential\n",
"import time\n",
"\n",
"try:\n",
" credential = DefaultAzureCredential()\n",
" credential.get_token(\"https://management.azure.com/.default\")\n",
"except Exception as ex:\n",
" credential = InteractiveBrowserCredential()\n",
"\n",
"try:\n",
" workspace_ml_client = MLClient.from_config(credential)\n",
" subscription_id = workspace_ml_client.subscription_id\n",
" resource_group = workspace_ml_client.resource_group_name\n",
" workspace_name = workspace_ml_client.workspace_name\n",
"except Exception as ex:\n",
" print(ex)\n",
" # Enter details of your AML workspace\n",
" subscription_id = \"<SUBSCRIPTION_ID>\"\n",
" resource_group = \"<RESOURCE_GROUP>\"\n",
" workspace_name = \"<AML_WORKSPACE_NAME>\"\n",
"\n",
"workspace_ml_client = MLClient(\n",
" credential, subscription_id, resource_group, workspace_name\n",
")\n",
"\n",
"# The models, fine tuning pipelines and environments are available in the AzureML system registry, \"azureml\"\n",
"registry_name = \"azureml\"\n",
"registry_ml_client = MLClient(\n",
" credential,\n",
" subscription_id,\n",
" resource_group,\n",
" registry_name=registry_name,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Create a compute cluster\n",
"Use the model card from the AzureML system registry to check the minimum required inferencing SKU, referenced as size below. If you already have a sufficient compute cluster that you wish to use, you can simply define the name in `compute_name` in the following code block. Otherwise, the below snippet will create a new compute cluster."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from azure.ai.ml.entities import AmlCompute\n",
"from azure.core.exceptions import ResourceNotFoundError\n",
"\n",
"compute_name = \"gpu-cluster\"\n",
"\n",
"try:\n",
" _ = workspace_ml_client.compute.get(compute_name)\n",
" print(\"Found existing compute target.\")\n",
"except ResourceNotFoundError:\n",
" print(\"Creating a new compute target...\")\n",
" compute_config = AmlCompute(\n",
" name=compute_name,\n",
" description=\"An AML compute cluster\",\n",
" size=\"Standard_NC6s_v3\",\n",
" min_instances=0,\n",
" max_instances=3,\n",
" idle_time_before_scale_down=120,\n",
" )\n",
" workspace_ml_client.begin_create_or_update(compute_config).result()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 2. Pick a model to deploy\n",
"\n",
"Browse models in the Model Catalog in the AzureML Studio, filtering by the `image-to-image` task. In this example, we use the `stabilityai-stable-diffusion-xl-refiner-1-0` model. If you have opened this notebook for a different model, replace the model name accordingly. This is a pre-trained model."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Name of the image to image model to be deployed\n",
"model_name = \"stabilityai-stable-diffusion-xl-refiner-1-0\"\n",
"\n",
"try:\n",
" model = registry_ml_client.models.get(name=model_name, label=\"latest\")\n",
"except Exception as ex:\n",
" print(\n",
" f\"No model named {model_name} found in registry. \"\n",
" \"Please check model name present in Azure model catalog\"\n",
" )\n",
" raise ex\n",
"\n",
"print(\n",
" f\"\\n\\nUsing model name: {model.name}, version: {model.version}, id: {model.id} for generating images from text.\"\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 3. Prepare data for inference - using a folder of csv files with prompt and image columns\n",
"\n",
"The CSV files should consist of 2 columns namely, \n",
"* `image`: Original image to be used as base image for image to image generation. It should either be in base64 format or publicly accessible URL.\n",
"* `prompt`: A text prompt.\n",
"\n",
"We provide the text prompts in a csv file starting from the first row of a column named \"prompt\". We provide the input image as base64 string starting from the first row of the column \"image\".\n",
"\n",
"The deployment in the \"Create batch deployment\" section below takes the argument `mini_batch_size`, which is the number of CSV files processed by the model in a single mini_batch. To limit the number of prompts processed in each mini_batch we split the dataset into multiple csv files."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Prepare input csv file\n",
"\n",
"import base64\n",
"import os\n",
"import pandas as pd\n",
"\n",
"\n",
"def read_image(image_path: str) -> bytes:\n",
" \"\"\"Reads an image from a file path into a byte array.\"\"\"\n",
" with open(image_path, \"rb\") as f:\n",
" return f.read()\n",
"\n",
"\n",
"base_image1 = \"inpainting_data/images/dog_on_bench.png\"\n",
"base_image2 = \"inpainting_data/images/teapot.png\"\n",
"\n",
"dataset_parent_dir = \"inpainting_data/batch_data\"\n",
"os.makedirs(dataset_parent_dir, exist_ok=True)\n",
"\n",
"input_data = {\n",
" \"columns\": [\"image\", \"prompt\"],\n",
" \"data\": [\n",
" {\n",
" \"image\": base64.encodebytes(read_image(base_image1)).decode(\"utf-8\"),\n",
" \"prompt\": \"A yellow cat, high resolution, sitting on a park bench\",\n",
" },\n",
" {\n",
" \"image\": base64.encodebytes(read_image(base_image2)).decode(\"utf-8\"),\n",
" \"prompt\": \"A small flower featuring a blend of pink and purple colors.\",\n",
" },\n",
" ],\n",
"}\n",
"pd.DataFrame(**input_data).to_csv(\n",
" os.path.join(dataset_parent_dir, \"input1.csv\"), index=False\n",
")\n",
"\n",
"input_data = {\n",
" \"columns\": [\"image\", \"prompt\"],\n",
" \"data\": [\n",
" {\n",
" \"image\": base64.encodebytes(read_image(base_image1)).decode(\"utf-8\"),\n",
" \"prompt\": \"Pikachu, cinematic, digital art, sitting on bench\",\n",
" },\n",
" {\n",
" \"image\": base64.encodebytes(read_image(base_image2)).decode(\"utf-8\"),\n",
" \"prompt\": \"A woman with red hair in the style of Tamara de Lempicka.\",\n",
" },\n",
" ],\n",
"}\n",
"pd.DataFrame(**input_data).to_csv(\n",
" os.path.join(dataset_parent_dir, \"input2.csv\"), index=False\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Read all the csvs in the data folder into a pandas dataframe\n",
"import glob\n",
"import os\n",
"import pandas as pd\n",
"\n",
"# Specify the folder where your CSV files are located\n",
"dataset_parent_dir = \"inpainting_data/batch_data\"\n",
"\n",
"# Use glob to get a list of CSV files in the folder\n",
"csv_files = glob.glob(os.path.join(dataset_parent_dir, \"*.csv\"))\n",
"\n",
"# Read all CSV files into a single DataFrame using pd.concat\n",
"batch_df = pd.concat((pd.read_csv(file) for file in csv_files), ignore_index=True)\n",
"\n",
"# Now, 'batch_df' contains all the data from the CSV files in the folder\n",
"print(batch_df.head())"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pathlib import Path\n",
"\n",
"# Specify the folder where your CSV files should be saved\n",
"processed_dataset_parent_dir = \"inpainting_data/processed_batch_data\"\n",
"os.makedirs(processed_dataset_parent_dir, exist_ok=True)\n",
"batch_input_file = \"batch_input.csv\"\n",
"\n",
"# Divide this into files of <x> rows each\n",
"batch_size_per_predict = 2\n",
"for i in range(0, len(batch_df), batch_size_per_predict):\n",
" j = i + batch_size_per_predict\n",
" batch_df[i:j].to_csv(\n",
" os.path.join(processed_dataset_parent_dir, str(i) + batch_input_file)\n",
" )\n",
"\n",
"# Check out the first and last file name created\n",
"input_paths = sorted(Path(processed_dataset_parent_dir).iterdir(), key=os.path.getmtime)\n",
"input_files = [os.path.basename(path) for path in input_paths]\n",
"print(f\"{input_files[0]} to {str(i)}{batch_input_file}.\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Register folder containing csv files in AML as data asset to use in batch job."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"input = Input(path=processed_dataset_parent_dir, type=AssetTypes.URI_FOLDER)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 4. Deploy the model to a batch endpoint\n",
"Batch endpoints are endpoints that are used to do batch inferencing on large volumes of data over a period of time. The endpoints receive pointers to data and run jobs asynchronously to process the data in parallel on compute clusters. Batch endpoints store outputs to a data store for further analysis. For more information on batch endpoints and deployments, see <a href=\"https://learn.microsoft.com/en-us/azure/machine-learning/concept-endpoints?view=azureml-api-2#what-are-batch-endpoints\" target=\"_blank\"> What are batch endpoints?</a> In this sub-section, we will cover the following items:\n",
"\n",
"* Create a batch endpoint.\n",
"* Create a batch deployment.\n",
"* Set the deployment as default. Doing so allows invoking the endpoint without specifying the deployment's name."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Create a batch endpoint"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import time, sys, uuid\n",
"from azure.ai.ml.entities import (\n",
" BatchEndpoint,\n",
" BatchDeployment,\n",
" BatchRetrySettings,\n",
" AmlCompute,\n",
")\n",
"\n",
"# Endpoint names need to be unique in a region,\n",
"# hence using uuid (first 8 character) to create unique endpoint name\n",
"endpoint_name = (\n",
" \"image-to-image-\" + str(uuid.uuid4())[:8]\n",
") # Replace with your endpoint name\n",
"\n",
"# Create a batch endpoint\n",
"endpoint = BatchEndpoint(\n",
" name=endpoint_name,\n",
" description=\"Batch endpoint for \" + model.name + \", for image to image task\",\n",
")\n",
"workspace_ml_client.begin_create_or_update(endpoint).result()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Create a batch deployment\n",
"\n",
"__Note__: `mini_batch_size` is the number of CSV files processed by the model in a single mini_batch."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"deployment_name = \"image-to-image-demo\"\n",
"\n",
"deployment = BatchDeployment(\n",
" name=deployment_name,\n",
" endpoint_name=endpoint_name,\n",
" model=model.id,\n",
" compute=compute_name,\n",
" error_threshold=0,\n",
" instance_count=1,\n",
" logging_level=\"info\",\n",
" max_concurrency_per_instance=1,\n",
" mini_batch_size=1,\n",
" output_file_name=\"predictions.csv\",\n",
" retry_settings=BatchRetrySettings(max_retries=2, timeout=9999),\n",
")\n",
"workspace_ml_client.begin_create_or_update(deployment).result()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Set the deployment as default"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"endpoint = workspace_ml_client.batch_endpoints.get(endpoint_name)\n",
"endpoint.defaults.deployment_name = deployment_name\n",
"workspace_ml_client.begin_create_or_update(endpoint).result()\n",
"\n",
"endpoint = workspace_ml_client.batch_endpoints.get(endpoint_name)\n",
"print(f\"The default deployment is {endpoint.defaults.deployment_name}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 5. Test the endpoint - using csv files\n",
"\n",
"Invoke the batch endpoint with the input parameter pointing to the directory containing one or more csv files containing the batch inference input. This creates a pipeline job using the default deployment in the endpoint. Wait for the job to complete.\n",
"\n",
"__Note__: If job failed with Out of Memory Error then please try splitting your input into smaller csv files or decreasing mini_batch_size for the deployment."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"job = None\n",
"num_retries = 3\n",
"for i in range(num_retries):\n",
" try:\n",
" job = workspace_ml_client.batch_endpoints.invoke(\n",
" endpoint_name=endpoint.name, input=input\n",
" )\n",
" break\n",
" except Exception as e:\n",
" if i == num_retries - 1:\n",
" raise e\n",
" else:\n",
" print(\"Endpoint invocation failed. Retrying after 5 seconds...\")\n",
" time.sleep(5)\n",
"if job is not None:\n",
" workspace_ml_client.jobs.stream(job.name)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"__Note__: If the job failed with error Assertion Error (The actual length exceeded max length 100 MB) then please consider dividing input csv file into multiple csv files."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"\n",
"scoring_job = list(workspace_ml_client.jobs.list(parent_job_name=job.name))[0]\n",
"\n",
"workspace_ml_client.jobs.download(\n",
" name=scoring_job.name,\n",
" download_path=\".\",\n",
" output_name=\"score\",\n",
")\n",
"\n",
"predictions_file = os.path.join(\"named-outputs\", \"score\", \"predictions.csv\")\n",
"\n",
"# Load the batch predictions file with no headers into a dataframe and set your column names\n",
"score_df = pd.read_csv(\n",
" predictions_file,\n",
" header=None,\n",
" names=[\n",
" \"row_number_per_file\",\n",
" \"image_file_name\",\n",
" \"nsfw_content_detected\",\n",
" \"input_csv_name\",\n",
" ],\n",
")\n",
"score_df"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 6. Clean up resources - delete the endpoint\n",
"Batch endpoints use compute resources only when jobs are submitted. You can keep the batch endpoint for your reference without worrying about compute bills, or choose to delete the endpoint. If you created your compute cluster to have zero minimum instances and scale down soon after being idle, you won't be charged for an unused compute."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"workspace_ml_client.batch_endpoints.begin_delete(name=endpoint_name).result()"
]
}
],
"metadata": {
"language_info": {
"name": "ipython"
}
},
"nbformat": 4,
"nbformat_minor": 2
}