# Fine-tuning Open Source LLM using the Azure ML Python SDK (Custom Script)

### Overview

There are several cases where you might want to use custom scripts without MLflow in Azure ML.

#### 1. Compatibility with existing workflows

You may want to avoid MLflow in order to stay compatible with existing workflows or toolchains. For example:

-   Customized logging solution: You already have a separate solution in place for logging and tracing (e.g., Weights & Biases).
-   Specific logging format: You need to log data in a format other than MLflow's.

#### 2. Need more granular control

You need more granular control over the training and inference process. MLflow provides a lot of convenience, but its abstractions can make fine-grained control difficult.

#### 3. Simple use cases

If your use case is simple enough that you don't need all of MLflow's features, basic Azure ML functionality may be sufficient. For a toy project or a simple model training task, starting with plain code avoids the overhead of additional tooling.

#### 4. Security and compliance

You cannot use external tools because of specific security and compliance requirements.

-   Data security: You can't use external logging services or data stores due to data security requirements.
-   Regulatory compliance: Data must be stored in a specific format or location to meet regulatory requirements.

This notebook shows a basic example of training a model with a custom script.

[Note] Please use the `Python 3.10 - SDK v2 (azureml_py310_sdkv2)` conda environment.


## Load config file

---


In [None]:
%load_ext autoreload
%autoreload 2

import os, sys
lab_prep_dir = os.getcwd().split("slm-innovator-lab")[0] + "slm-innovator-lab/0_lab_preparation"
sys.path.append(os.path.abspath(lab_prep_dir))

from common import check_kernel
check_kernel()

In [None]:
import os
import yaml
from logger import logger
from datetime import datetime

snapshot_date = datetime.now().strftime("%Y-%m-%d")

with open("config.yml") as f:
    d = yaml.load(f, Loader=yaml.FullLoader)

AZURE_SUBSCRIPTION_ID = d["config"]["AZURE_SUBSCRIPTION_ID"]
AZURE_RESOURCE_GROUP = d["config"]["AZURE_RESOURCE_GROUP"]
AZURE_WORKSPACE = d["config"]["AZURE_WORKSPACE"]
AZURE_DATA_NAME = d["config"]["AZURE_DATA_NAME"]
DATA_DIR = d["config"]["DATA_DIR"]
CLOUD_DIR = d["config"]["CLOUD_DIR"]
HF_MODEL_NAME_OR_PATH = d["config"]["HF_MODEL_NAME_OR_PATH"]
IS_DEBUG = d["config"]["IS_DEBUG"]
USE_LOWPRIORITY_VM = d["config"]["USE_LOWPRIORITY_VM"]

azure_env_name = d["train"]["azure_env_name"]
azure_compute_cluster_name = d["train"]["azure_compute_cluster_name"]
azure_compute_cluster_size = d["train"]["azure_compute_cluster_size"]

os.makedirs(DATA_DIR, exist_ok=True)
os.makedirs(CLOUD_DIR, exist_ok=True)

logger.info("===== 0. Azure ML Training Info =====")
logger.info(f"AZURE_SUBSCRIPTION_ID={AZURE_SUBSCRIPTION_ID}")
logger.info(f"AZURE_RESOURCE_GROUP={AZURE_RESOURCE_GROUP}")
logger.info(f"AZURE_WORKSPACE={AZURE_WORKSPACE}")
logger.info(f"AZURE_DATA_NAME={AZURE_DATA_NAME}")
logger.info(f"DATA_DIR={DATA_DIR}")
logger.info(f"CLOUD_DIR={CLOUD_DIR}")
logger.info(f"HF_MODEL_NAME_OR_PATH={HF_MODEL_NAME_OR_PATH}")
logger.info(f"IS_DEBUG={IS_DEBUG}")
logger.info(f"USE_LOWPRIORITY_VM={USE_LOWPRIORITY_VM}")

logger.info(f"azure_env_name={azure_env_name}")
logger.info(f"azure_compute_cluster_name={azure_compute_cluster_name}")
logger.info(f"azure_compute_cluster_size={azure_compute_cluster_size}")
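The cell above assumes that `config.yml` contains a `config` section and a `train` section with every key it reads. A missing key would surface as a `KeyError` in the middle of the cell. A minimal sketch of an early check, assuming that layout; the helper name `find_missing_keys` and the `REQUIRED_KEYS` table are illustrative and not part of the lab code:

```python
# Keys the notebook reads from config.yml, grouped by top-level section.
REQUIRED_KEYS = {
    "config": [
        "AZURE_SUBSCRIPTION_ID", "AZURE_RESOURCE_GROUP", "AZURE_WORKSPACE",
        "AZURE_DATA_NAME", "DATA_DIR", "CLOUD_DIR", "HF_MODEL_NAME_OR_PATH",
        "IS_DEBUG", "USE_LOWPRIORITY_VM",
    ],
    "train": [
        "azure_env_name", "azure_compute_cluster_name", "azure_compute_cluster_size",
    ],
}


def find_missing_keys(cfg, required=REQUIRED_KEYS):
    """Return a sorted list of 'section.key' names that are absent from cfg."""
    missing = []
    for section, keys in required.items():
        section_values = cfg.get(section) or {}
        for key in keys:
            if key not in section_values:
                missing.append(f"{section}.{key}")
    return sorted(missing)


# Example with a deliberately incomplete, made-up config.
example_cfg = {"config": {"AZURE_WORKSPACE": "my-workspace"}, "train": {}}
print(find_missing_keys(example_cfg)[:3])
```

In the notebook, calling `find_missing_keys(d)` right after `yaml.load` would report all missing keys at once instead of failing on the first one.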

<br>

## 1. Dataset preparation

---

Preparing the dataset is the first step in training a model. If you want to use a Hugging Face dataset, you can load it with the `datasets` library.<br>
Otherwise, you can use your own dataset from the previous hands-on sessions.

We have prepared a dataset, [`lab1_augmented_samples.json`](lab1_augmented_samples.json), for this hands-on session.


In [None]:
USE_HF_DATASETS = False  # Determine if we use Hugging Face Datasets or not

import json
import random
from datasets import load_dataset
from random import randrange
from logger import logger

In [None]:
if not USE_HF_DATASETS:

    # Function to load data from the provided file and convert to JSONL format for single-turn conversations
    def load_and_convert_to_jsonl(
        file_path, system_prompt_msg="You're an AI assistant."
    ):
        with open(file_path, "r") as file:
            data = json.load(file)

        result = []

        for item in data:
            jsonl_entry = {
                "prompt": system_prompt_msg,
                "messages": [
                    {"content": item["input"], "role": "user"},
                    {"content": item["output"], "role": "assistant"},
                ],
            }
            result.append(json.dumps(jsonl_entry))

        return result

    def save_jsonl_data(jsonl_data, file_path):
        with open(file_path, "w") as file:
            for entry in jsonl_data:
                file.write(entry + "\n")

    # Function to split data into training and testing sets
    def split_train_test(jsonl_data, train_size=0.8):
        # Shuffle the data
        random.shuffle(jsonl_data)

        # Calculate split index
        split_index = int(len(jsonl_data) * train_size)

        # Split the data
        train_data = jsonl_data[:split_index]
        test_data = jsonl_data[split_index:]

        return train_data, test_data

    logger.info(f"===== 1. Custom Dataset preparation from Lab 1.  =====")
    logger.info(f"Preparing dataset.")
    file_path = "lab1_augmented_samples.json"
    system_prompt_msg = "You are the SME (Subject Matter Expert) in Distributed training on Cloud. Please answer the questions accurately."
    jsonl_dataset = load_and_convert_to_jsonl(file_path, system_prompt_msg)
    train_dataset, test_dataset = split_train_test(jsonl_dataset, train_size=0.8)
    logger.info(f"Save dataset to {DATA_DIR}")
    save_jsonl_data(train_dataset, f"{DATA_DIR}/train.jsonl")
    save_jsonl_data(test_dataset, f"{DATA_DIR}/eval.jsonl")
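`split_train_test` above shuffles with the global random state, so each run produces a different split. If a reproducible split is preferred, one option is a dedicated `random.Random` instance with a fixed seed. This is a sketch rather than the lab's implementation; the function name and the default seed are assumptions:

```python
import random


def split_train_test_seeded(items, train_size=0.8, seed=42):
    """Shuffle a copy of items with a fixed seed and split it into train/test lists."""
    shuffled = list(items)  # copy so the caller's list is left untouched
    random.Random(seed).shuffle(shuffled)
    split_index = int(len(shuffled) * train_size)
    return shuffled[:split_index], shuffled[split_index:]


records = [f"record-{i}" for i in range(10)]
train, test = split_train_test_seeded(records)
print(len(train), len(test))
```

Using a local `Random` instance also avoids changing the global random state that other cells may rely on.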

In [None]:
if USE_HF_DATASETS:
    logger.info(f"===== 1. Hugging Face Dataset preparation =====")
    logger.info(f"Loading dataset. It may take several minutes to load the dataset.")
    # Load dataset from the hub
    dataset = load_dataset("HuggingFaceH4/ultrachat_200k", split="train_sft[:2%]")

    print(f"Dataset size: {len(dataset)}")
    if IS_DEBUG:
        logger.info(
            "Debug mode is active. The dataset was reduced to 1,000 samples."
        )
        dataset = dataset.select(range(1000))

    logger.info(f"Save dataset to {DATA_DIR}")
    dataset = dataset.train_test_split(test_size=0.2)
    train_dataset = dataset["train"]
    train_dataset.to_json(f"{DATA_DIR}/train.jsonl")
    test_dataset = dataset["test"]
    test_dataset.to_json(f"{DATA_DIR}/eval.jsonl")
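Both branches write `train.jsonl` and `eval.jsonl` with one JSON object per line. A quick sanity check before registering the data might look like the sketch below. It assumes each record carries a non-empty `messages` list of dicts with `role` and `content` keys, as in the custom branch above; the helper name is hypothetical:

```python
import json


def find_invalid_lines(lines):
    """Return 1-based numbers of lines that are not chat records with role/content messages."""
    bad = []
    for number, line in enumerate(lines, start=1):
        try:
            record = json.loads(line)
            messages = record["messages"]
            ok = isinstance(messages, list) and len(messages) > 0 and all(
                isinstance(m, dict) and "role" in m and "content" in m for m in messages
            )
        except (json.JSONDecodeError, KeyError, TypeError):
            ok = False
        if not ok:
            bad.append(number)
    return bad


sample = [
    json.dumps({"prompt": "p", "messages": [{"role": "user", "content": "hi"}]}),
    "not json",
    json.dumps({"prompt": "p"}),
]
print(find_invalid_lines(sample))
```

Because a file object iterates line by line, the helper can be applied directly, for example `find_invalid_lines(open(f"{DATA_DIR}/train.jsonl"))`.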

<br>

## 2. Training preparation

---

### 2.1. Configure workspace details

To connect to a workspace, we need identifying parameters - a subscription, a resource group, and a workspace name. We will use these details in the MLClient from azure.ai.ml to get a handle on the Azure Machine Learning workspace we need. We will use the default Azure authentication for this hands-on.


In [None]:
# import required libraries
import time
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azure.ai.ml import MLClient, Input, Output, command, load_component
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.entities import Data, Environment, BuildContext, Model
from azure.ai.ml.constants import AssetTypes
from azure.core.exceptions import ResourceNotFoundError, ResourceExistsError

logger.info(f"===== 2. Training preparation =====")
logger.info(f"Calling DefaultAzureCredential.")
credential = DefaultAzureCredential()
ml_client = MLClient(
    credential, AZURE_SUBSCRIPTION_ID, AZURE_RESOURCE_GROUP, AZURE_WORKSPACE
)

# The code below may conflict with AI Foundry as of February 2025.
# ml_client = None
# try:
#     ml_client = MLClient.from_config(credential)
# except Exception as ex:
#     print(ex)
#     ml_client = MLClient(credential, AZURE_SUBSCRIPTION_ID, AZURE_RESOURCE_GROUP, AZURE_WORKSPACE)
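`DefaultAzureCredential` can fail where no cached login, managed identity, or environment credential is available. A common fallback pattern is to try it first and switch to `InteractiveBrowserCredential` if a token cannot be obtained. The sketch below isolates that logic; `first_working_credential` and `azure_credential` are hypothetical helpers, and the scope string is the one commonly used for Azure Resource Manager:

```python
ARM_SCOPE = "https://management.azure.com/.default"


def first_working_credential(factories, scope=ARM_SCOPE):
    """Return the first credential that can obtain a token for scope."""
    errors = []
    for factory in factories:
        try:
            credential = factory()
            credential.get_token(scope)  # raises if this credential cannot authenticate
            return credential
        except Exception as ex:  # credential errors vary by type, so catch broadly
            errors.append(f"{getattr(factory, '__name__', factory)}: {ex}")
    raise RuntimeError("No credential could authenticate: " + "; ".join(errors))


def azure_credential():
    """Try DefaultAzureCredential, then InteractiveBrowserCredential (needs azure-identity)."""
    from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential

    return first_working_credential([DefaultAzureCredential, InteractiveBrowserCredential])
```

With this in place, `credential = azure_credential()` could replace the direct `DefaultAzureCredential()` call before constructing `MLClient`.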

### 2.2. Create AzureML environment and data

Azure ML runs your code inside containers defined by environment assets. You can use a curated (built-in) environment or build a custom one from a Docker build context or a conda specification.
This hands-on defines both a conda YAML file and a Docker build context, and uses the Docker-based environment by default.

Training data can be read directly from the local development environment, or it can be registered as an Azure ML data asset.


#### Conda environment


In [None]:
%%writefile {CLOUD_DIR}/train/conda.yml
name: model-env
channels:
  - conda-forge
dependencies:
  - python=3.10
  - pip=24.0
  - pip:
    - bitsandbytes==0.45.3
    - transformers==4.49.0
    - peft~=0.14.0
    - accelerate~=1.5.2
    - trl==0.15.2
    - einops==0.8.1
    - datasets==3.4.0
    - mlflow==2.16.0
    - azureml-mlflow==1.59.0
    - azureml-sdk==1.59.0
    - wandb==0.19.8
    - torch==2.6.0
    - torchaudio==2.6.0
    - torchvision==0.21.0

#### Docker environment


In [None]:
%%writefile {CLOUD_DIR}/train/Dockerfile

FROM mcr.microsoft.com/aifx/acpt/stable-ubuntu2004-cu124-py310-torch241:biweekly.202503.1

USER root

# support Deepspeed launcher requirement of passwordless ssh login
RUN apt-get update && apt-get -y upgrade
RUN pip install --upgrade pip
RUN apt-get install -y openssh-server openssh-client

# Install pip dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt --no-cache-dir

RUN MAX_JOBS=4 pip install flash-attn==2.7.4.post1 --no-build-isolation

In [None]:
%%writefile {CLOUD_DIR}/train/requirements.txt
azureml-acft-accelerator==0.0.70
azureml_acft_common_components==0.0.70
azureml-acft-contrib-hf-nlp==0.0.70
azureml-evaluate-mlflow==0.0.70
azureml-metrics[text]==0.0.70
mltable==1.6.1
mpi4py==4.0.3
sentencepiece==0.2.0
transformers==4.49.0
datasets==3.4.0
accelerate~=1.5.2
diffusers==0.32.2
onnxruntime==1.20.0
rouge-score==0.1.2
sacrebleu==2.5.1
bitsandbytes==0.45.3
einops==0.8.1
aiohttp==3.10.11
peft~=0.14.0
deepspeed==0.16.4
trl==0.15.2
tiktoken==0.9.0
packaging==24.2
timm==1.0.15
wandb==0.19.8
azure-identity

In [None]:
def get_or_create_environment_asset(
    ml_client, env_name, conda_yml="cloud/conda.yml", update=False
):

    try:
        latest_env_version = max(
            [int(e.version) for e in ml_client.environments.list(name=env_name)]
        )
        if update:
            raise ResourceExistsError(
                "Found Environment asset, but will update the Environment."
            )
        else:
            env_asset = ml_client.environments.get(
                name=env_name, version=latest_env_version
            )
            logger.info(f"Found Environment asset: {env_name}. Will not create again")
    except (ResourceNotFoundError, ResourceExistsError) as e:
        logger.info(f"Exception: {e}")
        env_docker_image = Environment(
            image="mcr.microsoft.com/azureml/curated/acft-hf-nlp-gpu:latest",
            conda_file=conda_yml,
            name=env_name,
            description="Environment created for llm fine-tuning.",
        )
        env_asset = ml_client.environments.create_or_update(env_docker_image)
        logger.info(f"Created/Updated Environment asset: {env_name}")

    return env_asset


def get_or_create_docker_environment_asset(
    ml_client, env_name, docker_dir, update=False
):

    try:
        latest_env_version = max(
            [int(e.version) for e in ml_client.environments.list(name=env_name)]
        )
        if update:
            raise ResourceExistsError(
                "Found Environment asset, but will update the Environment."
            )
        else:
            env_asset = ml_client.environments.get(
                name=env_name, version=latest_env_version
            )
            logger.info(f"Found Environment asset: {env_name}. Will not create again")
    except (ResourceNotFoundError, ResourceExistsError) as e:
        logger.info(f"Exception: {e}")
        env_docker_image = Environment(
            build=BuildContext(path=docker_dir),
            name=env_name,
            description="Environment created from a Docker context.",
        )
        env_asset = ml_client.environments.create_or_update(env_docker_image)
        logger.info(f"Created Environment asset: {env_name}")

    return env_asset


def get_or_create_data_asset(ml_client, data_name, data_local_dir, update=False):

    try:
        latest_data_version = max(
            [int(d.version) for d in ml_client.data.list(name=data_name)]
        )
        if update:
            raise ResourceExistsError("Found Data asset, but will update the Data.")
        else:
            data_asset = ml_client.data.get(name=data_name, version=latest_data_version)
            logger.info(f"Found Data asset: {data_name}. Will not create again")
    except (ResourceNotFoundError, ResourceExistsError) as e:
        data = Data(
            path=data_local_dir,
            type=AssetTypes.URI_FOLDER,
            description=f"{data_name} for fine tuning",
            tags={"FineTuningType": "Instruction", "Language": "En"},
            name=data_name,
        )
        data_asset = ml_client.data.create_or_update(data)
        logger.info(f"Created/Updated Data asset: {data_name}")

    return data_asset
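All three helpers above begin by computing the highest numeric version of an asset. That step can be isolated as follows. This is a sketch under the assumption that versions are strings and that non-numeric versions should be skipped; the helpers above call `int()` directly, which raises `ValueError` on a non-numeric version. The function name is hypothetical:

```python
def latest_numeric_version(versions):
    """Return the highest integer version as a string, or None if there is none."""
    numeric = [int(v) for v in versions if str(v).isdecimal()]
    return str(max(numeric)) if numeric else None


print(latest_numeric_version(["1", "2", "10"]))
print(latest_numeric_version(["latest-build"]))
```

Comparing as integers matters here: a plain string comparison would rank `"2"` above `"10"`.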

In [None]:
# env = get_or_create_environment_asset(ml_client, azure_env_name, conda_yml=f"{CLOUD_DIR}/train/conda.yml", update=False)
env = get_or_create_docker_environment_asset(
    ml_client, azure_env_name, docker_dir=f"{CLOUD_DIR}/train", update=False
)
data = get_or_create_data_asset(
    ml_client, AZURE_DATA_NAME, data_local_dir=DATA_DIR, update=False
)

### 2.3. Training script


In [None]:
!pygmentize src_train/train.py
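The training job in section 3.2 invokes `train.py` with `--train_dir`, `--epochs`, `--train_batch_size`, `--eval_batch_size`, and `--model_dir`. The actual script lives in `src_train/train.py` and is not reproduced here; the sketch below only illustrates how such arguments are typically parsed with `argparse`. The defaults and help text are placeholders, not the lab's values:

```python
import argparse


def parse_args(argv=None):
    """Parse the command-line arguments that the training command passes in."""
    parser = argparse.ArgumentParser(description="Fine-tuning entry point (illustrative)")
    parser.add_argument("--train_dir", type=str, required=True,
                        help="Folder containing train.jsonl and eval.jsonl")
    parser.add_argument("--epochs", type=int, default=1)
    parser.add_argument("--train_batch_size", type=int, default=8)
    parser.add_argument("--eval_batch_size", type=int, default=8)
    parser.add_argument("--model_dir", type=str, default="./outputs")
    return parser.parse_args(argv)


args = parse_args(["--train_dir", "/tmp/data", "--epochs", "3"])
print(args.train_dir, args.epochs, args.model_dir)
```

Passing `argv` explicitly keeps the parser testable from a notebook; when `argv` is `None`, `argparse` reads `sys.argv` as usual.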

<br>

## 3. Training

---

### 3.1. Create the compute cluster


In [None]:
from azure.ai.ml.entities import AmlCompute

logger.info(f"===== 3. Training =====")
# Create the compute cluster if it does not exist yet
try:
    compute = ml_client.compute.get(azure_compute_cluster_name)
    logger.info("The compute cluster already exists! Reusing it for the current run")
except Exception:
    tier = "LowPriority" if USE_LOWPRIORITY_VM else "Dedicated"
    logger.info(
        f"Looks like the compute cluster doesn't exist. Creating a new {tier} cluster with compute size {azure_compute_cluster_size}!"
    )
    try:
        compute = AmlCompute(
            name=azure_compute_cluster_name,
            size=azure_compute_cluster_size,
            tier=tier,
            max_instances=1,  # For multi node training set this to an integer value more than 1
        )
        ml_client.compute.begin_create_or_update(compute).wait()
    except Exception as e:
        # Surface the reason instead of silently continuing without a cluster
        logger.info(f"Failed to create the compute cluster: {e}")
        raise
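For cost control, a cluster is often configured to scale down to zero nodes when idle. `AmlCompute` accepts `min_instances` and `idle_time_before_scale_down` (in seconds) for this purpose. The sketch below only assembles the keyword arguments so that the choices are visible; the helper name, the cluster name, the VM size, and the 120-second idle timeout are illustrative assumptions, not values from the lab configuration:

```python
def build_compute_kwargs(name, size, use_low_priority, max_instances=1, idle_seconds=120):
    """Assemble keyword arguments for azure.ai.ml.entities.AmlCompute."""
    return {
        "name": name,
        "size": size,
        "tier": "LowPriority" if use_low_priority else "Dedicated",
        "min_instances": 0,  # release all nodes when no job is running
        "max_instances": max_instances,
        "idle_time_before_scale_down": idle_seconds,  # seconds before idle nodes are released
    }


# Placeholder name and size, for illustration only.
kwargs = build_compute_kwargs("gpu-cluster", "Standard_NC24ads_A100_v4", use_low_priority=True)
print(kwargs["tier"], kwargs["min_instances"])
```

The resulting dictionary could then be passed as `AmlCompute(**kwargs)`.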

### 3.2. Start training job

The `command` function lets you configure the following key aspects of a job.

-   `inputs` - A dictionary of name-value pairs passed to the command. Inputs are referenced in the `command` string using the `${{inputs.<input_name>}}` expression. To use files or folders as inputs, use the `Input` class, which supports three parameters:
    -   `type` - The type of input. This can be a `uri_file` or `uri_folder`. The default is `uri_folder`.
    -   `path` - The path to the file or folder. It can point to local or remote files or folders. For remote files, http/https and wasb are supported.
        -   Azure ML `data`/`dataset` or `datastore` are of type `uri_folder`. To use `data`/`dataset` as input, reference a dataset registered in the workspace using the format `<data_name>:<version>`. For example, `Input(type='uri_folder', path='my_dataset:1')`.
    -   `mode` - How the data should be delivered to the compute target. Allowed values are `ro_mount`, `rw_mount`, and `download`. The default is `ro_mount`.
-   `code` - The path where the code to run the command is located.
-   `compute` - The compute on which the command will run. You can run it on the local machine by using `local` for the compute.
-   `command` - The command that needs to be run.
-   `environment` - The environment needed for the command to run. Curated (built-in) or custom environments from the workspace can be used.
-   `instance_count` - Number of nodes. The default is 1.
-   `distribution` - Distribution configuration for distributed training scenarios. Azure Machine Learning supports PyTorch, TensorFlow, and MPI-based distributed training.
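To make the `${{inputs.<input_name>}}` syntax concrete, the sketch below performs a local, simplified substitution. Azure ML resolves these expressions itself when the job runs (and replaces data inputs with the mounted or downloaded path), so this is purely an illustration; `render_command` is a hypothetical helper:

```python
import re

# Matches expressions such as ${{inputs.epoch}} and captures the input name.
PLACEHOLDER = re.compile(r"\$\{\{\s*inputs\.(\w+)\s*\}\}")


def render_command(template, inputs):
    """Replace each ${{inputs.name}} in template with str(inputs[name])."""

    def substitute(match):
        name = match.group(1)
        if name not in inputs:
            raise KeyError(f"Command references undefined input: {name}")
        return str(inputs[name])

    return PLACEHOLDER.sub(substitute, template)


template = "python train.py --epochs ${{inputs.epoch}} --model_dir ${{inputs.model_dir}}"
print(render_command(template, {"epoch": 1, "model_dir": "./outputs"}))
```

A check like this can catch a mismatch between the names used in the command string and the keys of the `inputs` dictionary before the job is submitted.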


In [None]:
from azure.ai.ml import command
from azure.ai.ml import Input
from azure.ai.ml.entities import ResourceConfiguration

USE_BUILTIN_ENV = False
str_command = ""

if USE_BUILTIN_ENV:
    str_env = "azureml://registries/azureml/environments/acft-hf-nlp-gpu/versions/86"  # Use the curated (built-in) Environment asset
    str_command += "pip install -r requirements.txt && "
else:
    str_env = f"{azure_env_name}@latest"  # Use the custom Environment asset registered in section 2.2

str_command += "python train.py --train_dir ${{inputs.train_dir}} \
            --epochs ${{inputs.epoch}} --train_batch_size ${{inputs.train_batch_size}} \
            --eval_batch_size ${{inputs.eval_batch_size}} --model_dir ${{inputs.model_dir}}"

logger.info(f"Env: {str_env}")
logger.info(f"Command: {str_command}")

job = command(
    inputs=dict(
        # train_dir=Input(type="uri_folder", path=DATA_DIR), # Get data from local path
        train_dir=Input(path=f"{AZURE_DATA_NAME}@latest"),  # Get data from Data asset
        epoch=d["train"]["epoch"],
        train_batch_size=d["train"]["train_batch_size"],
        eval_batch_size=d["train"]["eval_batch_size"],
        model_dir=d["train"]["model_dir"],
    ),
    code="./src_train",  # local path where the code is stored
    compute=azure_compute_cluster_name,
    command=str_command,
    environment=str_env,
    distribution={
        "type": "PyTorch",
        "process_count_per_instance": 1,  # For multi-gpu training set this to an integer value more than 1
    },
)

returned_job = ml_client.jobs.create_or_update(job)
logger.info(
    """Started training job. Now a dedicated Compute Cluster for training is provisioned and the environment
required for training is automatically set up from Environment.

If you have set up a new custom Environment, it will take approximately 20 minutes or more to set up the Environment before provisioning the training cluster.
"""
)
ml_client.jobs.stream(returned_job.name)

In [None]:
display(returned_job)

In [None]:
# Keep the job name so that the model asset can be created from this job's outputs
job_name = returned_job.name

In [None]:
%store job_name

<br>

## 4. (Optional) Create model asset and get fine-tuned LLM to local folder

---

### 4.1. Create model asset


In [None]:
def get_or_create_model_asset(
    ml_client,
    model_name,
    job_name,
    model_dir="outputs",
    model_type="custom_model",
    update=False,
):

    try:
        latest_model_version = max(
            [int(m.version) for m in ml_client.models.list(name=model_name)]
        )
        if update:
            raise ResourceExistsError("Found Model asset, but will update the Model.")
        else:
            model_asset = ml_client.models.get(
                name=model_name, version=latest_model_version
            )
            logger.info(f"Found Model asset: {model_name}. Will not create again")
    except (ResourceNotFoundError, ResourceExistsError) as e:
        logger.info(f"Exception: {e}")
        model_path = f"azureml://jobs/{job_name}/outputs/artifacts/paths/{model_dir}/"
        run_model = Model(
            name=model_name,
            path=model_path,
            description="Model created from run.",
            type=model_type,  # mlflow_model, custom_model, triton_model
        )
        model_asset = ml_client.models.create_or_update(run_model)
        logger.info(f"Created Model asset: {model_name}")

    return model_asset

In [None]:
azure_model_name = d["serve"]["azure_model_name"]
model_dir = d["train"]["model_dir"]
model = get_or_create_model_asset(
    ml_client,
    azure_model_name,
    job_name,
    model_dir,
    model_type="custom_model",
    update=False,
)

logger.info(
    "===== 4. (Optional) Create model asset and get fine-tuned LLM to local folder ====="
)
logger.info(f"azure_model_name={azure_model_name}")
logger.info(f"model_dir={model_dir}")
logger.info(f"model={model}")

### 4.2. Get fine-tuned LLM to local folder

You can download the registered model to a local directory to run inference locally, or serve it in Azure (e.g., as a real-time endpoint).


In [None]:
# Download the model (this is optional)
local_model_dir = "./artifact_downloads"
os.makedirs(local_model_dir, exist_ok=True)

ml_client.models.download(
    name=azure_model_name, download_path=local_model_dir, version=model.version
)
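After the download, it can help to confirm which files actually arrived. A small sketch using only the standard library; the helper name is an assumption, and the directory layout under the download path depends on how the model was registered:

```python
import os
import tempfile


def list_files_with_sizes(root):
    """Return sorted (relative_path, size_in_bytes) tuples for every file under root."""
    found = []
    for dirpath, _dirnames, filenames in os.walk(root):
        for filename in filenames:
            full_path = os.path.join(dirpath, filename)
            found.append((os.path.relpath(full_path, root), os.path.getsize(full_path)))
    return sorted(found)


# Demonstration on a throwaway directory instead of a real model download.
with tempfile.TemporaryDirectory() as tmp:
    with open(os.path.join(tmp, "config.json"), "w") as f:
        f.write("{}")
    print(list_files_with_sizes(tmp))
```

In the notebook, `list_files_with_sizes(local_model_dir)` would list the downloaded model files.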

## Clean up


In [None]:
!rm -rf $DATA_DIR {local_model_dir}