# Open Source LLM serving using the Azure ML Python SDK

[Note] Please use `Python 3.10 - SDK v2 (azureml_py310_sdkv2)` conda environment.


In [None]:
%store -r job_name
try:
    job_name
except NameError:
    print("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] Please run the previous notebook (model training) again.")
    print("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")

## 1. Load config file

---


In [None]:
%load_ext autoreload
%autoreload 2

import os, sys
# Everything in the working directory path before the repository name.
repo_prefix = os.getcwd().partition("slm-innovator-lab")[0]

# Make the shared helpers under 0_lab_preparation importable from this notebook.
lab_prep_dir = f"{repo_prefix}slm-innovator-lab/0_lab_preparation"
sys.path.append(os.path.abspath(lab_prep_dir))

# `common` lives in the directory appended above, so this import must come after it.
from common import check_kernel

check_kernel()

In [None]:
import os
import yaml
from datetime import datetime
# Today's date formatted as YYYY-MM-DD.
snapshot_date = f"{datetime.now():%Y-%m-%d}"

# NOTE(review): yaml.FullLoader can construct some Python-specific tags. That is
# acceptable for a trusted, local config.yml; prefer yaml.safe_load if this file
# could ever come from an untrusted source.
with open('config.yml') as f:
    d = yaml.load(f, Loader=yaml.FullLoader)

# General workspace / data settings.
config_section = d['config']
AZURE_SUBSCRIPTION_ID = config_section['AZURE_SUBSCRIPTION_ID']
AZURE_RESOURCE_GROUP = config_section['AZURE_RESOURCE_GROUP']
AZURE_WORKSPACE = config_section['AZURE_WORKSPACE']
AZURE_DATA_NAME = config_section['AZURE_DATA_NAME']
DATA_DIR = config_section['DATA_DIR']
CLOUD_DIR = config_section['CLOUD_DIR']
HF_MODEL_NAME_OR_PATH = config_section['HF_MODEL_NAME_OR_PATH']
IS_DEBUG = config_section['IS_DEBUG']

# Serving-specific settings (asset, endpoint and deployment names, instance size).
serve_section = d['serve']
azure_env_name = serve_section['azure_env_name']
azure_model_name = serve_section['azure_model_name']
azure_endpoint_name = serve_section['azure_endpoint_name']
azure_deployment_name = serve_section['azure_deployment_name']
azure_serving_cluster_size = serve_section['azure_serving_cluster_size']

<br>

## 2. Serving preparation

---

### 2.1. Configure workspace details

To connect to a workspace, we need identifying parameters - a subscription, a resource group, and a workspace name. We will use these details in the MLClient from azure.ai.ml to get a handle on the Azure Machine Learning workspace we need. We will use the default Azure authentication for this hands-on.


In [None]:
# import required libraries
import time
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azure.ai.ml import MLClient, Input
from azure.ai.ml import command
from azure.ai.ml.entities import Model
from azure.ai.ml.constants import AssetTypes
from azure.core.exceptions import ResourceNotFoundError, ResourceExistsError

credential = DefaultAzureCredential()
ml_client = None
try:
    # First choice: let the SDK resolve the workspace through MLClient.from_config.
    ml_client = MLClient.from_config(credential=credential)
except Exception as ex:
    # Fall back to the identifiers read from config.yml.
    print(ex)
    ml_client = MLClient(
        credential=credential,
        subscription_id=AZURE_SUBSCRIPTION_ID,
        resource_group_name=AZURE_RESOURCE_GROUP,
        workspace_name=AZURE_WORKSPACE,
    )

### 2.2. Create model asset


