# Open Source LLM serving using the Azure ML Python SDK

[Note] Please use the `Python 3.10 - SDK v2 (azureml_py310_sdkv2)` conda environment.


In [None]:
%load_ext autoreload
%autoreload 2

import os, sys
# Take everything in the current working directory that precedes the first
# "slm-innovator-lab", then point at the shared 0_lab_preparation folder under it
# so that its helper modules become importable.
lab_prep_dir = os.getcwd().partition("slm-innovator-lab")[0] + "slm-innovator-lab/0_lab_preparation"
sys.path.append(os.path.abspath(lab_prep_dir))

# `common` is expected to be found via the directory appended above.
from common import check_kernel
check_kernel()

In [None]:
%store -r job_name
# `%store -r job_name` restores the training job name saved by the previous notebook.
# When nothing was stored, the name is still undefined at this point.
if "job_name" not in globals():
    print("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] Please run the previous notebook (model training) again.")
    print("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")

## 1. Load config file

---


In [None]:
import os
import yaml
from logger import logger
from datetime import datetime

snapshot_date = datetime.now().strftime("%Y-%m-%d")

# config.yml holds plain scalars and mappings, so safe_load is all we need.
# FullLoader accepts extra Python-specific tags that a config file should never contain.
with open("config.yml", encoding="utf-8") as f:
    d = yaml.safe_load(f)

# Workspace / data settings
AZURE_SUBSCRIPTION_ID = d["config"]["AZURE_SUBSCRIPTION_ID"]
AZURE_RESOURCE_GROUP = d["config"]["AZURE_RESOURCE_GROUP"]
AZURE_WORKSPACE = d["config"]["AZURE_WORKSPACE"]
AZURE_DATA_NAME = d["config"]["AZURE_DATA_NAME"]
DATA_DIR = d["config"]["DATA_DIR"]
CLOUD_DIR = d["config"]["CLOUD_DIR"]
HF_MODEL_NAME_OR_PATH = d["config"]["HF_MODEL_NAME_OR_PATH"]
IS_DEBUG = d["config"]["IS_DEBUG"]

# Serving settings
azure_env_name = d["serve"]["azure_env_name"]
azure_model_name = d["serve"]["azure_model_name"]
azure_endpoint_name = d["serve"]["azure_endpoint_name"]
azure_deployment_name = d["serve"]["azure_deployment_name"]
azure_serving_cluster_size = d["serve"]["azure_serving_cluster_size"]
port = d["serve"]["port"]
engine = d["serve"]["engine"]

logger.info("===== 0. Azure ML Deployment Info =====")
logger.info(f"AZURE_SUBSCRIPTION_ID={AZURE_SUBSCRIPTION_ID}")
logger.info(f"AZURE_RESOURCE_GROUP={AZURE_RESOURCE_GROUP}")
logger.info(f"AZURE_WORKSPACE={AZURE_WORKSPACE}")
logger.info(f"AZURE_DATA_NAME={AZURE_DATA_NAME}")
logger.info(f"DATA_DIR={DATA_DIR}")
logger.info(f"CLOUD_DIR={CLOUD_DIR}")
logger.info(f"HF_MODEL_NAME_OR_PATH={HF_MODEL_NAME_OR_PATH}")
logger.info(f"IS_DEBUG={IS_DEBUG}")

logger.info(f"azure_env_name={azure_env_name}")
logger.info(f"azure_model_name={azure_model_name}")
logger.info(f"azure_endpoint_name={azure_endpoint_name}")
logger.info(f"azure_deployment_name={azure_deployment_name}")
logger.info(f"azure_serving_cluster_size={azure_serving_cluster_size}")
logger.info(f"port={port}")
logger.info(f"engine={engine}")

<br>

## 2. Serving preparation

---

### 2.1. Configure workspace details

To connect to a workspace, we need three identifying parameters: a subscription, a resource group, and a workspace name. We pass these to `MLClient` from `azure.ai.ml` to get a handle on the Azure Machine Learning workspace. This hands-on lab uses the default Azure authentication (`DefaultAzureCredential`).


In [None]:
# import required libraries
import time
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azure.ai.ml import MLClient, Input
from azure.ai.ml import command
from azure.ai.ml.entities import Model
from azure.ai.ml.constants import AssetTypes
from azure.core.exceptions import ResourceNotFoundError, ResourceExistsError

