florence2-VQA/1_training_mlflow_florence2.ipynb (640 lines of code) (raw):
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Fine-tuning Florence2 for VQA (Visual Question Answering) using the Azure ML Python SDK and MLflow\n",
"\n",
"### Overview\n",
"\n",
"An Azure ML workspace is compatible with MLflow and can act as an MLflow tracking server, as described in Microsoft's official documentation. MLflow provides experiment tracking, model management, and model deployment features that help you manage data science and machine learning workflows systematically. The main advantages of using Azure ML and MLflow together are listed below.\n",
"\n",
"#### 1. Experiment tracking and management\n",
"\n",
"You can systematically manage the parameters, metrics, and artifacts of all your experiments. Integrating with Azure ML lets you track and manage this information within your Azure ML workspace.\n",
"\n",
"#### 2. Model management\n",
"\n",
"MLflow provides a model registry for model versioning. Integrating it with Azure ML lets you manage and deploy every version of your models systematically. Combined with Azure ML's deployment capabilities, models can be deployed to a variety of environments (e.g., Azure Kubernetes Service, Azure Container Instances).\n",
"\n",
"#### 3. Reproducibility and collaboration\n",
"\n",
"MLflow records the parameters and environment of every experiment, so you can reproduce an experiment accurately. This is useful when team members need to repeat the same experiment, or when you need to rerun an experiment at a later date.\n",
"\n",
"#### 4. CI/CD integration\n",
"\n",
"MLflow makes it easier to implement continuous integration (CI) and continuous deployment (CD) for machine learning models. You can integrate with Azure DevOps or GitHub Actions to run training, validation, and deployment automatically whenever the model changes.\n",
"\n",
"When training a model with Hugging Face's Trainer API, specifying `report_to=\"azure_ml\"` logs basic metrics automatically without any additional code. You can still log custom metrics from your own training script (Bring Your Own Script) in the conventional way, but Azure ML's built-in logging is a good baseline to start from.\n",
"\n",
"[Note] Please use `Python 3.10 - SDK v2 (azureml_py310_sdkv2)` conda environment.\n"
]
},
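{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a rough illustration of the `report_to=\"azure_ml\"` option mentioned above, the sketch below builds keyword arguments for `transformers.TrainingArguments`. The helper names `build_training_kwargs` and `make_training_arguments`, the output folder, and the `logging_steps` value are assumptions made for this example; they are not taken from `src_train/train_mlflow.py`.\n",
"\n",
"```python\n",
"from typing import Any, Dict\n",
"\n",
"\n",
"def build_training_kwargs(output_dir: str, log_to_azure_ml: bool = True) -> Dict[str, Any]:\n",
"    \"\"\"Return keyword arguments for transformers.TrainingArguments (hypothetical helper).\"\"\"\n",
"    return {\n",
"        \"output_dir\": output_dir,\n",
"        \"logging_steps\": 10,  # assumed value: log every 10 steps\n",
"        # \"azure_ml\" turns on the built-in Azure ML reporting; \"none\" disables reporting\n",
"        \"report_to\": \"azure_ml\" if log_to_azure_ml else \"none\",\n",
"    }\n",
"\n",
"\n",
"def make_training_arguments(output_dir: str):\n",
"    \"\"\"Create TrainingArguments; transformers is imported lazily (hypothetical helper).\"\"\"\n",
"    from transformers import TrainingArguments\n",
"\n",
"    return TrainingArguments(**build_training_kwargs(output_dir))\n",
"\n",
"\n",
"training_kwargs = build_training_kwargs(\"./outputs\")\n",
"print(training_kwargs[\"report_to\"])\n",
"```\n",
"\n",
"Passing the result of `make_training_arguments` to a `Trainer` should be enough for the basic metrics to appear in the job's metrics view, provided the job runs inside Azure ML and the reporting integration's dependencies are installed in the environment.\n"
]
},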
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Load config file\n",
"\n",
"---\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%load_ext autoreload\n",
"%autoreload 2\n",
"\n",
"import os, sys\n",
"lab_prep_dir = os.getcwd().split(\"slm-innovator-lab\")[0] + \"slm-innovator-lab/0_lab_preparation\"\n",
"sys.path.append(os.path.abspath(lab_prep_dir))\n",
"\n",
"from common import check_kernel\n",
"check_kernel()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"import os\n",
"import yaml\n",
"from logger import logger\n",
"from datetime import datetime\n",
"\n",
"snapshot_date = datetime.now().strftime(\"%Y-%m-%d\")\n",
"\n",
"with open(\"config.yml\") as f:\n",
"    d = yaml.load(f, Loader=yaml.FullLoader)\n",
"\n",
"AZURE_SUBSCRIPTION_ID = d[\"config\"][\"AZURE_SUBSCRIPTION_ID\"]\n",
"AZURE_RESOURCE_GROUP = d[\"config\"][\"AZURE_RESOURCE_GROUP\"]\n",
"AZURE_WORKSPACE = d[\"config\"][\"AZURE_WORKSPACE\"]\n",
"AZURE_DATA_NAME = d[\"config\"][\"AZURE_DATA_NAME\"]\n",
"DATA_DIR = d[\"config\"][\"DATA_DIR\"]\n",
"CLOUD_DIR = d[\"config\"][\"CLOUD_DIR\"]\n",
"HF_MODEL_NAME_OR_PATH = d[\"config\"][\"HF_MODEL_NAME_OR_PATH\"]\n",
"IS_DEBUG = d[\"config\"][\"IS_DEBUG\"]\n",
"USE_LOWPRIORITY_VM = d[\"config\"][\"USE_LOWPRIORITY_VM\"]\n",
"\n",
"azure_env_name = d[\"train\"][\"azure_env_name\"]\n",
"azure_compute_cluster_name = d[\"train\"][\"azure_compute_cluster_name\"]\n",
"azure_compute_cluster_size = d[\"train\"][\"azure_compute_cluster_size\"]\n",
"\n",
"os.makedirs(DATA_DIR, exist_ok=True)\n",
"os.makedirs(CLOUD_DIR, exist_ok=True)\n",
"\n",
"logger.info(\"===== 0. Azure ML Training Info =====\")\n",
"logger.info(f\"AZURE_SUBSCRIPTION_ID={AZURE_SUBSCRIPTION_ID}\")\n",
"logger.info(f\"AZURE_RESOURCE_GROUP={AZURE_RESOURCE_GROUP}\")\n",
"logger.info(f\"AZURE_WORKSPACE={AZURE_WORKSPACE}\")\n",
"logger.info(f\"AZURE_DATA_NAME={AZURE_DATA_NAME}\")\n",
"logger.info(f\"DATA_DIR={DATA_DIR}\")\n",
"logger.info(f\"CLOUD_DIR={CLOUD_DIR}\")\n",
"logger.info(f\"HF_MODEL_NAME_OR_PATH={HF_MODEL_NAME_OR_PATH}\")\n",
"logger.info(f\"IS_DEBUG={IS_DEBUG}\")\n",
"logger.info(f\"USE_LOWPRIORITY_VM={USE_LOWPRIORITY_VM}\")\n",
"\n",
"logger.info(f\"azure_env_name={azure_env_name}\")\n",
"logger.info(f\"azure_compute_cluster_name={azure_compute_cluster_name}\")\n",
"logger.info(f\"azure_compute_cluster_size={azure_compute_cluster_size}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<br>\n",
"\n",
"## 1. Dataset preparation\n",
"\n",
"---\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We cache datasets from the Hugging Face Hub on shared storage because the root volume can run out of space.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"import torch\n",
"from pathlib import Path\n",
"from datasets import load_dataset\n",
"\n",
"curr_dir = Path.cwd()\n",
"model_cache_dir = os.path.join(curr_dir.parent.parent, \"model\")\n",
"dataset_cache_dir = os.path.join(curr_dir.parent.parent, \"dataset\")\n",
"\n",
"dataset = load_dataset(\"HuggingFaceM4/DocumentVQA\", cache_dir=dataset_cache_dir)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"if IS_DEBUG:\n",
"    dataset[\"train\"] = dataset[\"train\"].select(range(1000))\n",
"    dataset[\"validation\"] = dataset[\"validation\"].select(range(200))\n",
"    dataset[\"test\"] = dataset[\"test\"].select(range(200))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"dataset.save_to_disk(DATA_DIR)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<br>\n",
"\n",
"## 2. Training preparation\n",
"\n",
"---\n",
"\n",
"### 2.1. Configure workspace details\n",
"\n",
"To connect to a workspace, we need identifying parameters - a subscription, a resource group, and a workspace name. We will use these details in the MLClient from azure.ai.ml to get a handle on the Azure Machine Learning workspace we need. We will use the default Azure authentication for this hands-on.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# import required libraries\n",
"import time\n",
"from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential\n",
"from azure.ai.ml import MLClient, Input, Output, command, load_component\n",
"from azure.ai.ml.dsl import pipeline\n",
"from azure.ai.ml.entities import Data, Environment, BuildContext, Model\n",
"from azure.ai.ml.constants import AssetTypes\n",
"from azure.core.exceptions import ResourceNotFoundError, ResourceExistsError\n",
"\n",
"credential = DefaultAzureCredential()\n",
"ml_client = MLClient(\n",
" credential, AZURE_SUBSCRIPTION_ID, AZURE_RESOURCE_GROUP, AZURE_WORKSPACE\n",
")\n",
"\n",
"# The code below may conflict with AI Foundry as of February 2025.\n",
"# ml_client = None\n",
"# try:\n",
"# ml_client = MLClient.from_config(credential)\n",
"# except Exception as ex:\n",
"# print(ex)\n",
"# ml_client = MLClient(credential, AZURE_SUBSCRIPTION_ID, AZURE_RESOURCE_GROUP, AZURE_WORKSPACE)"
]
},
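{
"cell_type": "markdown",
"metadata": {},
"source": [
"Since the workspace can act as an MLflow tracking server, a local session can be pointed at it through the workspace's tracking URI. The sketch below is one way this might be done; `resolve_tracking_uri` and `configure_mlflow` are hypothetical helpers, the stand-in client and its placeholder URI exist only so the example runs without Azure access, and the `mlflow` and `azureml-mlflow` packages are assumed to be installed when `configure_mlflow` is called.\n",
"\n",
"```python\n",
"from types import SimpleNamespace\n",
"\n",
"\n",
"def resolve_tracking_uri(ml_client) -> str:\n",
"    \"\"\"Return the MLflow tracking URI of the workspace behind ml_client.\"\"\"\n",
"    workspace = ml_client.workspaces.get(ml_client.workspace_name)\n",
"    return workspace.mlflow_tracking_uri\n",
"\n",
"\n",
"def configure_mlflow(ml_client, experiment_name: str) -> None:\n",
"    \"\"\"Point the local MLflow client at the workspace (hypothetical helper).\"\"\"\n",
"    import mlflow  # imported lazily so the rest of the sketch runs without it\n",
"\n",
"    mlflow.set_tracking_uri(resolve_tracking_uri(ml_client))\n",
"    mlflow.set_experiment(experiment_name)\n",
"\n",
"\n",
"# Stand-in client so the sketch runs without Azure access; the URI is a placeholder\n",
"fake_workspace = SimpleNamespace(mlflow_tracking_uri=\"azureml://placeholder\")\n",
"fake_client = SimpleNamespace(\n",
"    workspace_name=\"my-workspace\",\n",
"    workspaces=SimpleNamespace(get=lambda name: fake_workspace),\n",
")\n",
"print(resolve_tracking_uri(fake_client))\n",
"```\n",
"\n",
"In this notebook the call would look like `configure_mlflow(ml_client, \"florence2-vqa\")`, where the experiment name is an assumption. Jobs submitted to Azure ML typically have tracking configured for them already, so this is mainly useful when logging from a local or notebook session.\n"
]
},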
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"### 2.2. Create AzureML environment and data\n",
"\n",
"Azure ML defines the containers in which your code runs as environment assets. You can use a curated (built-in) environment or build a custom one, either from a Docker image plus a conda YAML file or from a Docker build context.\n",
"This hands-on builds a custom environment from the Docker build context under `{CLOUD_DIR}/train`; a conda-based helper is also provided for reference.\n",
"\n",
"The training data can be read directly from the local development environment, but it can also be registered as an Azure ML data asset, which is what this hands-on does.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"from azure.ai.ml.entities import Environment, BuildContext\n",
"\n",
"\n",
"def get_or_create_environment_asset(\n",
"    ml_client, env_name, conda_yml=\"cloud/conda.yml\", update=False\n",
"):\n",
"\n",
"    try:\n",
"        latest_env_version = max(\n",
"            [int(e.version) for e in ml_client.environments.list(name=env_name)]\n",
"        )\n",
"        if update:\n",
"            raise ResourceExistsError(\n",
"                \"Found Environment asset, but will update the Environment.\"\n",
"            )\n",
"        else:\n",
"            env_asset = ml_client.environments.get(\n",
"                name=env_name, version=latest_env_version\n",
"            )\n",
"            print(f\"Found Environment asset: {env_name}. Will not create again\")\n",
"    except (ResourceNotFoundError, ResourceExistsError) as e:\n",
"        print(f\"Exception: {e}\")\n",
"        env_docker_image = Environment(\n",
"            image=\"mcr.microsoft.com/azureml/curated/acft-hf-nlp-gpu:latest\",\n",
"            conda_file=conda_yml,\n",
"            name=env_name,\n",
"            description=\"Environment created for llm fine-tuning.\",\n",
"        )\n",
"        env_asset = ml_client.environments.create_or_update(env_docker_image)\n",
"        print(f\"Created Environment asset: {env_name}\")\n",
"\n",
"    return env_asset\n",
"\n",
"\n",
"def get_or_create_docker_environment_asset(\n",
"    ml_client, env_name, docker_dir, update=False\n",
"):\n",
"\n",
"    try:\n",
"        latest_env_version = max(\n",
"            [int(e.version) for e in ml_client.environments.list(name=env_name)]\n",
"        )\n",
"        if update:\n",
"            raise ResourceExistsError(\n",
"                \"Found Environment asset, but will update the Environment.\"\n",
"            )\n",
"        else:\n",
"            env_asset = ml_client.environments.get(\n",
"                name=env_name, version=latest_env_version\n",
"            )\n",
"            print(f\"Found Environment asset: {env_name}. Will not create again\")\n",
"    except (ResourceNotFoundError, ResourceExistsError) as e:\n",
"        print(f\"Exception: {e}\")\n",
"        env_docker_image = Environment(\n",
"            build=BuildContext(path=docker_dir),\n",
"            name=env_name,\n",
"            description=\"Environment created from a Docker context.\",\n",
"        )\n",
"        env_asset = ml_client.environments.create_or_update(env_docker_image)\n",
"        print(f\"Created Environment asset: {env_name}\")\n",
"\n",
"    return env_asset\n",
"\n",
"\n",
"def get_or_create_data_asset(ml_client, data_name, data_local_dir, update=False):\n",
"\n",
"    try:\n",
"        latest_data_version = max(\n",
"            [int(d.version) for d in ml_client.data.list(name=data_name)]\n",
"        )\n",
"        if update:\n",
"            raise ResourceExistsError(\"Found Data asset, but will update the Data.\")\n",
"        else:\n",
"            data_asset = ml_client.data.get(name=data_name, version=latest_data_version)\n",
"            print(f\"Found Data asset: {data_name}. Will not create again\")\n",
"    except (ResourceNotFoundError, ResourceExistsError) as e:\n",
"        data = Data(\n",
"            path=data_local_dir,\n",
"            type=AssetTypes.URI_FOLDER,\n",
"            description=f\"{data_name} for fine tuning\",\n",
"            tags={\"FineTuningType\": \"Instruction\", \"Language\": \"En\"},\n",
"            name=data_name,\n",
"        )\n",
"        data_asset = ml_client.data.create_or_update(data)\n",
"        print(f\"Created Data asset: {data_name}\")\n",
"\n",
"    return data_asset"
]
},
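{
"cell_type": "markdown",
"metadata": {},
"source": [
"The helpers above pick the latest asset version by applying `max(...)` to integer-converted version strings. `max` raises `ValueError` on an empty sequence and `int` fails on non-numeric version strings, so a more defensive variant might look like the sketch below. `latest_numeric_version` is a hypothetical helper written for this example and is not part of the Azure ML SDK.\n",
"\n",
"```python\n",
"from typing import Iterable, Optional\n",
"\n",
"\n",
"def latest_numeric_version(versions: Iterable[str]) -> Optional[int]:\n",
"    \"\"\"Return the highest purely numeric version, or None if there is none.\"\"\"\n",
"    numeric = [int(v) for v in versions if str(v).isdigit()]\n",
"    return max(numeric) if numeric else None\n",
"\n",
"\n",
"print(latest_numeric_version([\"1\", \"2\", \"10\"]))\n",
"print(latest_numeric_version([]))\n",
"print(latest_numeric_version([\"1\", \"2024-rc\"]))\n",
"```\n",
"\n",
"A caller could pass `(e.version for e in ml_client.environments.list(name=env_name))` and treat a `None` result as a signal that the asset still has to be created.\n"
]
},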
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"env = get_or_create_docker_environment_asset(\n",
" ml_client, azure_env_name, docker_dir=f\"{CLOUD_DIR}/train\", update=False\n",
")\n",
"data = get_or_create_data_asset(\n",
" ml_client, AZURE_DATA_NAME, data_local_dir=DATA_DIR, update=False\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 2.3. Training script\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"!pygmentize src_train/train_mlflow.py"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<br>\n",
"\n",
"## 3. Training\n",
"\n",
"---\n",
"\n",
"### 3.1. Create the compute cluster\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"from azure.ai.ml.entities import AmlCompute\n",
"\n",
"### Create the compute cluster\n",
"try:\n",
"    compute = ml_client.compute.get(azure_compute_cluster_name)\n",
"    print(\"The compute cluster already exists! Reusing it for the current run\")\n",
"except Exception as ex:\n",
"    print(\n",
"        f\"Looks like the compute cluster doesn't exist. Creating a new one with compute size {azure_compute_cluster_size}!\"\n",
"    )\n",
"    try:\n",
"        # Resolve the VM tier from the config before reporting what will be created\n",
"        tier = \"LowPriority\" if USE_LOWPRIORITY_VM else \"Dedicated\"\n",
"        print(f\"Attempt #1 - Trying to create a {tier} compute\")\n",
"        compute = AmlCompute(\n",
"            name=azure_compute_cluster_name,\n",
"            size=azure_compute_cluster_size,\n",
"            tier=tier,\n",
"            max_instances=1,  # For multi node training set this to an integer value more than 1\n",
"        )\n",
"        ml_client.compute.begin_create_or_update(compute).wait()\n",
"    except Exception as e:\n",
"        # Report the underlying error instead of a bare \"Error\"\n",
"        print(f\"Error while creating the compute cluster: {e}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 3.2. Start training job\n",
"\n",
"The `command` function lets you configure the following key aspects of a job.\n",
"\n",
"- `inputs` - A dictionary of name-value pairs passed to the command. Inputs can be referenced in the `command` string using the `${{inputs.<input_name>}}` expression. To use files or folders as inputs, use the `Input` class, which supports three parameters:\n",
"  - `type` - The type of input. This can be `uri_file` or `uri_folder`. The default is `uri_folder`.\n",
"  - `path` - The path to the file or folder. It can point to local or remote files or folders. For remote files, http/https and wasb are supported.\n",
"    - Azure ML `data`/`dataset` or `datastore` are of type `uri_folder`. To use `data`/`dataset` as input, reference a dataset registered in the workspace using the format `<data_name>:<version>`, for example `Input(type='uri_folder', path='my_dataset:1')`.\n",
"  - `mode` - How the data is delivered to the compute target. Allowed values are `ro_mount`, `rw_mount`, and `download`. The default is `ro_mount`.\n",
"- `code` - The path where the code to run the command is located.\n",
"- `compute` - The compute on which the command will run. You can run it on the local machine by using `local` for the compute.\n",
"- `command` - The command that needs to be run.\n",
"- `environment` - The environment needed for the command to run. Curated (built-in) or custom environments from the workspace can be used.\n",
"- `instance_count` - Number of nodes. The default is 1.\n",
"- `distribution` - Distribution configuration for distributed training scenarios. Azure Machine Learning supports PyTorch, TensorFlow, and MPI-based distributed training.\n"
]
},
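{
"cell_type": "markdown",
"metadata": {},
"source": [
"To make the relationship between `instance_count` and the `distribution` setting concrete: with a PyTorch distribution, the job launches `process_count_per_instance` processes on each of the `instance_count` nodes. The sketch below computes the resulting total number of training processes. `total_training_processes` is a hypothetical helper, and the 2-node, 4-GPU figures are made-up numbers for illustration.\n",
"\n",
"```python\n",
"def total_training_processes(instance_count: int, process_count_per_instance: int) -> int:\n",
"    \"\"\"Total worker processes launched across all nodes of the job.\"\"\"\n",
"    if instance_count < 1 or process_count_per_instance < 1:\n",
"        raise ValueError(\"both counts must be at least 1\")\n",
"    return instance_count * process_count_per_instance\n",
"\n",
"\n",
"# Single node, single process (the setting used in this hands-on)\n",
"print(total_training_processes(1, 1))\n",
"# Hypothetical scale-out: 2 nodes with 4 processes (one per GPU) each\n",
"print(total_training_processes(2, 4))\n",
"```\n",
"\n",
"When scaling out, the cluster's `max_instances` would also need to be at least as large as the job's `instance_count`.\n"
]
},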
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"from azure.ai.ml import command\n",
"from azure.ai.ml import Input\n",
"from azure.ai.ml.entities import ResourceConfiguration\n",
"\n",
"job = command(\n",
" inputs=dict(\n",
" # train_dir=Input(type=\"uri_folder\", path=DATA_DIR), # Get data from local path\n",
" train_dir=Input(path=f\"{AZURE_DATA_NAME}@latest\"), # Get data from Data asset\n",
" epoch=d[\"train\"][\"epoch\"],\n",
" train_batch_size=d[\"train\"][\"train_batch_size\"],\n",
" eval_batch_size=d[\"train\"][\"eval_batch_size\"],\n",
" model_dir=d[\"train\"][\"model_dir\"],\n",
" ),\n",
" code=\"./src_train\", # local path where the code is stored\n",
" compute=azure_compute_cluster_name,\n",
" command=\"python train_mlflow.py --train_dir ${{inputs.train_dir}} --epochs ${{inputs.epoch}} --train_batch_size ${{inputs.train_batch_size}} --eval_batch_size ${{inputs.eval_batch_size}} --model_dir ${{inputs.model_dir}}\",\n",
" # environment=\"azureml://registries/azureml/environments/acft-hf-nlp-gpu/versions/77\", # Use built-in Environment asset\n",
" environment=f\"{azure_env_name}@latest\",\n",
" distribution={\n",
" \"type\": \"PyTorch\",\n",
" \"process_count_per_instance\": 1, # For multi-gpu training set this to an integer value more than 1\n",
" },\n",
")\n",
"returned_job = ml_client.jobs.create_or_update(job)\n",
"ml_client.jobs.stream(returned_job.name)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"display(returned_job)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# Keep the job name so that later cells can locate this job's outputs\n",
"job_name = returned_job.name"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"%store job_name"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<br>\n",
"\n",
"## 4. (Optional) Create model asset and download the fine-tuned model to a local folder\n",
"\n",
"---\n",
"\n",
"### 4.1. Create model asset\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def get_or_create_model_asset(\n",
"    ml_client,\n",
"    model_name,\n",
"    job_name,\n",
"    model_dir=\"outputs\",\n",
"    model_type=\"custom_model\",\n",
"    update=False,\n",
"):\n",
"\n",
"    try:\n",
"        latest_model_version = max(\n",
"            [int(m.version) for m in ml_client.models.list(name=model_name)]\n",
"        )\n",
"        if update:\n",
"            raise ResourceExistsError(\"Found Model asset, but will update the Model.\")\n",
"        else:\n",
"            model_asset = ml_client.models.get(\n",
"                name=model_name, version=latest_model_version\n",
"            )\n",
"            print(f\"Found Model asset: {model_name}. Will not create again\")\n",
"    except (ResourceNotFoundError, ResourceExistsError) as e:\n",
"        print(f\"Exception: {e}\")\n",
"        model_path = f\"azureml://jobs/{job_name}/outputs/artifacts/paths/{model_dir}/\"\n",
"        run_model = Model(\n",
"            name=model_name,\n",
"            path=model_path,\n",
"            description=\"Model created from run.\",\n",
"            type=model_type,  # mlflow_model, custom_model, triton_model\n",
"        )\n",
"        model_asset = ml_client.models.create_or_update(run_model)\n",
"        print(f\"Created Model asset: {model_name}\")\n",
"\n",
"    return model_asset"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"azure_model_name = d[\"serve\"][\"azure_model_name\"]\n",
"model_dir = d[\"train\"][\"model_dir\"]\n",
"model = get_or_create_model_asset(\n",
" ml_client,\n",
" azure_model_name,\n",
" job_name,\n",
" model_dir,\n",
" model_type=\"custom_model\",\n",
" update=False,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 4.2. Download the fine-tuned model to a local folder\n",
"\n",
"You can copy the model to a local directory to run inference, or serve it in an Azure environment (e.g., a real-time endpoint).\n"
]
},
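{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Define the download folder first so that the cleanup below targets a real path\n",
"local_model_dir = \"./artifact_downloads\"\n",
"!rm -rf {local_model_dir}"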
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Download the model (this is optional)\n",
"local_model_dir = \"./artifact_downloads\"\n",
"os.makedirs(local_model_dir, exist_ok=True)\n",
"\n",
"ml_client.models.download(\n",
" name=azure_model_name, download_path=local_model_dir, version=model.version\n",
")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.11"
},
"microsoft": {
"ms_spell_check": {
"ms_spell_check_language": "en"
}
},
"nteract": {
"version": "nteract-front-end@1.0.0"
}
},
"nbformat": 4,
"nbformat_minor": 4
}