In [None]:
def get_or_create_model_asset(ml_client, model_name, job_name, model_dir="outputs", model_type="custom_model", update=False):
    """Return the latest version of a model asset, registering a new one when needed.

    A new version is registered from the training job's artifacts when the asset
    cannot be found, when it has no listed versions, or when ``update`` is True.

    Args:
        ml_client: Client exposing ``models.list``, ``models.get`` and
            ``models.create_or_update``.
        model_name: Name of the model asset.
        job_name: Name of the job whose output artifacts contain the model.
        model_dir: Artifact sub-path (below ``outputs/artifacts/paths``) to register.
        model_type: Asset type passed to ``Model`` (mlflow_model, custom_model,
            triton_model).
        update: When True, register a new version even if one already exists.

    Returns:
        The existing asset with the highest numeric version, or the newly
        created asset.

    Raises:
        ValueError: If an existing version label is not an integer.
    """
    try:
        versions = [int(m.version) for m in ml_client.models.list(name=model_name)]
        if versions and not update:
            # Compare versions numerically so that e.g. 10 ranks above 9.
            model_asset = ml_client.models.get(name=model_name, version=max(versions))
            print(f"Found Model asset: {model_name}. Will not create again")
            return model_asset
        if versions:
            print("Exception: Found Model asset, but will update the Model.")
        else:
            # An empty listing used to crash in max(); treat it as "not registered yet".
            print(f"No versions of Model asset {model_name} found. Will create it")
    except ResourceNotFoundError as e:
        print(f"Exception: {e}")

    model_path = f"azureml://jobs/{job_name}/outputs/artifacts/paths/{model_dir}/"
    run_model = Model(
        name=model_name,
        path=model_path,
        description="Model created from run.",
        type=model_type,  # mlflow_model, custom_model, triton_model
    )
    model_asset = ml_client.models.create_or_update(run_model)
    print(f"Created Model asset: {model_name}")

    return model_asset

In [None]:
# Register the trained model (or reuse its latest version) from the training
# job's output folder configured under train.model_dir.
model_dir = d['train']['model_dir']
model = get_or_create_model_asset(
    ml_client=ml_client,
    model_name=azure_model_name,
    job_name=job_name,
    model_dir=model_dir,
    model_type="custom_model",
    update=False,
)

### 2.3. Create AzureML environment

Azure ML defines containers (called environment assets) in which your code will run. We can use a built-in environment or build a custom environment (Docker container, conda). This hands-on uses a Docker container.


#### Docker environment


In [None]:
%%writefile {CLOUD_DIR}/serve/Dockerfile
FROM mcr.microsoft.com/aifx/acpt/stable-ubuntu2004-cu124-py310-torch241:biweekly.202410.2

# Install pip dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt --no-cache-dir

# Inference requirements
COPY --from=mcr.microsoft.com/azureml/o16n-base/python-assets:20230419.v1 /artifacts /var/

RUN /var/requirements/install_system_requirements.sh && \
    cp /var/configuration/rsyslog.conf /etc/rsyslog.conf && \
    cp /var/configuration/nginx.conf /etc/nginx/sites-available/app && \
    ln -sf /etc/nginx/sites-available/app /etc/nginx/sites-enabled/app && \
    rm -f /etc/nginx/sites-enabled/default
ENV SVDIR=/var/runit
ENV WORKER_TIMEOUT=400
EXPOSE 5001 8883 8888