logger.info("===== 2. Serving preparation =====")
logger.info("Calling DefaultAzureCredential.")

# Authenticate with DefaultAzureCredential and bind the client to the workspace
# identified in config.yml.
credential = DefaultAzureCredential()
ml_client = MLClient(
    credential=credential,
    subscription_id=AZURE_SUBSCRIPTION_ID,
    resource_group_name=AZURE_RESOURCE_GROUP,
    workspace_name=AZURE_WORKSPACE,
)

### 2.2. Create model asset


In [None]:
def get_or_create_model_asset(
    ml_client,
    model_name,
    job_name,
    model_dir="outputs",
    model_type="custom_model",
    update=False,
):
    """Return the latest version of a registered model, registering it first if needed.

    When a new version is registered, it is created from the training job's
    artifacts at ``azureml://jobs/<job_name>/outputs/artifacts/paths/<model_dir>/``.

    Args:
        ml_client: Workspace-scoped ``MLClient``.
        model_name: Name of the model asset.
        job_name: Training job whose outputs contain the model files.
        model_dir: Folder under the job's ``outputs/artifacts/paths`` that holds the model.
        model_type: Asset type given to ``Model`` (custom_model, mlflow_model, triton_model).
        update: If True, register the job output again even when a version already exists.

    Returns:
        The existing latest ``Model`` asset, or the one just created.
    """
    try:
        versions = [int(m.version) for m in ml_client.models.list(name=model_name)]
        if not versions:
            # max([]) would raise ValueError, which the except clause below does not
            # catch; an empty listing means the asset still has to be created.
            raise ResourceNotFoundError(f"No Model asset named {model_name} was found.")
        if update:
            raise ResourceExistsError("Found Model asset, but will update the Model.")
        model_asset = ml_client.models.get(name=model_name, version=max(versions))
        logger.info(f"Found Model asset: {model_name}. Will not create again")
    except (ResourceNotFoundError, ResourceExistsError) as e:
        # Either the asset is missing or an update was requested: (re)register it.
        logger.info(f"Exception: {e}")
        model_path = f"azureml://jobs/{job_name}/outputs/artifacts/paths/{model_dir}/"
        run_model = Model(
            name=model_name,
            path=model_path,
            description="Model created from run.",
            type=model_type,  # mlflow_model, custom_model, triton_model
        )
        model_asset = ml_client.models.create_or_update(run_model)
        logger.info(f"Created Model asset: {model_name}")

    return model_asset

In [None]:
# Folder inside the training job's outputs that holds the fine-tuned model files.
model_dir = d["train"]["model_dir"]

# Reuse the latest registered version if one exists; otherwise register it from the job.
model = get_or_create_model_asset(
    ml_client,
    model_name=azure_model_name,
    job_name=job_name,
    model_dir=model_dir,
    model_type="custom_model",
    update=False,
)

### 2.3. Create AzureML environment

Azure ML runs your code inside containers called environment assets. You can use a built-in environment or build a custom one (from a Docker build context or a conda specification). This hands-on lab builds a custom Docker environment.


#### Docker environment


In [None]:
%%writefile {CLOUD_DIR}/serve/Dockerfile
FROM mcr.microsoft.com/aifx/acpt/stable-ubuntu2004-cu124-py310-torch241:biweekly.202503.1

# Install pip dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt --no-cache-dir

# Inference requirements
COPY --from=mcr.microsoft.com/azureml/o16n-base/python-assets:20230419.v1 /artifacts /var/

RUN /var/requirements/install_system_requirements.sh && \
    cp /var/configuration/rsyslog.conf /etc/rsyslog.conf && \
    cp /var/configuration/nginx.conf /etc/nginx/sites-available/app && \
    ln -sf /etc/nginx/sites-available/app /etc/nginx/sites-enabled/app && \
    rm -f /etc/nginx/sites-enabled/default
ENV SVDIR=/var/runit
ENV WORKER_TIMEOUT=400
EXPOSE 5001 8883 8888

