training_dpo.ipynb

{ "cells": [ { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "# Fine-tune/Evaluate/Quantize Open Source SLM/LLM using the torchtune on Azure ML\n", "\n", "## Direct Preference Optimization (DPO)\n", "\n", "> For those new to DPO: DPO is a technique that directly fine-tunes models based on user preferences, offering an alternative to RLHF (Reinforcement Learning with Human Feedback). While RLHF involves training a reward model and optimizing the policy using reinforcement learning, DPO eliminates the need for a separate reward model, simplifying the alignment process. <p> DPO’s main advantage over RLHF lies in its efficiency and reduced complexity. By directly optimizing the model with preference data, DPO avoids the computational overhead and potential instability associated with reinforcement learning, such as reward hacking or policy divergence. Furthermore, DPO can be more interpretable and easier to implement, as it directly aligns model behavior with user-provided rankings or comparisons without requiring an intermediate reward signal.\n", "\n", "[Note] Please use `Python 3.10 - SDK v2 (azureml_py310_sdkv2)` conda environment." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Load config file\n", "\n", "---\n", "\n", "Load the config file to set up the workspace and other configurations." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%load_ext autoreload\n", "%autoreload 2\n", "import os, sys\n", "from utils.aml_common import (\n", " check_kernel, \n", " get_or_create_environment_asset, \n", " get_or_create_docker_environment_asset, \n", " get_or_create_data_asset\n", ")\n", "\n", "check_kernel()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "import yaml\n", "from utils.logger import logger\n", "from datetime import datetime\n", "\n", "snapshot_date = datetime.now().strftime(\"%Y-%m-%d\")\n", "\n", "with open(\"config.yml\") as f:\n", " d = yaml.load(f, Loader=yaml.FullLoader)\n", "\n", "AZURE_SUBSCRIPTION_ID = d[\"config\"][\"AZURE_SUBSCRIPTION_ID\"]\n", "AZURE_RESOURCE_GROUP = d[\"config\"][\"AZURE_RESOURCE_GROUP\"]\n", "AZURE_WORKSPACE = d[\"config\"][\"AZURE_WORKSPACE\"]\n", "AZURE_SFT_DATA_NAME = d[\"config\"][\"AZURE_SFT_DATA_NAME\"]\n", "AZURE_DPO_DATA_NAME = d[\"config\"][\"AZURE_DPO_DATA_NAME\"]\n", "SFT_DATA_DIR = d[\"config\"][\"SFT_DATA_DIR\"]\n", "DPO_DATA_DIR = d[\"config\"][\"DPO_DATA_DIR\"]\n", "CLOUD_DIR = d[\"config\"][\"CLOUD_DIR\"]\n", "HF_MODEL_NAME_OR_PATH = d[\"config\"][\"HF_MODEL_NAME_OR_PATH\"]\n", "HF_TOKEN = d[\"config\"][\"HF_TOKEN\"]\n", "IS_DEBUG = d[\"config\"][\"IS_DEBUG\"]\n", "USE_LOWPRIORITY_VM = d[\"config\"][\"USE_LOWPRIORITY_VM\"]\n", "USE_BUILTIN_ENV = False\n", "\n", "azure_env_name = d[\"train\"][\"azure_env_name\"]\n", "azure_compute_cluster_name = d[\"train\"][\"azure_compute_cluster_name\"]\n", "azure_compute_cluster_size = d[\"train\"][\"azure_compute_cluster_size\"]\n", "\n", "wandb_api_key = d[\"train\"][\"wandb_api_key\"]\n", "wandb_project = d[\"train\"][\"wandb_project\"]\n", "wandb_watch = d[\"train\"][\"wandb_watch\"]\n", "\n", "os.makedirs(SFT_DATA_DIR, exist_ok=True)\n", "os.makedirs(DPO_DATA_DIR, exist_ok=True)\n", "os.makedirs(CLOUD_DIR, exist_ok=True)\n", "\n", "logger.info(\"===== 0. 
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Load config file\n", "\n", "---\n", "\n", "Load the config file to set up the workspace and other configurations." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%load_ext autoreload\n", "%autoreload 2\n", "import os, sys\n", "from utils.aml_common import (\n", "    check_kernel,\n", "    get_or_create_environment_asset,\n", "    get_or_create_docker_environment_asset,\n", "    get_or_create_data_asset\n", ")\n", "\n", "check_kernel()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "import yaml\n", "from utils.logger import logger\n", "from datetime import datetime\n", "\n", "snapshot_date = datetime.now().strftime(\"%Y-%m-%d\")\n", "\n", "with open(\"config.yml\") as f:\n", "    d = yaml.load(f, Loader=yaml.FullLoader)\n", "\n", "AZURE_SUBSCRIPTION_ID = d[\"config\"][\"AZURE_SUBSCRIPTION_ID\"]\n", "AZURE_RESOURCE_GROUP = d[\"config\"][\"AZURE_RESOURCE_GROUP\"]\n", "AZURE_WORKSPACE = d[\"config\"][\"AZURE_WORKSPACE\"]\n", "AZURE_SFT_DATA_NAME = d[\"config\"][\"AZURE_SFT_DATA_NAME\"]\n", "AZURE_DPO_DATA_NAME = d[\"config\"][\"AZURE_DPO_DATA_NAME\"]\n", "SFT_DATA_DIR = d[\"config\"][\"SFT_DATA_DIR\"]\n", "DPO_DATA_DIR = d[\"config\"][\"DPO_DATA_DIR\"]\n", "CLOUD_DIR = d[\"config\"][\"CLOUD_DIR\"]\n", "HF_MODEL_NAME_OR_PATH = d[\"config\"][\"HF_MODEL_NAME_OR_PATH\"]\n", "HF_TOKEN = d[\"config\"][\"HF_TOKEN\"]\n", "IS_DEBUG = d[\"config\"][\"IS_DEBUG\"]\n", "USE_LOWPRIORITY_VM = d[\"config\"][\"USE_LOWPRIORITY_VM\"]\n", "USE_BUILTIN_ENV = False\n", "\n", "azure_env_name = d[\"train\"][\"azure_env_name\"]\n", "azure_compute_cluster_name = d[\"train\"][\"azure_compute_cluster_name\"]\n", "azure_compute_cluster_size = d[\"train\"][\"azure_compute_cluster_size\"]\n", "\n", "wandb_api_key = d[\"train\"][\"wandb_api_key\"]\n", "wandb_project = d[\"train\"][\"wandb_project\"]\n", "wandb_watch = d[\"train\"][\"wandb_watch\"]\n", "\n", "os.makedirs(SFT_DATA_DIR, exist_ok=True)\n", "os.makedirs(DPO_DATA_DIR, exist_ok=True)\n", "os.makedirs(CLOUD_DIR, exist_ok=True)\n", "\n", "logger.info(\"===== 0. Azure ML Training Info =====\")\n", "logger.info(f\"AZURE_SUBSCRIPTION_ID={AZURE_SUBSCRIPTION_ID}\")\n", "logger.info(f\"AZURE_RESOURCE_GROUP={AZURE_RESOURCE_GROUP}\")\n", "logger.info(f\"AZURE_WORKSPACE={AZURE_WORKSPACE}\")\n", "logger.info(f\"AZURE_SFT_DATA_NAME={AZURE_SFT_DATA_NAME}\")\n", "logger.info(f\"AZURE_DPO_DATA_NAME={AZURE_DPO_DATA_NAME}\")\n", "logger.info(f\"SFT_DATA_DIR={SFT_DATA_DIR}\")\n", "logger.info(f\"DPO_DATA_DIR={DPO_DATA_DIR}\")\n", "logger.info(f\"CLOUD_DIR={CLOUD_DIR}\")\n", "logger.info(f\"HF_MODEL_NAME_OR_PATH={HF_MODEL_NAME_OR_PATH}\")\n", "# Mask secrets instead of logging them in plain text\n", "logger.info(f\"HF_TOKEN={'***' if HF_TOKEN else '(not set)'}\")\n", "logger.info(f\"IS_DEBUG={IS_DEBUG}\")\n", "logger.info(f\"USE_LOWPRIORITY_VM={USE_LOWPRIORITY_VM}\")\n", "logger.info(f\"USE_BUILTIN_ENV={USE_BUILTIN_ENV}\")\n", "\n", "logger.info(f\"azure_env_name={azure_env_name}\")\n", "logger.info(f\"azure_compute_cluster_name={azure_compute_cluster_name}\")\n", "logger.info(f\"azure_compute_cluster_size={azure_compute_cluster_size}\")\n", "logger.info(f\"wandb_api_key={'***' if wandb_api_key else '(not set)'}\")\n", "logger.info(f\"wandb_project={wandb_project}\")\n", "logger.info(f\"wandb_watch={wandb_watch}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "<br>\n", "\n", "## 1. Dataset preparation\n", "\n", "---\n", "\n", "For this hands-on, we use a dataset from the Hugging Face Hub. If you would like to build or augment your own dataset, please refer to https://github.com/Azure/synthetic-qa-generation\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "import json\n", "from datasets import load_dataset\n", "from utils.dataset_common import convert_to_preference_format\n", "\n", "logger.info(\"===== 1. Dataset preparation =====\")\n", "logger.info(\"Loading dataset. It may take several minutes to load the dataset.\")\n", "\n", "# Load dataset from the hub\n", "data_path = \"jondurbin/truthy-dpo-v0.1\"\n", "dataset = load_dataset(data_path, split=\"train\")\n", "\n", "print(f\"Dataset size: {len(dataset)}\")\n", "# if IS_DEBUG:\n", "#     logger.info(\"Debug mode is activated. The dataset was downsampled to 800 rows.\")\n", "#     dataset = dataset.select(range(800))\n", "\n", "logger.info(f\"Save dataset to {DPO_DATA_DIR}\")\n", "dataset = dataset.train_test_split(test_size=0.2)\n", "train_dataset = dataset[\"train\"]\n", "test_dataset = dataset[\"test\"]\n", "\n", "train_dataset = convert_to_preference_format(train_dataset)\n", "test_dataset = convert_to_preference_format(test_dataset)\n", "\n", "# Note: despite the .jsonl extension, each file holds a single pretty-printed JSON array.\n", "with open(f\"{DPO_DATA_DIR}/train.jsonl\", \"w\") as f:\n", "    json.dump(train_dataset, f, ensure_ascii=False, indent=4)\n", "\n", "with open(f\"{DPO_DATA_DIR}/eval.jsonl\", \"w\") as f:\n", "    json.dump(test_dataset, f, ensure_ascii=False, indent=4)" ] },
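{ "cell_type": "markdown", "metadata": {}, "source": [ "As a quick sanity check, the next cell prints one converted record. It assumes `convert_to_preference_format` returns a list of dicts with `chosen`/`rejected` fields (the shape torchtune's preference-dataset builders expect); adjust the indexing if your helper returns a different structure." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Peek at a single converted preference record (assumes the helper returns a list of dicts)\n", "print(json.dumps(train_dataset[0], indent=2, ensure_ascii=False))" ] },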
{ "cell_type": "markdown", "metadata": {}, "source": [ "<br>\n", "\n", "## 2. Training preparation\n", "\n", "---\n", "\n", "### 2.1. Configure workspace details\n", "\n", "To connect to a workspace, we need identifying parameters - a subscription, a resource group, and a workspace name. We will use these details in the `MLClient` from `azure.ai.ml` to get a handle to the Azure Machine Learning workspace we need. We will use the default Azure authentication for this hands-on.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential\n", "from azure.ai.ml import MLClient, Input\n", "\n", "logger.info(\"===== 2. Training preparation =====\")\n", "logger.info(\"Calling DefaultAzureCredential.\")\n", "credential = DefaultAzureCredential()\n", "# credential = InteractiveBrowserCredential()\n", "ml_client = MLClient(\n", "    credential, AZURE_SUBSCRIPTION_ID, AZURE_RESOURCE_GROUP, AZURE_WORKSPACE\n", ")" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "### 2.2. Create AzureML environment and data\n", "\n", "Azure ML defines containers (called environment assets) in which your code will run. We can use a built-in environment or build a custom environment (Docker container or conda YAML). This hands-on builds a custom Docker environment from the files in the cloud directory.\n", "\n", "Training data can be consumed directly from the local development environment, but it can also be registered as an Azure ML data asset.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "env = get_or_create_docker_environment_asset(\n", "    ml_client, azure_env_name, docker_dir=CLOUD_DIR, update=False\n", ")\n", "data = get_or_create_data_asset(\n", "    ml_client, AZURE_DPO_DATA_NAME, data_local_dir=DPO_DATA_DIR, update=False\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 2.3. Training script\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# !pygmentize scripts/launcher_distributed.py" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "<br>\n", "\n", "## 3. Training\n", "\n", "---\n", "\n", "### 3.1. Create the compute cluster\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "from azure.ai.ml.entities import AmlCompute\n", "\n", "logger.info(\"===== 3. Training =====\")\n", "### Create the compute cluster\n", "try:\n", "    compute = ml_client.compute.get(azure_compute_cluster_name)\n", "    logger.info(\"The compute cluster already exists! Reusing it for the current run.\")\n", "except Exception:\n", "    tier = \"LowPriority\" if USE_LOWPRIORITY_VM else \"Dedicated\"\n", "    logger.info(\n", "        f\"Looks like the compute cluster doesn't exist. Creating a new {tier} cluster with compute size {azure_compute_cluster_size}!\"\n", "    )\n", "    try:\n", "        compute = AmlCompute(\n", "            name=azure_compute_cluster_name,\n", "            size=azure_compute_cluster_size,\n", "            tier=tier,\n", "            max_instances=1,  # For multi-node training, set this to an integer greater than 1\n", "        )\n", "        ml_client.compute.begin_create_or_update(compute).wait()\n", "    except Exception as e:\n", "        logger.error(f\"Failed to create the compute cluster: {e}\")" ] },
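{ "cell_type": "markdown", "metadata": {}, "source": [ "The training cell further below calls `get_num_gpus` from `utils.aml_common` to choose between the single-device and distributed DPO recipes. Here is a minimal sketch of such a lookup: the mapping is a hypothetical subset and `get_num_gpus_sketch` is not the project's actual helper; check the Azure VM size documentation for your SKU." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Hypothetical sketch of a VM-size -> GPU-count lookup (not the project's actual helper)\n", "_GPUS_PER_VM_SIZE = {\n", "    \"Standard_NC24ads_A100_v4\": 1,\n", "    \"Standard_NC48ads_A100_v4\": 2,\n", "    \"Standard_NC96ads_A100_v4\": 4,\n", "    \"Standard_ND96asr_v4\": 8,\n", "}\n", "\n", "\n", "def get_num_gpus_sketch(vm_size: str) -> int:\n", "    # Fall back to a single GPU when the SKU is not in the table\n", "    return _GPUS_PER_VM_SIZE.get(vm_size, 1)\n", "\n", "\n", "print(get_num_gpus_sketch(\"Standard_NC96ads_A100_v4\"))  # -> 4" ] },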
{ "cell_type": "markdown", "metadata": {}, "source": [ "### 3.2. Start training job\n", "\n", "The `command` allows the user to configure the following key aspects.\n", "\n", "- `inputs` - This is the dictionary of inputs using name value pairs to the command.\n", "  - `type` - The type of input. This can be a `uri_file` or `uri_folder`. The default is `uri_folder`.\n", "  - `path` - The path to the file or folder. These can be local or remote files or folders. For remote files, `http`/`https` and `wasb` are supported.\n", "    - Azure ML `data`/`dataset` or `datastore` assets are of type `uri_folder`. To use a `data`/`dataset` asset as input, reference a registered dataset in the workspace using the format `<data_name>:<version>`, e.g. `Input(type='uri_folder', path='my_dataset:1')`.\n", "  - `mode` - Mode of how the data should be delivered to the compute target. Allowed values are `ro_mount`, `rw_mount` and `download`. Default is `ro_mount`.\n", "- `code` - This is the path where the code to run the command is located.\n", "- `compute` - The compute on which the command will run. You can run it on the local machine by using `local` for the compute.\n", "- `command` - This is the command that needs to be run. Inputs are referenced in the `command` using the `${{inputs.<input_name>}}` expression; to pass files or folders as inputs, use the `Input` class with the `type`, `path`, and `mode` parameters described above.\n", "- `environment` - This is the environment needed for the command to run. Curated (built-in) or custom environments from the workspace can be used.\n", "- `instance_count` - Number of nodes. Default is 1.\n", "- `distribution` - Distribution configuration for distributed training scenarios. Azure Machine Learning supports PyTorch-, TensorFlow-, and MPI-based distributed training.\n", "\n", "A minimal sketch of the `command` API is shown below, followed by the full training cell for this hands-on.\n" ] },
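{ "cell_type": "markdown", "metadata": {}, "source": [ "The sketch constructs (but does not submit) a toy job to show the pieces named above. `my-env`, `my-cluster`, `my_dataset`, and `train.py` are placeholder names, not assets from this repository." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "from azure.ai.ml import command, Input\n", "\n", "# Minimal illustrative job definition -- placeholder names, not submitted\n", "sketch_job = command(\n", "    code=\"./scripts\",  # local folder uploaded with the job\n", "    command=\"python train.py --train_dir ${{inputs.train_dir}}\",\n", "    inputs={\"train_dir\": Input(type=\"uri_folder\", path=\"my_dataset@latest\")},\n", "    environment=\"my-env@latest\",\n", "    compute=\"my-cluster\",\n", "    instance_count=1,\n", "    distribution={\"type\": \"PyTorch\", \"process_count_per_instance\": 1},\n", ")\n", "# ml_client.jobs.create_or_update(sketch_job)  # submit only after replacing the placeholders" ] },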
{ "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "from azure.ai.ml import command, Input\n", "from utils.aml_common import get_num_gpus\n", "\n", "num_gpu = get_num_gpus(azure_compute_cluster_size)\n", "logger.info(f\"Number of GPUs={num_gpu}\")\n", "\n", "str_command = \"\"\n", "if USE_BUILTIN_ENV:\n", "    str_env = \"azureml://registries/azureml/environments/acpt-pytorch-2.2-cuda12.1/versions/19\"  # Use a built-in Environment asset\n", "    str_command += \"pip install -r requirements.txt && \"\n", "else:\n", "    str_env = f\"{azure_env_name}@latest\"  # Use the custom Environment asset registered in section 2.2\n", "\n", "if num_gpu > 1:\n", "    tune_recipe = \"lora_dpo_distributed\"\n", "    str_command += \"python launcher_distributed.py \"\n", "else:\n", "    tune_recipe = \"lora_dpo_single_device\"\n", "    str_command += \"python launcher_single.py \"\n", "\n", "inputs_dict = dict(\n", "    # train_dir=Input(type=\"uri_folder\", path=DPO_DATA_DIR),  # Get data from local path\n", "    train_dir=Input(path=f\"{AZURE_DPO_DATA_NAME}@latest\"),  # Get data from Data asset\n", "    hf_token=HF_TOKEN,\n", "    tune_recipe=tune_recipe,\n", "    tune_action=\"fine-tune\",\n", "    model_id=HF_MODEL_NAME_OR_PATH,\n", "    model_dir=\"./model\",\n", "    log_dir=\"./outputs/log\",\n", "    model_output_dir=\"./outputs\",\n", "    tune_finetune_yaml=\"lora_finetune_dpo_phi4.yaml\",  # YOU CAN CHANGE THIS TO YOUR OWN CONFIG FILE\n", "    tune_eval_yaml=\"evaluation_phi4.yaml\",  # YOU CAN CHANGE THIS TO YOUR OWN CONFIG FILE\n", "    tune_quant_yaml=\"quant_phi4.yaml\",  # YOU CAN CHANGE THIS TO YOUR OWN CONFIG FILE\n", ")\n", "\n", "if len(wandb_api_key) > 0:\n", "    str_command += \"--wandb_api_key ${{inputs.wandb_api_key}} \\\n", "    --wandb_project ${{inputs.wandb_project}} \\\n", "    --wandb_watch ${{inputs.wandb_watch}} \"\n", "    inputs_dict[\"wandb_api_key\"] = wandb_api_key\n", "    inputs_dict[\"wandb_project\"] = wandb_project\n", "    inputs_dict[\"wandb_watch\"] = wandb_watch\n", "\n", "str_command += \"--train_dir ${{inputs.train_dir}} \\\n", "    --hf_token ${{inputs.hf_token}} \\\n", "    --tune_recipe ${{inputs.tune_recipe}} \\\n", "    --tune_action ${{inputs.tune_action}} \\\n", "    --model_id ${{inputs.model_id}} \\\n", "    --model_dir ${{inputs.model_dir}} \\\n", "    --log_dir ${{inputs.log_dir}} \\\n", "    --model_output_dir ${{inputs.model_output_dir}} \\\n", "    --tune_finetune_yaml ${{inputs.tune_finetune_yaml}} \\\n", "    --tune_eval_yaml ${{inputs.tune_eval_yaml}} \\\n", "    --tune_quant_yaml ${{inputs.tune_quant_yaml}}\"\n", "\n", "logger.info(f\"Tune recipe: {tune_recipe}\")\n", "\n", "job = command(\n", "    inputs=inputs_dict,\n", "    code=\"./scripts\",  # local path where the code is stored\n", "    compute=azure_compute_cluster_name,\n", "    command=str_command,\n", "    environment=str_env,\n", "    instance_count=1,\n", "    distribution={\n", "        \"type\": \"PyTorch\",\n", "        \"process_count_per_instance\": num_gpu,  # For multi-GPU training, set this to an integer greater than 1\n", "    },\n", ")\n", "\n", "returned_job = ml_client.jobs.create_or_update(job)\n", "logger.info(\n", "    \"\"\"Started training job. A dedicated compute cluster for training is now provisioned, and the environment\n", "required for training is automatically set up from the Environment asset.\n", "\n", "If you have set up a new custom Environment, it will take approximately 20 minutes or more to build the Environment before the training cluster is provisioned.\n", "\"\"\"\n", ")\n", "ml_client.jobs.stream(returned_job.name)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "display(returned_job)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Keep the job name so the trained-model output can be referenced later\n", "job_name = returned_job.name" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%store job_name" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "<br>\n", "\n", "## 4. (Optional) Create model asset and get fine-tuned LLM to local folder\n", "\n", "---\n", "\n", "### 4.1. Create model asset\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from utils.aml_common import get_or_create_model_asset\n", "\n", "logger.info(\n", "    \"===== 4. (Optional) Create model asset and get fine-tuned LLM to local folder =====\"\n", ")\n", "\n", "azure_model_name = d[\"serve\"][\"azure_model_name\"]\n", "model_dir = d[\"train\"][\"model_dir\"]\n", "model = get_or_create_model_asset(\n", "    ml_client,\n", "    azure_model_name,\n", "    job_name,\n", "    model_dir,\n", "    model_type=\"custom_model\",\n", "    download_quantized_model_only=True,\n", "    update=False,\n", ")\n", "\n", "logger.info(f\"azure_model_name={azure_model_name}\")\n", "logger.info(f\"model_dir={model_dir}\")\n", "logger.info(f\"model={model}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 4.2. Get fine-tuned LLM to local folder\n", "\n", "You can copy the model to your local directory to perform inference, or serve it in an Azure environment (e.g., a real-time endpoint)." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Download the model (this is optional)\n", "DOWNLOAD_TO_LOCAL = False\n", "local_model_dir = \"./artifact_downloads_dpo\"\n", "\n", "if DOWNLOAD_TO_LOCAL:\n", "    os.makedirs(local_model_dir, exist_ok=True)\n", "    ml_client.models.download(\n", "        name=azure_model_name, download_path=local_model_dir, version=model.version\n", "    )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Clean up" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!rm -rf $SFT_DATA_DIR $DPO_DATA_DIR {local_model_dir}" ] } ], "metadata": { "kernelspec": { "display_name": "py312-dev", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.2" }, "microsoft": { "ms_spell_check": { "ms_spell_check_language": "en" } }, "nteract": { "version": "nteract-front-end@1.0.0" } }, "nbformat": 4, "nbformat_minor": 4 }