# support Deepspeed launcher requirement of passwordless ssh login
# `apt-get update` and `apt-get install` share one layer so that a cached,
# stale package index is never reused; the index is removed afterwards to keep
# the layer small.
RUN apt-get update && \
    apt-get install -y openssh-server openssh-client && \
    rm -rf /var/lib/apt/lists/*

# MAX_JOBS caps the number of parallel compile jobs while building flash-attn.
RUN MAX_JOBS=4 pip install flash-attn==2.6.3 --no-build-isolation

In [None]:
%%writefile {CLOUD_DIR}/serve/requirements.txt
# Python dependencies installed into the serving image by
# `pip install -r requirements.txt` in the Dockerfile.
azureml-core==1.58.0
azureml-dataset-runtime==1.58.0
azureml-defaults==1.58.0
azure-ml==0.0.1
azure-ml-component==0.9.18.post2
azureml-mlflow==1.58.0
azureml-contrib-services==1.58.0
azureml-automl-common-tools==1.58.0
torch-tb-profiler==0.4.3
azureml-inference-server-http~=1.3
inference-schema==1.8.0
MarkupSafe==3.0.2
regex
pybind11
bitsandbytes==0.44.1
transformers==4.46.1
peft==0.13.2
accelerate==1.1.0
datasets
scipy
azure-identity
packaging==24.1
timm==1.0.11
einops

In [None]:
from azure.ai.ml.entities import Environment, BuildContext

def get_or_create_docker_environment_asset(ml_client, env_name, docker_dir, update=False):
    """Return the latest version of an environment asset, building a new one when needed.

    A new version is created from the Docker build context when the asset
    cannot be found, when it has no listed versions, or when ``update`` is True.

    Args:
        ml_client: Client exposing ``environments.list``, ``environments.get``
            and ``environments.create_or_update``.
        env_name: Name of the environment asset.
        docker_dir: Directory used as the Docker build context.
        update: When True, create a new version even if one already exists.

    Returns:
        The existing asset with the highest numeric version, or the newly
        created asset.

    Raises:
        ValueError: If an existing version label is not an integer.
    """
    try:
        versions = [int(e.version) for e in ml_client.environments.list(name=env_name)]
        if versions and not update:
            # Compare versions numerically so that e.g. 10 ranks above 9.
            env_asset = ml_client.environments.get(name=env_name, version=max(versions))
            print(f"Found Environment asset: {env_name}. Will not create again")
            return env_asset
        if versions:
            print("Exception: Found Environment asset, but will update the Environment.")
        else:
            # An empty listing used to crash in max(); treat it as "not created yet".
            print(f"No versions of Environment asset {env_name} found. Will create it")
    except ResourceNotFoundError as e:
        print(f"Exception: {e}")

    env_docker_image = Environment(
        build=BuildContext(path=docker_dir),
        name=env_name,
        description="Environment created from a Docker context.",
    )
    env_asset = ml_client.environments.create_or_update(env_docker_image)
    print(f"Created Environment asset: {env_name}")

    return env_asset

# Build (or reuse) the serving environment from the Docker context under CLOUD_DIR/serve.
env = get_or_create_docker_environment_asset(
    ml_client=ml_client,
    env_name=azure_env_name,
    docker_dir=f"{CLOUD_DIR}/serve",
    update=False,
)

### 2.4. Serving script

If you are serving a custom model rather than an MLflow model, you are free to write your own scoring code. The `score.py` example below shows how to write it.

-   `init()`: This function is the place to write logic for global initialization operations like loading the LLM model.
-   `run()`: Inference logic is called for every invocation of the endpoint.


In [None]:
%%writefile src_serve/score.py
import os
import re
import json
import torch
import base64
import logging

from io import BytesIO
from PIL import Image
from transformers import AutoTokenizer, AutoProcessor, BitsAndBytesConfig, get_scheduler
from transformers import AutoModelForCausalLM, AutoProcessor
from PIL import Image, ImageDraw, ImageFont

# Run on the GPU when CUDA is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

def run_example_base64(task_prompt, text_input, base64_image, params):
    """Generate and post-process an answer for one base64-encoded image.

    Args:
        task_prompt: Task string; prefixed to ``text_input`` to form the prompt
            and forwarded to post-processing as ``task``.
        text_input: Text appended to ``task_prompt``.
        base64_image: Base64-encoded image bytes.
        params: Mapping that must provide ``max_new_tokens`` and ``num_beams``.

    Returns:
        The value returned by ``processor.post_process_generation``.
    """
    # Read the generation settings first so a missing key fails before any decoding.
    generation_limits = dict(
        max_new_tokens=params["max_new_tokens"],
        num_beams=params["num_beams"],
    )

    raw_bytes = base64.b64decode(base64_image)
    image = Image.open(BytesIO(raw_bytes))
    prompt = task_prompt + text_input

    # Ensure the image is in RGB mode
    image = image if image.mode == "RGB" else image.convert("RGB")

    inputs = processor(text=prompt, images=image, return_tensors="pt").to(device)
    generated_ids = model.generate(
        input_ids=inputs["input_ids"],
        pixel_values=inputs["pixel_values"],
        **generation_limits,
    )
    # Special tokens are kept in the decoded text that is handed to post-processing.
    decoded = processor.batch_decode(generated_ids, skip_special_tokens=False)
    generated_text = decoded[0]
    return processor.post_process_generation(
        generated_text,
        task=task_prompt,
        image_size=(image.width, image.height),
    )


def init():
    """Load the model and processor once, when the serving container starts.

    Called when the container is initialized/started, typically after a
    deployment is created or updated. The loaded objects are stored in the
    module-level ``model`` and ``processor`` globals for later requests.
    """
    global model, processor

    # AZUREML_MODEL_DIR is an environment variable created during deployment.
    # It is the path to the model folder (./azureml-models/$MODEL_NAME/$VERSION);
    # the model files are expected in its "outputs" sub-folder.
    model_root = os.getenv("AZUREML_MODEL_DIR")
    model_name_or_path = os.path.join(model_root, "outputs")

    # Options shared by the model and the processor loaders.
    shared_kwargs = dict(trust_remote_code=True, revision="refs/pr/6")

    model = AutoModelForCausalLM.from_pretrained(
        model_name_or_path,
        device_map=device,
        **shared_kwargs,
    )
    processor = AutoProcessor.from_pretrained(model_name_or_path, **shared_kwargs)

    logging.info("Loaded model.")
    
def run(json_data: str):
    """Handle one scoring request.

    Args:
        json_data: JSON document with the keys ``task_prompt``, ``text_input``,
            ``image_input`` (base64-encoded image) and ``params``.

    Returns:
        A dict ``{"result": <stringified answer>}``.
    """
    logging.info("Request received")
    data = json.loads(json_data)

    # Pull the request fields out in a fixed order; a missing key raises KeyError.
    task_prompt, text_input, base64_image, params = [
        data[key] for key in ("task_prompt", "text_input", "image_input", "params")
    ]

    answer = run_example_base64(task_prompt, text_input, base64_image, params)
    return {"result": str(answer)}

<br>

## 3. Serving

---

### 3.1. Create endpoint

Create an endpoint. This process does not provision a GPU cluster yet.


In [None]:
%%time
from azure.ai.ml.entities import (
    ManagedOnlineEndpoint,
    IdentityConfiguration,
    ManagedIdentityConfiguration,
)

# Check if the endpoint already exists in the workspace
try:
    endpoint = ml_client.online_endpoints.get(azure_endpoint_name)
    print("---Endpoint already exists---")
except ResourceNotFoundError:
    # Only a missing endpoint leads to a new definition. Any other failure
    # (authentication, network, interrupt) now propagates instead of being
    # mistaken for "endpoint does not exist".
    endpoint = ManagedOnlineEndpoint(
        name=azure_endpoint_name,
        description=f"Test endpoint for {model.name}",
        # A user-assigned identity can be attached here through
        # identity=IdentityConfiguration(type="user_assigned", user_assigned_identities=[...]).
    )

# Trigger the endpoint creation (an existing endpoint is simply updated in place)
try:
    ml_client.begin_create_or_update(endpoint).wait()
    print("\n---Endpoint created successfully---\n")
except Exception as err:
    raise RuntimeError(
        f"Endpoint creation failed. Detailed Response:\n{err}"
    ) from err

### 3.2. Create Deployment

Create a Deployment. This takes a lot of time as GPU clusters must be provisioned and the serving environment must be built.


In [None]:
%%time
from azure.ai.ml.entities import (    
    OnlineRequestSettings,
    CodeConfiguration,
    ManagedOnlineDeployment,
    ProbeSettings,
    Environment
)

# Request handling limits for each serving instance.
request_settings = OnlineRequestSettings(
    max_concurrent_requests_per_instance=3,
    request_timeout_ms=90000,
    max_queue_wait_ms=60000,
)

# The liveness and readiness probes use the same timing values.
probe_kwargs = dict(
    failure_threshold=30,
    success_threshold=1,
    period=100,
    initial_delay=500,
)

# Optional arguments left disabled in this lab: code_configuration=..., environment_variables=...
deployment = ManagedOnlineDeployment(
    name=azure_deployment_name,
    endpoint_name=azure_endpoint_name,
    model=model,
    instance_type=azure_serving_cluster_size,
    instance_count=1,
    environment=env,
    scoring_script="score.py",
    code_path="./src_serve",
    request_settings=request_settings,
    liveness_probe=ProbeSettings(**probe_kwargs),
    readiness_probe=ProbeSettings(**probe_kwargs),
)

# Trigger the deployment creation
try:
    deployment_poller = ml_client.begin_create_or_update(deployment)
    deployment_poller.wait()
    print("\n---Deployment created successfully---\n")
except Exception as err:
    raise RuntimeError(
        f"Deployment creation failed. Detailed Response:\n{err}"
    ) from err

# Route all traffic to the new deployment; the poller is consumed in the next cell.
endpoint.traffic = {azure_deployment_name: 100}
endpoint_poller = ml_client.online_endpoints.begin_create_or_update(endpoint)

In [None]:
# Wait for the traffic update started in the previous cell; the returned value
# is displayed as this cell's output.
endpoint_poller.result()

<br>

## 4. Test

---

### 4.1. Invocation

Try calling the endpoint.


In [None]:
import os
import json
import base64


# Encode the test image as base64 text so it can be embedded in the JSON request.
with open('./test_images/DocumentVQA_Test_01.jpg', 'rb') as img:
    image_bytes = img.read()
base64_img = base64.b64encode(image_bytes).decode('utf-8')

# Request body with the keys that score.py's run() reads.
generation_params = {"max_new_tokens": 512, "num_beams": 3}
sample = dict(
    task_prompt="DocVQA",
    image_input=base64_img,
    text_input="What do you see in this image",
    params=generation_params,
)

# Save the request to disk so it can be passed to the endpoint as a request file.
test_src_dir = "./inference-test"
os.makedirs(test_src_dir, exist_ok=True)
print(f"test script directory: {test_src_dir}")
sample_data_path = os.path.join(test_src_dir, "sample-request.json")

with open(sample_data_path, "w") as f:
    f.write(json.dumps(sample))

In [None]:
# Send the saved sample request to the deployment.
invoke_kwargs = dict(
    endpoint_name=azure_endpoint_name,
    deployment_name=azure_deployment_name,
    request_file=sample_data_path,
)
result = ml_client.online_endpoints.invoke(**invoke_kwargs)

# The response is a JSON document; print its "result" field.
result_json = json.loads(result)
print(result_json['result'])

### 4.2. LLM latency/throughput benchmarking


In [None]:
import numpy as np
from time import perf_counter

def benchmark_latency(endpoint_name, deployment_name, sample_data_path, num_warmups=1, num_infers=5):
    """Measure the latency of sequential requests against an online deployment.

    Sends ``num_warmups`` untimed requests, then ``num_infers`` timed requests
    one after another through the module-level ``ml_client``.

    Args:
        endpoint_name: Name of the online endpoint to invoke.
        deployment_name: Name of the deployment behind the endpoint.
        sample_data_path: Path of the JSON request file sent with every call.
        num_warmups: Number of untimed warm-up requests.
        num_infers: Number of timed requests; must be at least 1.

    Returns:
        Dict with ``duration`` (wall time of the timed loop) and the per-request
        ``avg_sec``, ``std_sec``, ``p95_sec`` and ``p99_sec`` statistics, all in
        seconds.

    Raises:
        ValueError: If ``num_infers`` is smaller than 1.
    """
    # Statistics over zero samples are meaningless; fail before sending anything.
    if num_infers < 1:
        raise ValueError("num_infers must be at least 1")

    print(f"Measuring latency for Endpoint '{endpoint_name}' and Deployment '{deployment_name}', num_infers={num_infers}")

    def invoke_once():
        # The response body is irrelevant here; only the elapsed time is used.
        ml_client.online_endpoints.invoke(
            endpoint_name=endpoint_name,
            deployment_name=deployment_name,
            request_file=sample_data_path,
        )

    # Warm up
    for _ in range(num_warmups):
        invoke_once()

    # Timed run. perf_counter is monotonic, and using it for the total duration
    # as well removes the dependency on `time` being imported in another cell.
    latencies = []
    begin = perf_counter()
    for _ in range(num_infers):
        start_time = perf_counter()
        invoke_once()
        latencies.append(perf_counter() - start_time)
    duration = perf_counter() - begin

    # Metrics
    metrics = {
        'duration': duration,
        'avg_sec': np.mean(latencies),
        'std_sec': np.std(latencies),
        'p95_sec': np.percentile(latencies, 95),
        'p99_sec': np.percentile(latencies, 99),
    }

    return metrics

def benchmark_latency_multicore(endpoint_name, deployment_name, sample_data_path, num_warmups=1, num_infers=5, num_threads=2):
    """Measure latency and throughput with several threads calling the deployment at once.

    Sends ``num_warmups`` untimed requests, then starts ``num_threads`` worker
    threads. Each worker sends ``num_infers`` requests one after another
    through the module-level ``ml_client``, so ``num_threads * num_infers``
    requests are timed in total.

    Args:
        endpoint_name: Name of the online endpoint to invoke.
        deployment_name: Name of the deployment behind the endpoint.
        sample_data_path: Path of the JSON request file sent with every call.
        num_warmups: Number of untimed warm-up requests.
        num_infers: Timed requests per worker thread; must be at least 1.
        num_threads: Number of concurrent worker threads; must be at least 1.

    Returns:
        Dict with ``threads``, ``duration`` (wall time of the threaded phase, in
        seconds), ``throughput`` (timed requests per second) and the per-request
        ``avg_sec``, ``std_sec``, ``p95_sec`` and ``p99_sec`` statistics.

    Raises:
        ValueError: If ``num_infers`` or ``num_threads`` is smaller than 1.
        Exception: Any error raised by a request inside a worker thread.
    """
    import concurrent.futures
    from time import perf_counter

    # Fail before sending anything when there would be no samples to aggregate.
    if num_infers < 1:
        raise ValueError("num_infers must be at least 1")
    if num_threads < 1:
        raise ValueError("num_threads must be at least 1")

    def invoke_once():
        # The response body is irrelevant here; only the elapsed time is used.
        ml_client.online_endpoints.invoke(
            endpoint_name=endpoint_name,
            deployment_name=deployment_name,
            request_file=sample_data_path,
        )

    # Warmup
    for _ in range(num_warmups):
        invoke_once()

    # Thread task: each worker runs a serial loop and returns its own samples,
    # so no list is shared between threads.
    def task():
        samples = []
        for _ in range(num_infers):
            start = perf_counter()
            invoke_once()
            samples.append(perf_counter() - start)
        return samples

    # Submit tasks; leaving the `with` block waits for every worker to finish.
    begin = perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as pool:
        futures = [pool.submit(task) for _ in range(num_threads)]
    duration = perf_counter() - begin

    # result() re-raises a worker's exception instead of silently dropping it.
    latencies = []
    for future in futures:
        latencies.extend(future.result())

    # Metrics
    metrics = {
        'threads': num_threads,
        'duration': duration,
        'throughput': len(latencies) / duration,
        'avg_sec': sum(latencies) / len(latencies),
        'std_sec': np.std(latencies),
        'p95_sec': np.percentile(latencies, 95),
        'p99_sec': np.percentile(latencies, 99),
    }

    return metrics

In [None]:
# Sequential latency benchmark: one warm-up request followed by ten timed requests.
benchmark_result = benchmark_latency(
    endpoint_name=azure_endpoint_name,
    deployment_name=azure_deployment_name,
    sample_data_path=sample_data_path,
    num_warmups=1,
    num_infers=10,
)

In [None]:
# Show the metrics dict returned by benchmark_latency.
print(benchmark_result)

## Clean up


In [None]:
# Remove the local folder that holds the sample request file (shell command).
!rm -rf {test_src_dir}

In [None]:
# Start deleting the online endpoint; this cell does not wait for completion.
# NOTE(review): presumably this also removes the endpoint's deployments - confirm in the portal.
ml_client.online_endpoints.begin_delete(azure_endpoint_name)