# support Deepspeed launcher requirement of passwordless ssh login
# Update and install in a single layer so a cached "apt-get update" layer can never
# be paired with a newer install step; remove the apt lists to keep the image smaller.
RUN apt-get update && \
    apt-get install -y openssh-server openssh-client && \
    rm -rf /var/lib/apt/lists/*

RUN MAX_JOBS=4 pip install flash-attn==2.7.4.post1 --no-build-isolation

In [None]:
%%writefile {CLOUD_DIR}/serve/requirements.txt
azureml-core==1.59.0.post1
azureml-dataset-runtime==1.59.0
azureml-defaults==1.59.0
azure-ml==0.0.1
azure-ml-component==0.9.18.post2
azureml-mlflow==1.59.0.post1
azureml-contrib-services==1.59.0
azureml-automl-common-tools==1.59.0
torch-tb-profiler==0.4.3
azureml-inference-server-http~=1.4
inference-schema==1.8.0
MarkupSafe==3.0.2
regex
pybind11
bitsandbytes==0.45.3
transformers==4.49.0
peft~=0.14.0
accelerate~=1.5.2
datasets==3.4.0
scipy
azure-identity
packaging==24.2
timm==1.0.15
einops==0.8.1

In [None]:
from azure.ai.ml.entities import Environment, BuildContext


def get_or_create_docker_environment_asset(
    ml_client, env_name, docker_dir, update=False
):
    """Return the latest version of an environment asset, building it first if needed.

    When a new version is registered, it is built from the Docker build context in
    ``docker_dir`` (the directory containing the Dockerfile and requirements.txt).

    Args:
        ml_client: Workspace-scoped ``MLClient``.
        env_name: Name of the environment asset.
        docker_dir: Local path of the Docker build context.
        update: If True, register a new build even when a version already exists.

    Returns:
        The existing latest ``Environment`` asset, or the one just created.
    """
    try:
        versions = [int(e.version) for e in ml_client.environments.list(name=env_name)]
        if not versions:
            # max([]) would raise ValueError, which the except clause below does not
            # catch; an empty listing means the environment still has to be created.
            raise ResourceNotFoundError(f"No Environment asset named {env_name} was found.")
        if update:
            raise ResourceExistsError(
                "Found Environment asset, but will update the Environment."
            )
        env_asset = ml_client.environments.get(name=env_name, version=max(versions))
        logger.info(f"Found Environment asset: {env_name}. Will not create again")
    except (ResourceNotFoundError, ResourceExistsError) as e:
        # Either the environment is missing or an update was requested: (re)build it.
        logger.info(f"Exception: {e}")
        env_docker_image = Environment(
            build=BuildContext(path=docker_dir),
            name=env_name,
            description="Environment created from a Docker context.",
        )
        env_asset = ml_client.environments.create_or_update(env_docker_image)
        logger.info(f"Created Environment asset: {env_name}")

    return env_asset


env = get_or_create_docker_environment_asset(
    ml_client, azure_env_name, f"{CLOUD_DIR}/serve", update=False
)

### 2.4. Serving script

If you are serving a custom model rather than an MLflow model, you write the scoring code yourself. The `score.py` example below shows how.

-   `init()`: This function is the place to write logic for global initialization operations like loading the LLM model.
-   `run()`: Inference logic is called for every invocation of the endpoint.


In [None]:
%%writefile src_serve/score.py
import os
import logging
import json
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig, pipeline
from peft import LoraConfig, get_peft_model

def init():
    """
    This function is called when the container is initialized/started, typically after create/update of the deployment.
    It loads the base model and tokenizer, attaches the fine-tuned adapter, and builds the
    text-generation pipeline once so that run() does not rebuild it on every request.
    """
    global model
    global tokenizer
    global text_generator
    # AZUREML_MODEL_DIR is an environment variable created during deployment.
    # It is the path to the model folder (./azureml-models/$MODEL_NAME/$VERSION)
    # Please provide your model's folder name if there is one
    model_root = os.getenv("AZUREML_MODEL_DIR")
    if model_root is None:
        raise RuntimeError("AZUREML_MODEL_DIR is not set; cannot locate the adapter weights.")
    model_path = os.path.join(model_root, "{{score_model_dir}}")
    model_id = "{{hf_model_name_or_path}}"
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    model = AutoModelForCausalLM.from_pretrained(model_id, device_map={"":0}, torch_dtype="auto", trust_remote_code=True)

    # Attach the fine-tuned adapter weights stored in the registered model folder.
    model.load_adapter(model_path)
    text_generator = pipeline("text-generation", model=model, tokenizer=tokenizer)
    logging.info("Loaded model.")


def run(json_data: str):
    """
    Handle one scoring request.

    Expects a JSON object with an ``input_data`` field (the prompt or chat messages) and
    an optional ``params`` object whose entries are passed to the pipeline as keyword
    arguments. Returns a dict whose ``result`` key holds the first generated text as str.
    """
    logging.info("Request received")
    data = json.loads(json_data)
    input_data = data["input_data"]
    # Missing or null params simply means "use the pipeline defaults".
    params = data.get("params") or {}

    output = text_generator(input_data, **params)
    generated_text = output[0]["generated_text"]
    # generated_text may be a list of chat messages (e.g. chat input with
    # return_full_text=True), so use lazy %-formatting rather than "+" on a str.
    logging.info("Output Response: %s", generated_text)
    json_result = {"result": str(generated_text)}

    return json_result

Render the template variables (`score_model_dir`, `hf_model_name_or_path`) into the model inference script.


In [None]:
import jinja2
from pathlib import Path
TRAINED_MLFLOW = False

# keep_trailing_newline=True so the rendered score.py keeps its final newline
# (Jinja strips a single trailing newline by default).
jinja_env = jinja2.Environment(keep_trailing_newline=True)

# The template is rendered in place: after this cell the placeholders are gone, so
# re-run the %%writefile cell above before rendering with different values.
score_path = Path("src_serve/score.py")
# read_text/write_text open and close the file themselves, so no handle is left open.
template = jinja_env.from_string(score_path.read_text(encoding="utf-8"))
score_model_dir = os.path.join(model_dir, "peft") if TRAINED_MLFLOW else model_dir

score_path.write_text(
    template.render(score_model_dir=score_model_dir, hf_model_name_or_path=HF_MODEL_NAME_OR_PATH),
    encoding="utf-8",
)

!pygmentize src_serve/score.py | cat -n

<br>

## 3. Serving

---

### 3.1. Create endpoint

Create an endpoint. This process does not provision a GPU cluster yet.


In [None]:
from azure.ai.ml.entities import (
    ManagedOnlineEndpoint,
    IdentityConfiguration,
    ManagedIdentityConfiguration,
)

logger.info("===== 3. Serving =====")

t0 = time.time()

# Reuse the endpoint if it already exists in the workspace. Only ResourceNotFoundError
# means "missing": a bare except would also swallow auth/network errors (and even
# KeyboardInterrupt) and then attempt a create that fails with a less useful message.
try:
    endpoint = ml_client.online_endpoints.get(azure_endpoint_name)
    logger.info("---Endpoint already exists---")
except ResourceNotFoundError:
    # Define a new online endpoint (this does not provision GPU compute yet).
    endpoint = ManagedOnlineEndpoint(
        name=azure_endpoint_name,
        description=f"Test endpoint for {model.name}",
        # Optional user-assigned managed identity (requires a `uai_id` resource ID):
        # identity=IdentityConfiguration(
        #     type="user_assigned",
        #     user_assigned_identities=[ManagedIdentityConfiguration(resource_id=uai_id)],
        # )
        # if uai_id != ""
        # else None,
    )

# Trigger the endpoint creation (or re-apply the existing endpoint's configuration)
try:
    ml_client.begin_create_or_update(endpoint).wait()
    logger.info("\n---Endpoint created successfully---\n")
except Exception as err:
    raise RuntimeError(f"Endpoint creation failed. Detailed Response:\n{err}") from err

t1 = time.time()

from humanfriendly import format_timespan

timespan = format_timespan(t1 - t0)
logger.info(f"Creating Endpoint took {timespan}")

### 3.2. Create Deployment

Create a Deployment. This takes a lot of time as GPU clusters must be provisioned and the serving environment must be built.


In [None]:
%%time
import time
from azure.ai.ml.entities import (    
    OnlineRequestSettings,
    CodeConfiguration,
    ManagedOnlineDeployment,
    ProbeSettings,
    Environment
)

t0 = time.time()

# Per-instance request handling: at most 3 concurrent requests, 90 s per request,
# and up to 60 s waiting in the queue.
request_settings = OnlineRequestSettings(
    max_concurrent_requests_per_instance=3,
    request_timeout_ms=90000,
    max_queue_wait_ms=60000,
)
# Liveness waits 500 s before the first check, leaving time for init() to load the model.
liveness_probe = ProbeSettings(
    failure_threshold=5,
    success_threshold=1,
    timeout=10,
    period=90,
    initial_delay=500,
)
readiness_probe = ProbeSettings(
    failure_threshold=3,
    success_threshold=1,
    timeout=10,
    period=30,
    initial_delay=30,
)

# One instance serving src_serve/score.py inside the custom Docker environment.
deployment = ManagedOnlineDeployment(
    name=azure_deployment_name,
    endpoint_name=azure_endpoint_name,
    model=model,
    environment=env,
    code_path="./src_serve",
    scoring_script="score.py",
    instance_type=azure_serving_cluster_size,
    instance_count=1,
    request_settings=request_settings,
    liveness_probe=liveness_probe,
    readiness_probe=readiness_probe,
)

# Trigger the deployment creation and block until it finishes.
try:
    ml_client.begin_create_or_update(deployment).wait()
    logger.info("\n---Deployment created successfully---\n")
except Exception as err:
    raise RuntimeError(f"Deployment creation failed. Detailed Response:\n{err}") from err

# Send all endpoint traffic to the new deployment; the returned poller is awaited later.
endpoint.traffic = {azure_deployment_name: 100}
endpoint_poller = ml_client.online_endpoints.begin_create_or_update(endpoint)

t1 = time.time()
timespan = format_timespan(t1 - t0)
logger.info(f"Creating deployment took {timespan}")

In [None]:
# Block until the traffic update started in the previous cell completes. result()
# returns the operation's final result, which the notebook shows as the cell output.
endpoint_poller.result()

<br>

## 4. Test

---

### 4.1. Invocation

Try calling the endpoint.


In [None]:
import os
import json

# Chat-style request body: "input_data" carries the conversation and "params" the
# generation keyword arguments that score.py's run() passes on to the pipeline.
sample = dict(
    input_data=[
        dict(role="user", content="Tell me Microsoft's brief history."),
        dict(
            role="assistant",
            content="Microsoft was founded by Bill Gates and Paul Allen on April 4, 1975, to develop and sell a BASIC interpreter for the Altair 8800.",
        ),
        dict(role="user", content="What about Amazon's history?"),
    ],
    params=dict(
        temperature=0.1,
        max_new_tokens=128,
        do_sample=True,
        return_full_text=False,
    ),
)

# Persist the request locally: the invoke() calls below take a request_file path.
test_src_dir = "./phi-inference-test"
os.makedirs(test_src_dir, exist_ok=True)
logger.info(f"Test script directory: {test_src_dir}")
sample_data_path = os.path.join(test_src_dir, "sample-request.json")

with open(sample_data_path, "w") as f:
    f.write(json.dumps(sample))

In [None]:
# Send the saved sample request to the specific deployment. The response body is the
# JSON produced by score.py's run(), i.e. an object with a "result" key.
result = ml_client.online_endpoints.invoke(
    request_file=sample_data_path,
    endpoint_name=azure_endpoint_name,
    deployment_name=azure_deployment_name,
)

result_json = json.loads(result)
print(result_json["result"])

### 4.2. LLM latency/throughput benchmarking


In [None]:
import numpy as np
from time import perf_counter


def benchmark_latency(
    endpoint_name, deployment_name, sample_data_path, num_warmups=1, num_infers=5
):
    """Measure sequential request latency against one endpoint deployment.

    Sends ``num_warmups`` untimed requests, then ``num_infers`` timed requests one
    after another, each using the request JSON stored at ``sample_data_path``.

    Args:
        endpoint_name: Online endpoint to call.
        deployment_name: Deployment within the endpoint to target.
        sample_data_path: Path to the request JSON file.
        num_warmups: Number of untimed requests sent first.
        num_infers: Number of timed requests; must be at least 1.

    Returns:
        dict with the total ``duration`` of the timed loop and per-request
        ``avg_sec``, ``std_sec``, ``p95_sec`` and ``p99_sec`` (all in seconds).

    Raises:
        ValueError: If ``num_infers`` is less than 1 (there would be nothing to summarize).
    """
    if num_infers < 1:
        raise ValueError(f"num_infers must be >= 1, got {num_infers}")

    print(
        f"Measuring latency for Endpoint '{endpoint_name}' and Deployment '{deployment_name}', num_infers={num_infers}"
    )

    def invoke_once():
        return ml_client.online_endpoints.invoke(
            endpoint_name=endpoint_name,
            deployment_name=deployment_name,
            request_file=sample_data_path,
        )

    # Warm-up requests are not timed.
    for _ in range(num_warmups):
        invoke_once()

    # perf_counter is monotonic; use it for both per-request latency and the total
    # duration (time.time() is wall-clock and can jump if the system clock changes).
    latencies = []
    begin = perf_counter()
    for _ in range(num_infers):
        start_time = perf_counter()
        invoke_once()
        latencies.append(perf_counter() - start_time)
    duration = perf_counter() - begin

    return {
        "duration": duration,
        "avg_sec": np.mean(latencies),
        "std_sec": np.std(latencies),
        "p95_sec": np.percentile(latencies, 95),
        "p99_sec": np.percentile(latencies, 99),
    }


def benchmark_latency_multicore(
    endpoint_name,
    deployment_name,
    sample_data_path,
    num_warmups=1,
    num_infers=5,
    num_threads=2,
):
    """Measure latency and throughput under concurrent requests.

    After ``num_warmups`` untimed requests, ``num_threads`` worker threads each send
    ``num_infers`` requests back to back, so ``num_threads * num_infers`` timed
    requests are issued in total.

    Args:
        endpoint_name: Online endpoint to call.
        deployment_name: Deployment within the endpoint to target.
        sample_data_path: Path to the request JSON file.
        num_warmups: Number of untimed requests sent first.
        num_infers: Timed requests per thread; must be at least 1.
        num_threads: Number of concurrent worker threads; must be at least 1.

    Returns:
        dict with ``threads``, total ``duration``, ``throughput`` (requests per second)
        and per-request ``avg_sec``, ``std_sec``, ``p95_sec``, ``p99_sec`` (seconds).

    Raises:
        ValueError: If ``num_infers`` or ``num_threads`` is less than 1.
        Any exception raised by a request inside a worker thread is re-raised.
    """
    import concurrent.futures
    from time import perf_counter

    if num_infers < 1:
        raise ValueError(f"num_infers must be >= 1, got {num_infers}")
    if num_threads < 1:
        raise ValueError(f"num_threads must be >= 1, got {num_threads}")

    def invoke_once():
        return ml_client.online_endpoints.invoke(
            endpoint_name=endpoint_name,
            deployment_name=deployment_name,
            request_file=sample_data_path,
        )

    # Warmup (untimed)
    for _ in range(num_warmups):
        invoke_once()

    # Thread task: each worker runs a serial loop of timed requests and returns its own
    # latencies; several workers run at the same time to generate concurrent load.
    def task():
        worker_latencies = []
        for _ in range(num_infers):
            start = perf_counter()
            invoke_once()
            worker_latencies.append(perf_counter() - start)
        return worker_latencies

    begin = perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as pool:
        futures = [pool.submit(task) for _ in range(num_threads)]
        # Future.result() re-raises a worker's exception; without it, failed requests
        # would be dropped silently and the statistics computed from partial data.
        latencies = [lat for future in futures for lat in future.result()]
    end = perf_counter()

    duration = end - begin
    return {
        "threads": num_threads,
        "duration": duration,
        "throughput": len(latencies) / duration,
        "avg_sec": np.mean(latencies),
        "std_sec": np.std(latencies),
        "p95_sec": np.percentile(latencies, 95),
        "p99_sec": np.percentile(latencies, 99),
    }

In [None]:
# Sequential latency run: one untimed warm-up request, then five timed requests.
benchmark_result = benchmark_latency(
    endpoint_name=azure_endpoint_name,
    deployment_name=azure_deployment_name,
    sample_data_path=sample_data_path,
    num_warmups=1,
    num_infers=5,
)

In [None]:
# Show the latency summary dict returned by benchmark_latency.
print(benchmark_result)

## Clean up


In [None]:
import shutil

# Remove the local test-request folder. shutil avoids shelling out, and unlike the
# unquoted `!rm -rf {test_src_dir}` it is not broken by paths containing spaces.
# ignore_errors=True mirrors rm -rf: a missing folder is not an error.
shutil.rmtree(test_src_dir, ignore_errors=True)

In [None]:
# Start deleting the endpoint created in this notebook. Like the other begin_* calls
# here, this starts a long-running operation; call .wait() on the returned object to
# block until the deletion finishes.
ml_client.online_endpoints.begin_delete(azure_endpoint_name)