{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Fine-tuning Open Source LLM using the Azure ML Python SDK (MLflow)\n",
"\n",
"### Overview\n",
"\n",
"Azure ML Workspace is compatible with MLflow and can be used as an MLflow Tracking Server, as described in the following official guide from Microsoft. MLflow provides features such as experiment tracking, model management, and model deployment, allowing you to manage data science and machine learning workflows more efficiently and systematically. Below are the main advantages of using Azure ML and MLflow together.\n",
"\n",
"#### 1. Experiment tracking and management\n",
"\n",
"You can systematically manage the parameters, metrics, and artifacts of all your experiments. Integrating with Azur eML allows you to easily track and manage this information within your Azure ML workspace.\n",
"\n",
"#### 2. Model management\n",
"\n",
"MLflow provides a model registry for model versioning. Integrate with AzureML to systematically manage and deploy all versions of your models. When combined with AzureML's deployment capabilities, models can be easily deployed to a variety of environments (e.g. Azure Kubernetes Service, Azure Container Instances).\n",
"\n",
"#### 3. Reproducibility and collaboration\n",
"\n",
"MLflow records the parameters and environment of every experiment, so you can accurately reproduce the experiment. This is very useful when you need to redo the same experiment across collaborating team members, or when you need to rerun an experiment at a later date.\n",
"\n",
"#### 4. CI/CD integration\n",
"\n",
"MLflow makes it easy to implement continuous integration (CI) and continuous deployment (CD) of machine learning models. Integrate with Azure DevOps or GitHub Actions to automatically run training, validation, and deployment processes as model changes occur.\n",
"\n",
"When training a model with Hugging Face's Trainer API, if you specify `report_to=\"azure_ml\"`, basic indicators will be automatically logged without any additional code. Of course, you can freely log custom indicators using Bring Your Own Script like the conventional method, but Azure ML's basic logging function is also excellent, so try using it as a baseline.\n",
"\n",
"[Note] Please use `Python 3.10 - SDK v2 (azureml_py310_sdkv2)` conda environment.\n"
]
},
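{
"cell_type": "markdown",
"metadata": {},
"source": [
"For reference, here is a minimal sketch (not executed in this notebook) of how `report_to=\"azure_ml\"` is typically passed to the Trainer API. The argument values are placeholders; the actual training settings used in this lab live in `src_train/train_mlflow.py`.\n",
"\n",
"```python\n",
"# Illustrative sketch only -- placeholder values, not the lab's actual training configuration.\n",
"from transformers import TrainingArguments\n",
"\n",
"training_args = TrainingArguments(\n",
"    output_dir=\"./outputs\",           # placeholder output path\n",
"    num_train_epochs=1,\n",
"    per_device_train_batch_size=4,\n",
"    logging_steps=10,\n",
"    report_to=\"azure_ml\",             # basic metrics are logged to the Azure ML run automatically\n",
")\n",
"```"
]
},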
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Load config file\n",
"\n",
"---\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"%load_ext autoreload\n",
"%autoreload 2\n",
"\n",
"import os, sys\n",
"lab_prep_dir = os.getcwd().split(\"slm-innovator-lab\")[0] + \"slm-innovator-lab/0_lab_preparation\"\n",
"sys.path.append(os.path.abspath(lab_prep_dir))\n",
"\n",
"from common import check_kernel\n",
"check_kernel()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"import os\n",
"import yaml\n",
"from logger import logger\n",
"from datetime import datetime\n",
"\n",
"snapshot_date = datetime.now().strftime(\"%Y-%m-%d\")\n",
"\n",
"with open(\"config.yml\") as f:\n",
" d = yaml.load(f, Loader=yaml.FullLoader)\n",
"\n",
"AZURE_SUBSCRIPTION_ID = d[\"config\"][\"AZURE_SUBSCRIPTION_ID\"]\n",
"AZURE_RESOURCE_GROUP = d[\"config\"][\"AZURE_RESOURCE_GROUP\"]\n",
"AZURE_WORKSPACE = d[\"config\"][\"AZURE_WORKSPACE\"]\n",
"AZURE_DATA_NAME = d[\"config\"][\"AZURE_DATA_NAME\"]\n",
"DATA_DIR = d[\"config\"][\"DATA_DIR\"]\n",
"CLOUD_DIR = d[\"config\"][\"CLOUD_DIR\"]\n",
"HF_MODEL_NAME_OR_PATH = d[\"config\"][\"HF_MODEL_NAME_OR_PATH\"]\n",
"IS_DEBUG = d[\"config\"][\"IS_DEBUG\"]\n",
"USE_LOWPRIORITY_VM = d[\"config\"][\"USE_LOWPRIORITY_VM\"]\n",
"\n",
"azure_env_name = d[\"train\"][\"azure_env_name\"]\n",
"azure_compute_cluster_name = d[\"train\"][\"azure_compute_cluster_name\"]\n",
"azure_compute_cluster_size = d[\"train\"][\"azure_compute_cluster_size\"]\n",
"\n",
"os.makedirs(DATA_DIR, exist_ok=True)\n",
"os.makedirs(CLOUD_DIR, exist_ok=True)\n",
"\n",
"logger.info(\"===== 0. Azure ML Training Info =====\")\n",
"logger.info(f\"AZURE_SUBSCRIPTION_ID={AZURE_SUBSCRIPTION_ID}\")\n",
"logger.info(f\"AZURE_RESOURCE_GROUP={AZURE_RESOURCE_GROUP}\")\n",
"logger.info(f\"AZURE_WORKSPACE={AZURE_WORKSPACE}\")\n",
"logger.info(f\"AZURE_DATA_NAME={AZURE_DATA_NAME}\")\n",
"logger.info(f\"DATA_DIR={DATA_DIR}\")\n",
"logger.info(f\"CLOUD_DIR={CLOUD_DIR}\")\n",
"logger.info(f\"HF_MODEL_NAME_OR_PATH={HF_MODEL_NAME_OR_PATH}\")\n",
"logger.info(f\"IS_DEBUG={IS_DEBUG}\")\n",
"logger.info(f\"USE_LOWPRIORITY_VM={USE_LOWPRIORITY_VM}\")\n",
"\n",
"logger.info(f\"azure_env_name={azure_env_name}\")\n",
"logger.info(f\"azure_compute_cluster_name={azure_compute_cluster_name}\")\n",
"logger.info(f\"azure_compute_cluster_size={azure_compute_cluster_size}\")"
]
},
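{
"cell_type": "markdown",
"metadata": {},
"source": [
"The cell above assumes a `config.yml` with the structure sketched below. The keys shown are the ones this notebook reads (your file may contain more); the values are placeholders you should replace with your own.\n",
"\n",
"```yaml\n",
"# Sketch of the expected config.yml structure (placeholder values).\n",
"config:\n",
"  AZURE_SUBSCRIPTION_ID: \"<your-subscription-id>\"\n",
"  AZURE_RESOURCE_GROUP: \"<your-resource-group>\"\n",
"  AZURE_WORKSPACE: \"<your-workspace-name>\"\n",
"  AZURE_DATA_NAME: \"<data-asset-name>\"\n",
"  DATA_DIR: \"<local-dataset-dir>\"\n",
"  CLOUD_DIR: \"<local-cloud-assets-dir>\"\n",
"  HF_MODEL_NAME_OR_PATH: \"<huggingface-model-id>\"\n",
"  IS_DEBUG: true\n",
"  USE_LOWPRIORITY_VM: false\n",
"\n",
"train:\n",
"  azure_env_name: \"<environment-asset-name>\"\n",
"  azure_compute_cluster_name: \"<compute-cluster-name>\"\n",
"  azure_compute_cluster_size: \"<gpu-vm-size>\"\n",
"  epoch: 1\n",
"  train_batch_size: 2\n",
"  eval_batch_size: 2\n",
"  model_dir: \"<model-output-dir>\"\n",
"\n",
"serve:\n",
"  azure_model_name: \"<model-asset-name>\"\n",
"```"
]
},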
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<br>\n",
"\n",
"## 1. Dataset preparation\n",
"\n",
"---\n",
"\n",
"Preparing dataset is the first step in training a model. You can use the `datasets` library to load the dataset if you want to use Hugging Face datasets.<br>\n",
"Otherwise, you can use your own dataset from previous hands-on sessions.\n",
"\n",
"We have prepared a dataset, [`lab1_augmented_samples.json`](lab1_augmented_samples.json), for this hands-on session.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"USE_HF_DATASETS = False # Determine if we use Hugging Face Datasets or not\n",
"\n",
"import json\n",
"import random\n",
"from datasets import load_dataset\n",
"from random import randrange\n",
"from logger import logger"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"if not USE_HF_DATASETS:\n",
"\n",
" # Function to load data from the provided file and convert to JSONL format for single-turn conversations\n",
" def load_and_convert_to_jsonl(\n",
" file_path, system_prompt_msg=\"You're an AI assistant.\"\n",
" ):\n",
" with open(file_path, \"r\") as file:\n",
" data = json.load(file)\n",
"\n",
" result = []\n",
"\n",
" for item in data:\n",
" jsonl_entry = {\n",
" \"prompt\": system_prompt_msg,\n",
" \"messages\": [\n",
" {\"content\": item[\"input\"], \"role\": \"user\"},\n",
" {\"content\": item[\"output\"], \"role\": \"assistant\"},\n",
" ],\n",
" }\n",
" result.append(json.dumps(jsonl_entry))\n",
"\n",
" return result\n",
"\n",
" def save_jsonl_data(jsonl_data, file_path):\n",
" with open(file_path, \"w\") as file:\n",
" for entry in jsonl_data:\n",
" file.write(entry + \"\\n\")\n",
"\n",
" # Function to split data into training and testing sets\n",
" def split_train_test(jsonl_data, train_size=0.8):\n",
" # Shuffle the data\n",
" random.shuffle(jsonl_data)\n",
"\n",
" # Calculate split index\n",
" split_index = int(len(jsonl_data) * train_size)\n",
"\n",
" # Split the data\n",
" train_data = jsonl_data[:split_index]\n",
" test_data = jsonl_data[split_index:]\n",
"\n",
" return train_data, test_data\n",
"\n",
" logger.info(f\"===== 1. Custom Dataset preparation from Lab 1. =====\")\n",
" logger.info(f\"Preparing dataset.\")\n",
" file_path = \"lab1_augmented_samples.json\"\n",
" system_prompt_msg = \"You are the SME (Subject Matter Expert) in Distributed training on Cloud. Please answer the questions accurately.\"\n",
" jsonl_dataset = load_and_convert_to_jsonl(file_path, system_prompt_msg)\n",
" train_dataset, test_dataset = split_train_test(jsonl_dataset, train_size=0.8)\n",
" logger.info(f\"Save dataset to {DATA_DIR}\")\n",
" save_jsonl_data(train_dataset, f\"{DATA_DIR}/train.jsonl\")\n",
" save_jsonl_data(test_dataset, f\"{DATA_DIR}/eval.jsonl\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"if USE_HF_DATASETS:\n",
" logger.info(f\"===== 1. Hugging Face Dataset preparation =====\")\n",
" logger.info(f\"Loading dataset. It may take several minutes to load the dataset.\")\n",
" # Load dataset from the hub\n",
" dataset = load_dataset(\"HuggingFaceH4/ultrachat_200k\", split=\"train_sft[:2%]\")\n",
"\n",
" print(f\"Dataset size: {len(dataset)}\")\n",
" if IS_DEBUG:\n",
" logger.info(\n",
" f\"Activated Debug mode. The number of sample was resampled to 1000.\"\n",
" )\n",
" dataset = dataset.select(range(1000))\n",
"\n",
" logger.info(f\"Save dataset to {DATA_DIR}\")\n",
" dataset = dataset.train_test_split(test_size=0.2)\n",
" train_dataset = dataset[\"train\"]\n",
" train_dataset.to_json(f\"{DATA_DIR}/train.jsonl\")\n",
" test_dataset = dataset[\"test\"]\n",
" test_dataset.to_json(f\"{DATA_DIR}/eval.jsonl\")"
]
},
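{
"cell_type": "markdown",
"metadata": {},
"source": [
"As an optional sanity check before registering the data, you can peek at the first record of the generated `train.jsonl` and confirm it has the expected `prompt`/`messages` structure. Copy this into a cell if you want to run it.\n",
"\n",
"```python\n",
"# Optional sanity check: inspect the first record of the generated training file.\n",
"import json\n",
"\n",
"with open(f\"{DATA_DIR}/train.jsonl\") as f:\n",
"    first_record = json.loads(f.readline())\n",
"\n",
"print(json.dumps(first_record, indent=2)[:1000])  # truncate long records for readability\n",
"```"
]
},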
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<br>\n",
"\n",
"## 2. Training preparation\n",
"\n",
"---\n",
"\n",
"### 2.1. Configure workspace details\n",
"\n",
"To connect to a workspace, we need identifying parameters - a subscription, a resource group, and a workspace name. We will use these details in the MLClient from azure.ai.ml to get a handle on the Azure Machine Learning workspace we need. We will use the default Azure authentication for this hands-on.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# import required libraries\n",
"import time\n",
"from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential\n",
"from azure.ai.ml import MLClient, Input\n",
"from azure.ai.ml.dsl import pipeline\n",
"from azure.ai.ml import load_component\n",
"from azure.ai.ml import command\n",
"from azure.ai.ml.entities import Data, Environment, BuildContext\n",
"from azure.ai.ml.entities import Model\n",
"from azure.ai.ml import Input\n",
"from azure.ai.ml import Output\n",
"from azure.ai.ml.constants import AssetTypes\n",
"from azure.core.exceptions import ResourceNotFoundError, ResourceExistsError\n",
"\n",
"logger.info(f\"===== 2. Training preparation =====\")\n",
"logger.info(f\"Calling DefaultAzureCredential.\")\n",
"credential = DefaultAzureCredential()\n",
"ml_client = MLClient(\n",
" credential, AZURE_SUBSCRIPTION_ID, AZURE_RESOURCE_GROUP, AZURE_WORKSPACE\n",
")\n",
"\n",
"# The code below may conflict with AI Foundry as of February 2025.\n",
"# ml_client = None\n",
"# try:\n",
"# ml_client = MLClient.from_config(credential)\n",
"# except Exception as ex:\n",
"# print(ex)\n",
"# ml_client = MLClient(credential, AZURE_SUBSCRIPTION_ID, AZURE_RESOURCE_GROUP, AZURE_WORKSPACE)"
]
},
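{
"cell_type": "markdown",
"metadata": {},
"source": [
"The training job itself logs metrics through `report_to=\"azure_ml\"`, but if you also want to log to the workspace's MLflow tracking server from a local session, a minimal sketch looks like this (assuming `mlflow` and `azureml-mlflow` are installed in your kernel; the experiment name is hypothetical):\n",
"\n",
"```python\n",
"# Optional sketch: point a local MLflow client at the Azure ML workspace tracking server.\n",
"import mlflow\n",
"\n",
"ws = ml_client.workspaces.get(AZURE_WORKSPACE)   # workspace handle from the MLClient above\n",
"mlflow.set_tracking_uri(ws.mlflow_tracking_uri)  # use the workspace as the MLflow tracking server\n",
"mlflow.set_experiment(\"phi3-finetune-local\")     # hypothetical experiment name\n",
"```"
]
},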
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"### 2.2. Create AzureML environment and data\n",
"\n",
"Azure ML defines containers (called environment asset) in which your code will run. We can use the built-in environment or build a custom environment (Docker container, conda).\n",
"This hands-on uses conda yaml.\n",
"\n",
"Training data can be used as a dataset stored in the local development environment, but can also be registered as AzureML data.\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"#### Conda environment\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%writefile {CLOUD_DIR}/train/conda.yml\n",
"name: model-env\n",
"channels:\n",
" - conda-forge\n",
"dependencies:\n",
" - python=3.10\n",
" - pip=24.0\n",
" - pip:\n",
" - bitsandbytes==0.43.3\n",
" - transformers==4.44.2\n",
" - peft~=0.12\n",
" - accelerate~=0.33\n",
" - trl==0.10.1\n",
" - einops==0.8.0\n",
" - datasets==2.21.0\n",
" - wandb==0.17.8\n",
" - mlflow==2.16.0\n",
" - azureml-mlflow==1.57.0\n",
" - azureml-sdk==1.57.0\n",
" - torchvision==0.19.0\n",
" - torch==2.4.0"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Docker environment\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"%%writefile {CLOUD_DIR}/train/Dockerfile\n",
"FROM mcr.microsoft.com/aifx/acpt/stable-ubuntu2004-cu124-py310-torch241:biweekly.202410.2\n",
"\n",
"USER root\n",
"\n",
"# support Deepspeed launcher requirement of passwordless ssh login\n",
"RUN apt-get update && apt-get -y upgrade\n",
"RUN pip install --upgrade pip\n",
"RUN apt-get install -y openssh-server openssh-client\n",
"\n",
"# Install pip dependencies\n",
"COPY requirements.txt .\n",
"RUN pip install -r requirements.txt --no-cache-dir\n",
"\n",
"RUN MAX_JOBS=4 pip install flash-attn==2.6.3 --no-build-isolation"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"%%writefile {CLOUD_DIR}/train/requirements.txt\n",
"azureml-acft-accelerator==0.0.63\n",
"azureml_acft_common_components==0.0.63\n",
"azureml-acft-contrib-hf-nlp==0.0.63\n",
"azureml-evaluate-mlflow==0.0.63\n",
"azureml-metrics[text]==0.0.63\n",
"mltable==1.6.1\n",
"mpi4py==4.0.1\n",
"sentencepiece==0.2.0\n",
"transformers==4.46.1\n",
"datasets==3.1.0\n",
"accelerate==1.1.0\n",
"diffusers==0.31.0\n",
"onnxruntime==1.20.0\n",
"rouge-score==0.1.2\n",
"sacrebleu==2.4.3\n",
"bitsandbytes==0.44.1\n",
"einops==0.8.0\n",
"aiohttp==3.10.10\n",
"peft==0.13.2\n",
"deepspeed==0.15.3\n",
"trl==0.12.0\n",
"tiktoken==0.8.0\n",
"packaging==24.1\n",
"timm==1.0.11\n",
"azure-identity"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"def get_or_create_environment_asset(\n",
" ml_client, env_name, conda_yml=\"cloud/conda.yml\", update=False\n",
"):\n",
"\n",
" try:\n",
" latest_env_version = max(\n",
" [int(e.version) for e in ml_client.environments.list(name=env_name)]\n",
" )\n",
" if update:\n",
" raise ResourceExistsError(\n",
" \"Found Environment asset, but will update the Environment.\"\n",
" )\n",
" else:\n",
" env_asset = ml_client.environments.get(\n",
" name=env_name, version=latest_env_version\n",
" )\n",
" logger.info(f\"Found Environment asset: {env_name}. Will not create again\")\n",
" except (ResourceNotFoundError, ResourceExistsError) as e:\n",
" print(f\"Exception: {e}\")\n",
" env_docker_image = Environment(\n",
" image=\"mcr.microsoft.com/azureml/curated/acft-hf-nlp-gpu:latest\",\n",
" conda_file=conda_yml,\n",
" name=env_name,\n",
" description=\"Environment created for llm fine-tuning.\",\n",
" )\n",
" env_asset = ml_client.environments.create_or_update(env_docker_image)\n",
" logger.info(f\"Created/Updated Environment asset: {env_name}\")\n",
"\n",
" return env_asset\n",
"\n",
"\n",
"def get_or_create_docker_environment_asset(\n",
" ml_client, env_name, docker_dir, update=False\n",
"):\n",
"\n",
" try:\n",
" latest_env_version = max(\n",
" [int(e.version) for e in ml_client.environments.list(name=env_name)]\n",
" )\n",
" if update:\n",
" raise ResourceExistsError(\n",
" \"Found Environment asset, but will update the Environment.\"\n",
" )\n",
" else:\n",
" env_asset = ml_client.environments.get(\n",
" name=env_name, version=latest_env_version\n",
" )\n",
" logger.info(f\"Found Environment asset: {env_name}. Will not create again\")\n",
" except (ResourceNotFoundError, ResourceExistsError) as e:\n",
" logger.info(f\"Exception: {e}\")\n",
" env_docker_image = Environment(\n",
" build=BuildContext(path=docker_dir),\n",
" name=env_name,\n",
" description=\"Environment created from a Docker context.\",\n",
" )\n",
" env_asset = ml_client.environments.create_or_update(env_docker_image)\n",
" logger.info(f\"Created Environment asset: {env_name}\")\n",
"\n",
" return env_asset\n",
"\n",
"\n",
"def get_or_create_data_asset(ml_client, data_name, data_local_dir, update=False):\n",
"\n",
" try:\n",
" latest_data_version = max(\n",
" [int(d.version) for d in ml_client.data.list(name=data_name)]\n",
" )\n",
" if update:\n",
" raise ResourceExistsError(\"Found Data asset, but will update the Data.\")\n",
" else:\n",
" data_asset = ml_client.data.get(name=data_name, version=latest_data_version)\n",
" logger.info(f\"Found Data asset: {data_name}. Will not create again\")\n",
" except (ResourceNotFoundError, ResourceExistsError) as e:\n",
" data = Data(\n",
" path=data_local_dir,\n",
" type=AssetTypes.URI_FOLDER,\n",
" description=f\"{data_name} for fine tuning\",\n",
" tags={\"FineTuningType\": \"Instruction\", \"Language\": \"En\"},\n",
" name=data_name,\n",
" )\n",
" data_asset = ml_client.data.create_or_update(data)\n",
" logger.info(f\"Created/Updated Data asset: {data_name}\")\n",
"\n",
" return data_asset"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# env = get_or_create_environment_asset(ml_client, azure_env_name, conda_yml=f\"{CLOUD_DIR}/train/conda.yml\", update=False)\n",
"env = get_or_create_docker_environment_asset(\n",
" ml_client, azure_env_name, docker_dir=f\"{CLOUD_DIR}/train\", update=False\n",
")\n",
"data = get_or_create_data_asset(\n",
" ml_client, AZURE_DATA_NAME, data_local_dir=DATA_DIR, update=False\n",
")"
]
},
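{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you want to double-check what was just registered, you can query the latest versions of both assets by label, e.g.:\n",
"\n",
"```python\n",
"# Optional: confirm the latest registered versions of the environment and data assets.\n",
"latest_env = ml_client.environments.get(name=azure_env_name, label=\"latest\")\n",
"latest_data = ml_client.data.get(name=AZURE_DATA_NAME, label=\"latest\")\n",
"print(f\"Environment: {latest_env.name}:{latest_env.version}\")\n",
"print(f\"Data asset : {latest_data.name}:{latest_data.version}\")\n",
"```"
]
},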
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 2.3. Training script\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# !pygmentize src_train/train_mlflow.py"
]
},
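{
"cell_type": "markdown",
"metadata": {},
"source": [
"The actual training code lives in `src_train/train_mlflow.py` (uncomment the cell above to view it). As an illustration only, the job defined in section 3.2 invokes the script with the command-line interface sketched below; the defaults here are placeholders, not the lab's real values.\n",
"\n",
"```python\n",
"# Illustrative sketch of the CLI the training job expects -- not the actual src_train/train_mlflow.py.\n",
"import argparse\n",
"\n",
"def parse_args():\n",
"    parser = argparse.ArgumentParser()\n",
"    parser.add_argument(\"--model_name_or_path\", type=str)            # Hugging Face model id or local path\n",
"    parser.add_argument(\"--train_dir\", type=str)                     # mounted folder containing train.jsonl / eval.jsonl\n",
"    parser.add_argument(\"--epochs\", type=int, default=1)\n",
"    parser.add_argument(\"--train_batch_size\", type=int, default=2)\n",
"    parser.add_argument(\"--eval_batch_size\", type=int, default=2)\n",
"    parser.add_argument(\"--model_dir\", type=str, default=\"outputs\")  # where the fine-tuned model is saved\n",
"    return parser.parse_args()\n",
"```"
]
},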
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<br>\n",
"\n",
"## 3. Training\n",
"\n",
"---\n",
"\n",
"### 3.1. Create the compute cluster\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"from azure.ai.ml.entities import AmlCompute\n",
"\n",
"logger.info(f\"===== 3. Training =====\")\n",
"### Create the compute cluster\n",
"try:\n",
" compute = ml_client.compute.get(azure_compute_cluster_name)\n",
" logger.info(\"The compute cluster already exists! Reusing it for the current run\")\n",
"except Exception as ex:\n",
" logger.info(\n",
" f\"Looks like the compute cluster doesn't exist. Creating a new one with compute size {azure_compute_cluster_size}!\"\n",
" )\n",
" try:\n",
" print(\"Attempt #1 - Trying to create a dedicated compute\")\n",
" tier = \"LowPriority\" if USE_LOWPRIORITY_VM else \"Dedicated\"\n",
" compute = AmlCompute(\n",
" name=azure_compute_cluster_name,\n",
" size=azure_compute_cluster_size,\n",
" tier=tier,\n",
" max_instances=1, # For multi node training set this to an integer value more than 1\n",
" )\n",
" ml_client.compute.begin_create_or_update(compute).wait()\n",
" except Exception as e:\n",
" logger.info(\"Error\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 3.2. Start training job\n",
"\n",
"The `command` allows user to configure the following key aspects.\n",
"\n",
"- `inputs` - This is the dictionary of inputs using name value pairs to the command.\n",
" - `type` - The type of input. This can be a `uri_file` or `uri_folder`. The default is `uri_folder`.\n",
" - `path` - The path to the file or folder. These can be local or remote files or folders. For remote files - http/https, wasb are supported.\n",
" - Azure ML `data`/`dataset` or `datastore` are of type `uri_folder`. To use `data`/`dataset` as input, you can use registered dataset in the workspace using the format '<data_name>:<version>'. For e.g Input(type='uri_folder', path='my_dataset:1')\n",
" - `mode` - Mode of how the data should be delivered to the compute target. Allowed values are `ro_mount`, `rw_mount` and `download`. Default is `ro_mount`\n",
"- `code` - This is the path where the code to run the command is located\n",
"- `compute` - The compute on which the command will run. You can run it on the local machine by using `local` for the compute.\n",
"- `command` - This is the command that needs to be run\n",
" in the `command` using the `${{inputs.<input_name>}}` expression. To use files or folders as inputs, we can use the `Input` class. The `Input` class supports three parameters:\n",
"- `environment` - This is the environment needed for the command to run. Curated (built-in) or custom environments from the workspace can be used.\n",
"- `instance_count` - Number of nodes. Default is 1.\n",
"- `distribution` - Distribution configuration for distributed training scenarios. Azure Machine Learning supports PyTorch, TensorFlow, and MPI-based distributed.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"from azure.ai.ml import command\n",
"from azure.ai.ml import Input\n",
"from azure.ai.ml.entities import ResourceConfiguration\n",
"\n",
"USE_BUILTIN_ENV = False\n",
"str_command = \"\"\n",
"\n",
"if USE_BUILTIN_ENV:\n",
" str_env = \"azureml://registries/azureml/environments/acft-hf-nlp-gpu/versions/77\" # Use built-in Environment asset\n",
" str_command += \"pip install -r requirements.txt && \"\n",
"else:\n",
" str_env = f\"{azure_env_name}@latest\" # Use Curated (built-in) Environment asset\n",
"\n",
"str_command += \"python train_mlflow.py \\\n",
" --model_name_or_path ${{inputs.model_name_or_path}} \\\n",
" --train_dir ${{inputs.train_dir}} \\\n",
" --epochs ${{inputs.epoch}} \\\n",
" --train_batch_size ${{inputs.train_batch_size}} \\\n",
" --eval_batch_size ${{inputs.eval_batch_size}} \\\n",
" --model_dir ${{inputs.model_dir}}\"\n",
"\n",
"logger.info(f\"Env: {str_env}\")\n",
"logger.info(f\"Command: {str_command}\")\n",
"\n",
"job = command(\n",
" inputs=dict(\n",
" model_name_or_path=HF_MODEL_NAME_OR_PATH,\n",
" # train_dir=Input(type=\"uri_folder\", path=DATA_DIR), # Get data from local path\n",
" train_dir=Input(path=f\"{AZURE_DATA_NAME}@latest\"), # Get data from Data asset\n",
" epoch=d[\"train\"][\"epoch\"],\n",
" train_batch_size=d[\"train\"][\"train_batch_size\"],\n",
" eval_batch_size=d[\"train\"][\"eval_batch_size\"],\n",
" model_dir=d[\"train\"][\"model_dir\"],\n",
" ),\n",
" code=\"./src_train\", # local path where the code is stored\n",
" compute=azure_compute_cluster_name,\n",
" command=str_command,\n",
" environment=str_env,\n",
" distribution={\n",
" \"type\": \"PyTorch\",\n",
" \"process_count_per_instance\": 1, # For multi-gpu training set this to an integer value more than 1\n",
" },\n",
")\n",
"returned_job = ml_client.jobs.create_or_update(job)\n",
"logger.info(\n",
" \"\"\"Started training job. Now a dedicated Compute Cluster for training is provisioned and the environment\n",
"required for training is automatically set up from Environment.\n",
"\n",
"If you have set up a new custom Environment, it will take approximately 20 minutes or more to set up the Environment before provisioning the training cluster.\n",
"\"\"\"\n",
")\n",
"ml_client.jobs.stream(returned_job.name)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"display(returned_job)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# check if the `trained_model` output is available\n",
"job_name = returned_job.name"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"%store job_name"
]
},
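{
"cell_type": "markdown",
"metadata": {},
"source": [
"`%store` persists `job_name` across notebook sessions, so a follow-up notebook (e.g., a serving notebook) can restore it with:\n",
"\n",
"```python\n",
"%store -r job_name\n",
"print(job_name)\n",
"```"
]
},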
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<br>\n",
"\n",
"## 4. (Optional) Create model asset and get fine-tuned LLM to local folder\n",
"\n",
"---\n",
"\n",
"### 4.1. Create model asset\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"def get_or_create_model_asset(\n",
" ml_client,\n",
" model_name,\n",
" job_name,\n",
" model_dir=\"outputs\",\n",
" model_type=\"custom_model\",\n",
" update=False,\n",
"):\n",
"\n",
" try:\n",
" latest_model_version = max(\n",
" [int(m.version) for m in ml_client.models.list(name=model_name)]\n",
" )\n",
" if update:\n",
" raise ResourceExistsError(\"Found Model asset, but will update the Model.\")\n",
" else:\n",
" model_asset = ml_client.models.get(\n",
" name=model_name, version=latest_model_version\n",
" )\n",
" logger.info(f\"Found Model asset: {model_name}. Will not create again\")\n",
" except (ResourceNotFoundError, ResourceExistsError) as e:\n",
" logger.info(f\"Exception: {e}\")\n",
" model_path = f\"azureml://jobs/{job_name}/outputs/artifacts/paths/{model_dir}/\"\n",
" run_model = Model(\n",
" name=model_name,\n",
" path=model_path,\n",
" description=\"Model created from run.\",\n",
" type=model_type, # mlflow_model, custom_model, triton_model\n",
" )\n",
" model_asset = ml_client.models.create_or_update(run_model)\n",
" logger.info(f\"Created Model asset: {model_name}\")\n",
"\n",
" return model_asset"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note that `model_type=\"custom_model` is intentional. This is because for newer models, MLflow's auto-logging compatibility is not as good and models need to be saved the traditional way.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"azure_model_name = d[\"serve\"][\"azure_model_name\"]\n",
"model_dir = d[\"train\"][\"model_dir\"]\n",
"model = get_or_create_model_asset(\n",
" ml_client,\n",
" azure_model_name,\n",
" job_name,\n",
" model_dir,\n",
" model_type=\"custom_model\",\n",
" update=False,\n",
")\n",
"\n",
"logger.info(\n",
" \"===== 4. (Optional) Create model asset and get fine-tuned LLM to local folder =====\"\n",
")\n",
"logger.info(f\"azure_model_name={azure_model_name}\")\n",
"logger.info(f\"model_dir={model_dir}\")\n",
"logger.info(f\"model={model}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 4.2. Get fine-tuned LLM to local folder\n",
"\n",
"You can copy it to your local directory to perform inference or serve the model in Azure environment. (e.g., real-time endpoint)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# Download the model (this is optional)\n",
"local_model_dir = \"./artifact_downloads\"\n",
"os.makedirs(local_model_dir, exist_ok=True)\n",
"\n",
"ml_client.models.download(\n",
" name=azure_model_name, download_path=local_model_dir, version=model.version\n",
")"
]
},
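{
"cell_type": "markdown",
"metadata": {},
"source": [
"After downloading, you can load the artifacts locally for a quick smoke test. The sketch below assumes the job saved a Hugging Face-format model (tokenizer and weights) under the downloaded `outputs` folder; the path is an assumption, so adjust it to match what `ml_client.models.download` actually produced for your run.\n",
"\n",
"```python\n",
"# Optional sketch: load the downloaded fine-tuned model for a local inference smoke test.\n",
"# The exact subfolder layout depends on the training script's save logic -- verify it on disk first.\n",
"from transformers import AutoModelForCausalLM, AutoTokenizer\n",
"\n",
"local_path = f\"{local_model_dir}/{azure_model_name}/outputs\"  # assumed layout; verify on disk\n",
"tokenizer = AutoTokenizer.from_pretrained(local_path)\n",
"model = AutoModelForCausalLM.from_pretrained(local_path, device_map=\"auto\")\n",
"\n",
"inputs = tokenizer(\"What is distributed training?\", return_tensors=\"pt\").to(model.device)\n",
"outputs = model.generate(**inputs, max_new_tokens=100)\n",
"print(tokenizer.decode(outputs[0], skip_special_tokens=True))\n",
"```"
]
},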
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Clean up\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"!rm -rf $DATA_DIR {local_model_dir}"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3.10 - SDK v2",
"language": "python",
"name": "python310-sdkv2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.14"
},
"microsoft": {
"ms_spell_check": {
"ms_spell_check_language": "en"
}
},
"nteract": {
"version": "nteract-front-end@1.0.0"
}
},
"nbformat": 4,
"nbformat_minor": 4
}