{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Scalable MedImageInsight Endpoint Usage\n",
"\n",
"**Requirements** - To run this notebook, you will need:\n",
"- A basic understanding of Python and medical image processing\n",
"- Access to an Azure Machine Learning workspace and an online endpoint\n",
"- The Python packages listed under Prerequisites installed in your environment\n",
"\n",
"**Learning Objectives** - By the end of this tutorial, you will learn how to:\n",
"- Read and process DICOM images into NumPy arrays\n",
"- Convert processed images into image byte arrays\n",
"- Submit requests to an Azure Machine Learning endpoint with retry and rate limit handling\n",
"- Use `joblib` and `tqdm` for parallel processing and progress monitoring\n",
"\n",
"**Motivation** - This notebook demonstrates how to generate embeddings of medical images at scale using the MedImageInsight API while handling potential network issues gracefully.\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Prerequisites\n",
"\n",
"### Create MedImageInsight endpoint\n",
"* Follow the instructions in [deploy](./deploy.ipynb).\n",
"\n",
"### Download data\n",
"\n",
"Copy the sample data to the directory that section 5.1 reads from:\n",
"\n",
"`azcopy copy --recursive https://azuremlexampledata.blob.core.windows.net/data/healthcare-ai/ /home/azureuser/data/`\n",
"\n",
"### Install Required Packages\n",
"\n",
"The notebook also imports the Azure ML SDK (`azure-ai-ml`, `azure-identity`), NumPy, Pillow, and `requests`; make sure those are available, then install the remaining packages:\n",
"\n",
"`pip install 'tenacity~=9.0.0' 'ratelimit~=2.2.0' 'tqdm~=4.66.0' 'simpleitk~=2.4.0' 'joblib>1.4.0'`"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 1. Import Libraries\n",
"\n",
"Import all the required libraries for image processing, handling requests, and parallel processing.\n"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"from io import BytesIO\n",
"from PIL import Image\n",
"import itertools\n",
"import SimpleITK as sitk\n",
"import tempfile\n",
"from base64 import encodebytes\n",
"from azure.ai.ml import MLClient\n",
"from azure.identity import DefaultAzureCredential\n",
"import glob\n",
"from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception\n",
"from ratelimit import limits, sleep_and_retry, RateLimitException\n",
"from requests.exceptions import ConnectionError, Timeout, HTTPError\n",
"import requests\n",
"from joblib import Parallel, delayed\n",
"from tqdm import tqdm\n",
"\n",
"# Suppress SimpleITK warnings\n",
"sitk.ProcessObject_SetGlobalWarningDisplay(False)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 2. Image Processing Functions\n",
"\n",
"Define functions that read a DICOM file, clip its intensities to the 10th to 90th percentile range, rescale them to 8-bit values, and encode the result as PNG bytes.\n"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"def load_databytes(path):\n",
"    \"\"\"\n",
"    Load data bytes from a file path.\n",
"\n",
"    Parameters\n",
"    ----------\n",
"    path : str\n",
"        File path.\n",
"\n",
"    Returns\n",
"    -------\n",
"    bytes\n",
"        Data bytes.\n",
"    \"\"\"\n",
"    with open(path, \"rb\") as f:\n",
"        return f.read()\n",
"\n",
"\n",
"def read_dicom_bytes_to_numpy(dicom_bytes: bytes) -> np.ndarray:\n",
"    \"\"\"\n",
"    Read DICOM data from bytes to a NumPy array, applying windowing and normalization.\n",
"\n",
"    Parameters\n",
"    ----------\n",
"    dicom_bytes : bytes\n",
"        The DICOM file content in bytes.\n",
"\n",
"    Returns\n",
"    -------\n",
"    np.ndarray\n",
"        The windowed image as a 2-D uint8 NumPy array.\n",
"    \"\"\"\n",
"    # SimpleITK reads from a file path, so write the bytes to a temporary .dcm file first.\n",
"    with tempfile.NamedTemporaryFile(suffix=\".dcm\") as temp_file:\n",
"        temp_file.write(dicom_bytes)\n",
"        temp_file.flush()\n",
"        img = sitk.ReadImage(temp_file.name)\n",
"    # Keep the first slice as a 2-D float array.\n",
"    img_array = sitk.GetArrayFromImage(img).astype(np.float32)[0, :, :]\n",
"    # Clip intensities to the 10th-90th percentile window to suppress outliers.\n",
"    img_array = np.clip(img_array, *np.percentile(img_array, [10, 90]))\n",
"    # Rescale to 0-255, guarding against a constant image (which would divide by zero).\n",
"    lo, hi = img_array.min(), img_array.max()\n",
"    scale = 255.0 / (hi - lo) if hi > lo else 0.0\n",
"    img_array = ((img_array - lo) * scale).astype(np.uint8)\n",
"    return img_array\n",
"\n",
"\n",
"def numpy_to_image_bytearray(img: np.ndarray, format: str = \"PNG\") -> bytes:\n",
"    \"\"\"\n",
"    Convert a NumPy array to an image byte array.\n",
"\n",
"    Parameters\n",
"    ----------\n",
"    img : np.ndarray\n",
"        The image as a NumPy array.\n",
"    format : str, optional\n",
"        The image format, by default \"PNG\".\n",
"\n",
"    Returns\n",
"    -------\n",
"    bytes\n",
"        The image in byte array format.\n",
"    \"\"\"\n",
"    byte_io = BytesIO()\n",
"    pil_image = Image.fromarray(img)\n",
"    # Grayscale (\"L\") images are converted to 3-channel RGB before encoding.\n",
"    if pil_image.mode == \"L\":\n",
"        pil_image = pil_image.convert(\"RGB\")\n",
"    pil_image.save(byte_io, format=format)\n",
"    return byte_io.getvalue()\n",
"\n",
"\n",
"def read_to_imagebytes(dcm_bytes):\n",
"    \"\"\"\n",
"    Convert DICOM bytes to image byte array.\n",
"\n",
"    Parameters\n",
"    ----------\n",
"    dcm_bytes : bytes\n",
"        DICOM file content in bytes.\n",
"\n",
"    Returns\n",
"    -------\n",
"    bytes\n",
"        Image data in bytes.\n",
"    \"\"\"\n",
"    np_img = read_dicom_bytes_to_numpy(dcm_bytes)\n",
"    return numpy_to_image_bytearray(np_img)\n",
"\n",
"\n",
"def path_to_imagebytes(path):\n",
"    \"\"\"\n",
"    Convert a DICOM file at a given path to image byte array.\n",
"\n",
"    Parameters\n",
"    ----------\n",
"    path : str\n",
"        File path to the DICOM file.\n",
"\n",
"    Returns\n",
"    -------\n",
"    bytes\n",
"        Image data in bytes.\n",
"    \"\"\"\n",
"    bytes_data = load_databytes(path)\n",
"    return read_to_imagebytes(bytes_data)"
]
},
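{
"cell_type": "markdown",
"metadata": {},
"source": [
"The windowing step in `read_dicom_bytes_to_numpy` can be tried without a DICOM file. The cell below is a minimal sketch on a synthetic array: `window_to_uint8` is a hypothetical standalone helper, not part of the pipeline above, that applies the same 10th to 90th percentile clipping and 0 to 255 rescaling. Comparing it with naive min-max scaling shows why the clipping matters: a single extreme pixel would otherwise squeeze the rest of the image into a handful of gray levels.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"\n",
"\n",
"def window_to_uint8(img, low_pct=10, high_pct=90):\n",
"    \"\"\"Hypothetical helper: percentile windowing followed by 8-bit rescaling.\"\"\"\n",
"    img = img.astype(np.float32)\n",
"    lo, hi = np.percentile(img, [low_pct, high_pct])\n",
"    clipped = np.clip(img, lo, hi)\n",
"    span = clipped.max() - clipped.min()\n",
"    if span == 0:\n",
"        # A constant image has no contrast to stretch; return all zeros.\n",
"        return np.zeros(img.shape, dtype=np.uint8)\n",
"    return ((clipped - clipped.min()) * 255 / span).astype(np.uint8)\n",
"\n",
"\n",
"# Synthetic 10x10 intensity ramp with one extreme outlier pixel.\n",
"synthetic = np.arange(100, dtype=np.float32).reshape(10, 10)\n",
"synthetic[0, 0] = 10_000\n",
"\n",
"# Naive min-max scaling: the outlier maps to 255 and every other pixel lands near 0.\n",
"naive = ((synthetic - synthetic.min()) * 255 / np.ptp(synthetic)).astype(np.uint8)\n",
"windowed = window_to_uint8(synthetic)\n",
"\n",
"print(\"distinct gray levels, naive:   \", len(np.unique(naive)))\n",
"print(\"distinct gray levels, windowed:\", len(np.unique(windowed)))"
]
},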
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 3. Request Submission Functions\n",
"\n",
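"### 3.1 Creating a Post Function with Retries and Rate Limiting\n",
"\n",
"Define a `post` function that throttles requests on the client side with `ratelimit` and retries transient failures (connection errors, timeouts, HTTP 429, and HTTP 5xx responses) with exponential backoff using `tenacity`.\n"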
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"def create_post_func(\n",
"    retries=5, rate_calls=60, rate_period=60, exp_multiplier=1, exp_min=2, exp_max=60\n",
"):\n",
"    \"\"\"\n",
"    Create a post function with retries and rate limiting.\n",
"\n",
"    Parameters\n",
"    ----------\n",
"    retries : int\n",
"        Maximum number of attempts, including the first call.\n",
"    rate_calls : int\n",
"        Number of allowed calls in the period.\n",
"    rate_period : int\n",
"        Period in seconds for rate limiting.\n",
"    exp_multiplier : int\n",
"        Multiplier for exponential backoff.\n",
"    exp_min : int\n",
"        Minimum wait time in seconds.\n",
"    exp_max : int\n",
"        Maximum wait time in seconds.\n",
"\n",
"    Returns\n",
"    -------\n",
"    function\n",
"        Configured post function.\n",
"    \"\"\"\n",
"\n",
"    def is_retryable_exception(exc):\n",
"        # Network failures and rate-limit hits are worth retrying.\n",
"        if isinstance(exc, (ConnectionError, Timeout, RateLimitException)):\n",
"            return True\n",
"        # Retry server errors (5xx) and \"Too Many Requests\" (429); other 4xx errors are not transient.\n",
"        elif isinstance(exc, HTTPError) and exc.response is not None:\n",
"            if 500 <= exc.response.status_code < 600 or exc.response.status_code == 429:\n",
"                return True\n",
"        return False\n",
"\n",
"    # Decorators apply bottom-up: the rate limiter wraps the raw call,\n",
"    # and tenacity retries the rate-limited call.\n",
"    @retry(\n",
"        retry=retry_if_exception(is_retryable_exception),\n",
"        wait=wait_exponential(multiplier=exp_multiplier, min=exp_min, max=exp_max),\n",
"        stop=stop_after_attempt(retries),\n",
"    )\n",
"    @sleep_and_retry\n",
"    @limits(calls=rate_calls, period=rate_period)\n",
"    def post(*args, **kwargs):\n",
"        # Note: requests only raises Timeout if a timeout=... argument is passed.\n",
"        response = requests.post(*args, **kwargs)\n",
"        # Turn HTTP error status codes into HTTPError so the retry predicate can inspect them.\n",
"        response.raise_for_status()\n",
"        return response.json()\n",
"\n",
"    return post"
]
},
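{
"cell_type": "markdown",
"metadata": {},
"source": [
"To gauge how long a persistently failing request can be retried, the sketch below computes the backoff schedule by hand. It assumes that tenacity's `wait_exponential` waits `multiplier * 2 ** (n - 1)` seconds after failed attempt `n`, clamped to the `[min, max]` range, and that no wait follows the final attempt. `backoff_schedule` is a hypothetical helper, not part of tenacity. With the settings used in section 5.2 (`retries=8`, plus the defaults `exp_multiplier=1`, `exp_min=2`, `exp_max=60`), a request that never succeeds would wait roughly two minutes in total before tenacity gives up, not counting the time spent on the requests themselves or any rate-limiter sleeps.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def backoff_schedule(attempts, multiplier=1, min_wait=2, max_wait=60):\n",
"    \"\"\"Hypothetical helper: seconds waited between consecutive attempts.\n",
"\n",
"    Assumes wait = multiplier * 2 ** (n - 1) after failed attempt n,\n",
"    clamped to [min_wait, max_wait].\n",
"    \"\"\"\n",
"    waits = []\n",
"    # No wait follows the last attempt, so there are attempts - 1 waits.\n",
"    for n in range(1, attempts):\n",
"        wait = multiplier * 2 ** (n - 1)\n",
"        waits.append(max(min_wait, min(wait, max_wait)))\n",
"    return waits\n",
"\n",
"\n",
"schedule = backoff_schedule(attempts=8)\n",
"print(schedule, \"total:\", sum(schedule), \"seconds\")\n",
"# [2, 2, 4, 8, 16, 32, 60] total: 124 seconds"
]
},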
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 3.2 Submitting Requests\n",
"\n",
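"Define functions that base64-encode images and submit them to the Azure ML online endpoint, either one image per request (`submit_request`) or several images in one request (`submit_batch_request`).\n"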
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"def submit_batch_request(list_image_databytes, params, target, headers, post_func):\n",
"    \"\"\"\n",
"    Submit a batch of image data to the endpoint.\n",
"\n",
"    Parameters\n",
"    ----------\n",
"    list_image_databytes : list\n",
"        List of image data in bytes.\n",
"    params : dict\n",
"        Additional parameters for the request.\n",
"    target : str\n",
"        Endpoint URL.\n",
"    headers : dict\n",
"        Request headers.\n",
"    post_func : function\n",
"        Function to post the request.\n",
"\n",
"    Returns\n",
"    -------\n",
"    list\n",
"        List of results from the endpoint.\n",
"    \"\"\"\n",
"    # Each row pairs an image with a text value; the text is left empty\n",
"    # because only image features are needed here.\n",
"    text_data = \"\"\n",
"\n",
"    def encode_data(image_databytes, text_data):\n",
"        # Images are sent as base64-encoded UTF-8 strings.\n",
"        return [encodebytes(image_databytes).decode(\"utf-8\"), text_data]\n",
"\n",
"    payload = {\n",
"        \"input_data\": {\n",
"            \"columns\": [\"image\", \"text\"],\n",
"            \"index\": list(range(len(list_image_databytes))),\n",
"            \"data\": [\n",
"                encode_data(image_databytes, text_data)\n",
"                for image_databytes in list_image_databytes\n",
"            ],\n",
"        },\n",
"        \"params\": params,\n",
"    }\n",
"\n",
"    response_json = post_func(target, json=payload, headers=headers)\n",
"    # One result per input row is expected, in the same order as the request.\n",
"    result = [r[\"image_features\"] for r in response_json]\n",
"    return result\n",
"\n",
"\n",
"def submit_request(image_databytes, params, target, headers, post_func):\n",
"    \"\"\"\n",
"    Submit a single image to the endpoint.\n",
"\n",
"    Parameters\n",
"    ----------\n",
"    image_databytes : bytes\n",
"        Image data in bytes.\n",
"    params : dict\n",
"        Additional parameters for the request.\n",
"    target : str\n",
"        Endpoint URL.\n",
"    headers : dict\n",
"        Request headers.\n",
"    post_func : function\n",
"        Function to post the request.\n",
"\n",
"    Returns\n",
"    -------\n",
"    Any\n",
"        Result from the endpoint.\n",
"    \"\"\"\n",
"    # Send a batch of one and unwrap its single result.\n",
"    results = submit_batch_request(\n",
"        [image_databytes], params, target, headers, post_func\n",
"    )\n",
"    return results[0]"
]
},
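{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before calling the real endpoint, you can check the request and response contract of `submit_batch_request` offline. The sketch below passes a stand-in `fake_post` function (hypothetical; it only mimics the response shape the code above expects, a list with one `{\"image_features\": ...}` object per input row) and inspects the payload it receives. It relies on `submit_batch_request` from the previous cell and makes no network calls; the target URL is a placeholder that is never contacted.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"captured = {}\n",
"\n",
"\n",
"def fake_post(target, json=None, headers=None):\n",
"    \"\"\"Hypothetical stand-in for a post function: records the payload\n",
"    and returns one dummy feature vector per input row.\"\"\"\n",
"    captured[\"payload\"] = json\n",
"    n_rows = len(json[\"input_data\"][\"data\"])\n",
"    return [{\"image_features\": [float(i)]} for i in range(n_rows)]\n",
"\n",
"\n",
"features = submit_batch_request(\n",
"    [b\"first image bytes\", b\"second image bytes\"],\n",
"    params={},\n",
"    target=\"https://example.invalid/score\",\n",
"    headers={},\n",
"    post_func=fake_post,\n",
")\n",
"\n",
"payload = captured[\"payload\"]\n",
"print(payload[\"input_data\"][\"columns\"])  # ['image', 'text']\n",
"print(payload[\"input_data\"][\"index\"])  # [0, 1]\n",
"print(features)  # [[0.0], [1.0]]"
]
},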
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 4. Configure Azure ML Client\n",
"\n",
"Set up the Azure ML client and look up the endpoint's scoring URI and API key. `MLClient.from_config` reads the workspace details from a local `config.json` file.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Azure ML endpoint name\n",
"endpoint_name = \"\" # Set this to the name of the endpoint you wish to use.\n",
"\n",
"# Initialize MLClient with DefaultAzureCredential\n",
"ml_client = MLClient.from_config(DefaultAzureCredential())\n",
"\n",
"# Get endpoint details\n",
"endpoint = ml_client.online_endpoints.get(name=endpoint_name)\n",
"keys = ml_client.online_endpoints.get_keys(name=endpoint_name)\n",
"\n",
"# Set target URL and headers\n",
"target = endpoint.scoring_uri\n",
"api_key = keys.primary_key\n",
"headers = {\"Authorization\": f\"Bearer {api_key}\"}"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 5. Processing Images in Parallel\n",
"### 5.1 Retrieve DICOM File Paths\n",
"\n",
"Use `glob` to recursively collect all DICOM file paths under the downloaded data directory.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"filelist = glob.glob(\n",
"    \"/home/azureuser/data/healthcare-ai/medimageinsight-zeroshot/**/*.dcm\",\n",
"    recursive=True,\n",
")\n",
"print(f\"Total DICOM files found: {len(filelist)}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 5.2 Process Images\n",
"\n",
"Convert each DICOM file and submit it as its own request, running three worker threads in parallel, and collect the results.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Post function allowing up to 8 attempts per request and at most 60 calls per 60 seconds\n",
"request_post_w_retry = create_post_func(retries=8, rate_calls=60, rate_period=60)\n",
"\n",
"\n",
"def process_path(path):\n",
"    \"\"\"Convert one DICOM file and return a (path, embedding) pair.\"\"\"\n",
"    image_databytes = path_to_imagebytes(path)\n",
"    embedding = submit_request(\n",
"        image_databytes, {}, target, headers, request_post_w_retry\n",
"    )\n",
"    # generator_unordered yields results as they finish; keep each path with its embedding.\n",
"    return path, embedding\n",
"\n",
"\n",
"# Number of parallel worker threads (the work is mostly waiting on the network)\n",
"njobs = 3\n",
"\n",
"results = []  # (path, embedding) pairs\n",
"with tqdm(total=len(filelist)) as pbar:\n",
"    # Process files in parallel and collect results as they complete\n",
"    results_gen = Parallel(\n",
"        n_jobs=njobs, prefer=\"threads\", return_as=\"generator_unordered\"\n",
"    )(delayed(process_path)(path=path) for path in filelist)\n",
"    for res in results_gen:\n",
"        pbar.update(1)\n",
"        results.append(res)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
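"### 5.3 Process Images in Batches\n",
"\n",
"Alternatively, group the DICOM files into batches of 10 and send each batch as a single request with `submit_batch_request`. Batching reduces the number of requests counted against the client-side rate limit. This cell reuses `request_post_w_retry` from section 5.2, so run that cell first.\n"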
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def batchify(iterable, batch_size=10):\n",
"    \"\"\"Yield successive chunks of a specified size from an iterable.\"\"\"\n",
"    iterator = iter(iterable)\n",
"    while True:\n",
"        chunk = list(itertools.islice(iterator, batch_size))\n",
"        if not chunk:\n",
"            break\n",
"        yield chunk\n",
"\n",
"\n",
"def process_batch(batch_paths):\n",
"    \"\"\"Convert a batch of DICOM files and return (path, embedding) pairs.\"\"\"\n",
"    list_image_databytes = [path_to_imagebytes(path) for path in batch_paths]\n",
"    # Reuses request_post_w_retry created in section 5.2.\n",
"    batch_results = submit_batch_request(\n",
"        list_image_databytes,\n",
"        params={},\n",
"        target=target,\n",
"        headers=headers,\n",
"        post_func=request_post_w_retry,\n",
"    )\n",
"    # Batches complete out of order, so pair each embedding with its source path.\n",
"    return list(zip(batch_paths, batch_results))\n",
"\n",
"\n",
"# Number of parallel jobs\n",
"njobs = 3\n",
"\n",
"results = []  # (path, embedding) pairs\n",
"total_files = len(filelist)\n",
"batch_paths_list = list(batchify(filelist, batch_size=10))\n",
"\n",
"with tqdm(total=total_files) as pbar:\n",
"    # Process batches in parallel and collect results\n",
"    results_gen = Parallel(\n",
"        n_jobs=njobs, prefer=\"threads\", return_as=\"generator_unordered\"\n",
"    )(delayed(process_batch)(batch_paths) for batch_paths in batch_paths_list)\n",
"    for batch_results in results_gen:\n",
"        results.extend(batch_results)\n",
"        pbar.update(len(batch_results))"
]
},
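{
"cell_type": "markdown",
"metadata": {},
"source": [
"Batching helps mainly because the client-side limiter counts requests, not images. As a rough estimate that ignores endpoint latency, retries, and the burst `ratelimit` allows at the start of each period, the minimum time the limiter alone imposes is the number of requests divided by `rate_calls`, times `rate_period`. The sketch below applies that estimate; `estimate_limiter_minutes` is a hypothetical helper, and 1,000 images is an illustrative dataset size, not the size of the sample data.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import math\n",
"\n",
"\n",
"def estimate_limiter_minutes(n_images, batch_size, rate_calls=60, rate_period=60):\n",
"    \"\"\"Hypothetical helper: rough lower bound (minutes) set by the client-side rate limit.\"\"\"\n",
"    n_requests = math.ceil(n_images / batch_size)\n",
"    return n_requests / rate_calls * rate_period / 60\n",
"\n",
"\n",
"# Compare one image per request (section 5.2) with batches of 10 (section 5.3).\n",
"for batch_size in (1, 10):\n",
"    minutes = estimate_limiter_minutes(1000, batch_size)\n",
"    print(f\"batch_size={batch_size:>2}: ~{minutes:.1f} min\")\n",
"# batch_size= 1: ~16.7 min\n",
"# batch_size=10: ~1.7 min"
]
},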
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 6. Conclusion\n",
"\n",
"Wrapping request submission in client-side rate limiting and exponential-backoff retries lets the pipeline recover from transient failures, such as dropped connections, HTTP 429 responses, and HTTP 5xx errors, instead of aborting partway through a large dataset.\n",
"\n",
"Running requests concurrently with `joblib` threads keeps several requests in flight while others wait on the network, and `tqdm` reports progress as results arrive. Sending several images per request further reduces the number of calls that count against the rate limit, which matters for the large datasets common in medical imaging.\n",
"\n",
"Together, these techniques give a simple, scalable pattern for submitting image data to the endpoint and collecting MedImageInsight embeddings.\n",
"\n",
"---\n",
"\n",
"**Next Steps**:\n",
"\n",
"- Reuse similar request functions to call other Azure ML models, such as **\"MedImageParse\"** and **\"CRReportGen\"**, to broaden the scope of your medical imaging analysis.\n",
"- Set up [autoscaling](https://learn.microsoft.com/en-us/azure/machine-learning/how-to-autoscale-endpoints?view=azureml-api-2&tabs=python) for your endpoint so it can add instances under load. The client-side settings (`rate_calls`, `rate_period`, and `njobs`) still cap throughput, so raise them to match the capacity you deploy.\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "azureml_py310_sdkv2",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.14"
}
},
"nbformat": 4,
"nbformat_minor": 2
}