launcher/nemo/stages.py
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
# Portions taken from https://github.com/NVIDIA/NeMo-Framework-Launcher, Copyright Nvidia Corporation
import logging
import shutil
from pathlib import Path
from typing import Dict, List
import omegaconf
from nemo_launcher.core.stages import Training, _hydra_interpolation
from nemo_launcher.utils.job_utils import JobPaths
from omegaconf import OmegaConf
from ..accelerator_devices import get_num_accelerator_devices
from ..efa import (
efa_supported_instance,
instanceWithMultipleEFAs,
instanceWithRDMASupport,
)
from ..telemetry import Telemetry
from .constants import ROOT_DIR
from .launchers import SMAutoLauncher
logger = logging.getLogger(__name__)
# Predefined distributed args for torchrun
PROCESSES_PER_NODE = "PROCESSES_PER_NODE"
NNODES = "NNODES"
NODEID = "NODEID"
MASTER_ADDR = "MASTER_ADDR"
MASTER_PORT = "MASTER_PORT"
DISTRIBUTED_ARGS = "DISTRIBUTED_ARGS"
CONTAINER_NAME = "sm_training_launcher"
TRANSFORMERS_VERSION_FOR_MULTIMODAL = "4.45.2"
def set_multinode_envs(env_vars, instance_type):
# https://github.com/aws/aws-ofi-nccl/blob/master/doc/efa-env-var.md
if get_num_efa_devices(instance_type) > 0:
env_vars["FI_PROVIDER"] = "efa"
env_vars["NCCL_SOCKET_IFNAME"] = "^lo,docker0,veth_def_agent"
env_vars["NCCL_IGNORE_DISABLED_P2P"] = "1"
env_vars["TORCH_NCCL_ASYNC_ERROR_HANDLING"] = "1"
env_vars["TORCH_DIST_INIT_BARRIER"] = "1"
env_vars["CUDA_DEVICE_MAX_CONNECTIONS"] = "1"
return env_vars
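# Example (illustrative): for an EFA-capable instance type such as "p4d.24xlarge" (assuming it
# appears in efa_supported_instance), set_multinode_envs({}, "p4d.24xlarge") returns a dict that
# includes FI_PROVIDER="efa" and NCCL_SOCKET_IFNAME="^lo,docker0,veth_def_agent" alongside the
# generic torch distributed settings.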
def allow_rdma(instance_type):
return instance_type in instanceWithRDMASupport
def get_instance_type(cfg):
instance_type = None
if cfg.get("instance_type"):
instance_type = cfg.instance_type
else:
# custom path
instance_type = cfg.cluster.instance_type
assert instance_type is not None, "instance type is required from config"
if instance_type.startswith("ml."):
instance_type = instance_type[3:]
return instance_type.lower()
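# Example (illustrative): get_instance_type on a config with instance_type "ml.P4d.24xlarge"
# strips the "ml." prefix and lowercases the rest, returning "p4d.24xlarge".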
def get_num_efa_devices(instance_type):
    # If not an EFA instance, return 0
if instance_type not in efa_supported_instance:
return 0
# If multi-EFA, return from mapping
if instance_type in instanceWithMultipleEFAs:
return instanceWithMultipleEFAs[instance_type]
# Only a single EFA device
return 1
def get_ntasks_per_node(stage_cfg):
"""
Get the number of processes per node used for training
When running with custom script it will be stage_cfg.run.ntasks_per_node
"""
ntasks = OmegaConf.select(stage_cfg, "run.ntasks_per_node")
if ntasks is None:
ntasks = stage_cfg.get("trainer").get("devices")
return ntasks
def get_num_nodes(stage_cfg):
"""
Get the number of nodes used for training
When running with custom script it will be stage_cfg.run.nodes
"""
run_cfg = stage_cfg.get("run")
nodes = run_cfg.get("nodes")
if nodes is None:
nodes = stage_cfg.get("trainer").get("num_nodes")
return nodes
def get_container_type(container):
if container is None:
return None
if container.endswith(".sqsh"):
return "enroot"
return "docker"
def convert_dict_to_command_line_args(key_values):
    command = " ".join(f"{key}={value}" for key, value in key_values.items())
    return command
class SMTraining(Training):
"""
    Base stage class for training on SageMaker
"""
def __init__(self, cfg):
super().__init__(cfg)
# Use GPU device for default flow for NeMo runs
self.device = "gpu"
self.instance_type = get_instance_type(cfg)
self.num_efa_devices = get_num_efa_devices(self.instance_type)
self.telemetry = Telemetry()
@property
def _default_repo(self):
# Default repo to mount script from
return None
@property
def _default_branch(self):
# Default repo branch to mount script from
return None
def _make_torchrun_string(self):
"""
Create torchrun string based on single/multi-node job
"""
ntasks_per_node = get_ntasks_per_node(self.stage_cfg)
if int(get_num_nodes(self.stage_cfg)) > 1:
return f"torchrun ${DISTRIBUTED_ARGS} "
else:
return f"torchrun --nproc_per_node {ntasks_per_node} "
def _make_custom_call_string(self, stage_cfg_path=None) -> str:
"""
Create the training command with torchrun, script and args
"""
script_path = str(self._entry_script_path)
torchrun_cmd = self._make_torchrun_string()
script_args_str = self.get_script_args_str(stage_cfg_path)
command = [torchrun_cmd, script_path, script_args_str]
command_string = " \\\n ".join(command)
return command_string
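    # Example of the generated call (illustrative, hypothetical paths):
    #   torchrun $DISTRIBUTED_ARGS \
    #     /path/to/entry_script.py \
    #     --config-path=/results/<job folder> --config-name=<config>.yaml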
def _get_hostfile_location(self):
"""
Get the file location to store the hostnames
"""
job_path = self.get_job_path()
hostfile_location = Path(job_path.folder / "hostname")
return hostfile_location
def _use_local_repo(self) -> bool:
repo_url_or_path = None
if OmegaConf.select(self.cfg, "git.repo_url_or_path"):
repo_url_or_path = self.cfg.git.repo_url_or_path
return repo_url_or_path is not None and not (
repo_url_or_path.startswith("http") or repo_url_or_path.startswith("codecommit::")
)
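    # Example (illustrative): git.repo_url_or_path="/fsx/my-adapter" is treated as a local repo,
    # while "https://github.com/..." or "codecommit::..." URLs are treated as remote.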
def _make_docker_exec_script_text(self, stage_cfg_path):
docker_exec_script_text = ["#!/bin/bash", "set -ex"]
docker_exec_script_text.append("")
docker_exec_script_text.append("function job_epilogue {")
docker_exec_script_text.append(
" docker ps -a --filter 'name="
+ CONTAINER_NAME
+ "' --format '{{.ID}}' | xargs -I{} docker rm -f {} > /dev/null 2>&1 || true"
)
docker_exec_script_text.append("}")
docker_exec_script_text.append("trap job_epilogue EXIT SIGTERM SIGINT")
docker_exec_script_text.append("")
docker_exec_script_text.append(f"docker exec {CONTAINER_NAME} bash {stage_cfg_path.parents[0]}/train_script.sh")
docker_exec_script_text.append("")
docker_exec_script_text.append("exit 0")
return "\n".join(docker_exec_script_text)
def _make_launch_docker_container_text(self):
"""
        Create a script to launch the container on all nodes.
        This is only called when running a Docker container on a Slurm cluster.
"""
launch_docker_container_text = ["#!/bin/bash", "set -ex"]
image = self.cfg.container
# Login ECR
launch_docker_container_text.append(f'echo "image is {image}"')
is_ecr_image = "amazonaws.com" in image
if not is_ecr_image:
launch_docker_container_text.append(f'echo "Not an ECR image, skipping ECR login"')
else:
# format will be account.dkr.ecr.region.amazonaws.com/repo:tag
link = image.split("/")[0]
region = link.split(".")[3]
launch_docker_container_text.append(f"# Login ECR")
launch_docker_container_text.append(
f"aws ecr get-login-password --region {region} | docker login --username AWS --password-stdin {link}"
)
launch_docker_container_text.append("")
# Handle EFA devices
if get_num_efa_devices(self.instance_type) > 0:
launch_docker_container_text.append(f"# Getting EFA devices")
if allow_rdma(self.instance_type):
launch_docker_container_text.append('device=("--device=/dev/gdrdrv")')
else:
launch_docker_container_text.append("device=()")
launch_docker_container_text.extend(
[
"while IFS= read -r -d '' d; do",
' device+=("--device=${d}")',
'done < <(find "/dev/infiniband" -name "uverbs*" -print0)',
]
)
launch_docker_container_text.append("")
# Clean old containers
launch_docker_container_text.append(f"# Clean old containers")
launch_docker_container_text.append(
"docker ps -a --filter 'name="
+ CONTAINER_NAME
+ "' --format '{{.ID}}' | xargs -I{} docker rm -f {} > /dev/null 2>&1 || true"
)
launch_docker_container_text.append(
"docker ps -a --filter 'name=" + CONTAINER_NAME + "' --format '{{.ID}}' | xargs -I{} docker wait {} || true"
)
launch_docker_container_text.append("")
# Pull new container
launch_docker_container_text.append(f'docker pull "{image}"')
# Docker run command
launch_docker_container_text.extend(
[
f"docker run --gpus {get_ntasks_per_node(self.stage_cfg)} \\",
f' --privileged --rm -d --name "{CONTAINER_NAME}" \\',
" --uts=host --ulimit stack=67108864 --ulimit memlock=-1 --ipc=host --net=host \\",
" --security-opt seccomp=unconfined \\",
]
)
if get_num_efa_devices(self.instance_type) > 0:
launch_docker_container_text.append(' "${device[@]}" \\')
# Handle volume mounting
mount_str = self._make_container_mounts_string()
for mount in mount_str.split(","):
launch_docker_container_text.append(f" -v {mount} \\")
# Handle user run args and post run commands
post_launch_commands = []
if OmegaConf.select(self.cfg, "cluster.slurm_docker_cfg", default=None) is not None:
if self.cfg.cluster.slurm_docker_cfg.get("docker_args", None) is not None:
user_arg = []
for arg in self.cfg.cluster.slurm_docker_cfg.docker_args:
user_arg.append(arg)
if len(user_arg) > 0:
user_arg = " ".join(user_arg)
launch_docker_container_text.append(f" {user_arg} \\")
if self.cfg.cluster.slurm_docker_cfg.get("post_launch_commands", None) is not None:
for cmd in self.cfg.cluster.slurm_docker_cfg.post_launch_commands:
post_launch_commands.append(cmd)
if OmegaConf.select(self.cfg, "recipes.model.multi_modal", default=False):
transformers_upgrade_cmd = "pip install transformers==4.45.2"
post_launch_commands.append(transformers_upgrade_cmd)
if OmegaConf.select(self.cfg, "recipes.model.model_type", default=None) == "deepseek_r1":
transformers_upgrade_cmd = "pip install transformers==4.48.2"
post_launch_commands.append(transformers_upgrade_cmd)
if OmegaConf.select(self.cfg, "recipes.model.model_type", default=None) == "llama_v4":
transformers_upgrade_cmd = "pip install transformers==4.51.3"
post_launch_commands.append(transformers_upgrade_cmd)
launch_docker_container_text.append(f' "{image}" sleep infinity')
launch_docker_container_text.append("")
# Allow containers to talk to each other
launch_docker_container_text.append(f"# Running post launching commands")
launch_docker_container_text.extend(
[
f'docker exec -itd "{CONTAINER_NAME}" bash -c "printf \\"Port 2022\\n\\" >> /etc/ssh/sshd_config"',
f'docker exec -itd "{CONTAINER_NAME}" bash -c "printf \\" Port 2022\\n\\" >> /root/.ssh/config"',
f'docker exec -itd "{CONTAINER_NAME}" bash -c "service ssh start"',
]
)
for cmd in post_launch_commands:
launch_docker_container_text.append(f'docker exec "{CONTAINER_NAME}" bash -c "{cmd}"')
launch_docker_container_text.append("")
# Exit
launch_docker_container_text.append("exit 0")
return "\n".join(launch_docker_container_text)
def _make_train_script_text(self, stage_cfg_path=None, port=41000) -> str:
"""
        The custom train entry script. It is responsible for the following:
        - Resolve the hostname and create the torch distributed args
        - Pull from GitHub if required
        - Launch the torchrun command
"""
nodes = get_num_nodes(self.stage_cfg)
ntasks_per_node = get_ntasks_per_node(self.stage_cfg)
script_text = ["#!/bin/bash", "set -ex"]
# Also export env vars here so that they can be consumed by docker container
env_vars = self.get_env_vars()
if env_vars:
script_text.extend([f"export {k}={v}" for k, v in env_vars.items()])
# Prepare for the host information to create the torchrun command
if nodes > 1:
script_text.extend(
[
f"{MASTER_ADDR}=$(head -n 1 {str(self._get_hostfile_location())})",
f'{NODEID}=$(($(grep -nx -o "\\b$(hostname)\\b" {str(self._get_hostfile_location())} | cut -d ":" -f 1) - 1))',
f"{NNODES}={nodes}",
f"{PROCESSES_PER_NODE}={ntasks_per_node}",
f"{MASTER_PORT}={port}",
"",
]
)
if self.device == "trainium":
script_text.append(
f'{DISTRIBUTED_ARGS}="--nproc_per_node ${PROCESSES_PER_NODE} --nnodes ${NNODES} --node_rank ${NODEID} --master_addr ${MASTER_ADDR} --master_port ${MASTER_PORT}"'
)
else:
script_text.append(
f'{DISTRIBUTED_ARGS}="--nproc_per_node ${PROCESSES_PER_NODE} --nnodes ${NNODES} --rdzv_endpoint=${MASTER_ADDR} --rdzv_id=100 --rdzv_backend=c10d"'
)
else:
script_text.append(f'{DISTRIBUTED_ARGS}="--nproc_per_node {ntasks_per_node}"')
# Prepare github pull
# Aligns with the train-script preparation in launcher/nemo/k8s_templates/training.yaml
script_text.append("")
if self.cfg.get("git", None) is not None or self._default_repo is not None:
repo_url_or_path = self._default_repo
branch = self._default_branch
if self.cfg.get("git", None) is not None:
if self.cfg.git.get("repo_url_or_path", None) is not None:
repo_url_or_path = str(self.cfg.git.get("repo_url_or_path"))
assert repo_url_or_path is not None, "`repo_url_or_path` must be defined when setting git config"
if self.cfg.git.get("token", None) is not None:
repo_url_or_path = self.insert_git_token(repo_url_or_path, self.cfg.git.token)
if self.cfg.git.get("branch", None) is not None:
branch = self.cfg.git.branch
if not self._use_local_repo():
# Remote repo, clone the repo url
script_text.extend(
[
"# For greater env stability, grab hostname from `hostname`",
"# https://sim.amazon.com/issues/P162624109",
'LAUNCHER_HOSTNAME="$(hostname)"',
"",
"mkdir -p $HOME/tmp",
'GIT_CLONE_DIR="$HOME/tmp/$LAUNCHER_HOSTNAME"',
"[[ -d $GIT_CLONE_DIR ]] && rm -rf $GIT_CLONE_DIR",
f"git clone {repo_url_or_path} $GIT_CLONE_DIR",
"GIT_CLONE_DIR=${GIT_CLONE_DIR}/",
"cd $GIT_CLONE_DIR",
# cache can lead to unexpected behavior when user clones
# the Adapter and modifies it
"rm -rf __pycache__",
]
)
else:
# simply cd to the directory for local repo
script_text.append(f"cd {repo_url_or_path}")
if branch is not None:
script_text.append(f"git checkout {branch}")
if self.cfg.get("git", None) is not None and self.cfg.git.get("commit", None) is not None:
script_text.append(f"git fetch origin {self.cfg.git.commit}")
script_text.append(f"git reset --hard {self.cfg.git.commit}")
if OmegaConf.select(self.cfg, "git.update_adapter", default=False):
script_text.append("\npip install . --force-reinstall --no-deps")
else:
script_text.append('GIT_CLONE_DIR=""')
if not OmegaConf.select(self.cfg, "training.run.model_type", default="").startswith("neuron"):
script_text.append("")
script_text.append("unset SLURM_NTASKS")
if get_container_type(self.cfg.get("container", None)) == "enroot" and self.cluster == "bcm":
if OmegaConf.select(self.cfg, "recipes.model.multi_modal", default=False):
transformers_upgrade_cmd = "pip install transformers==4.45.2"
script_text.append("")
script_text.append(transformers_upgrade_cmd)
if OmegaConf.select(self.cfg, "recipes.model.model_type", default=False) == "deepseek_r1":
transformers_upgrade_cmd = "pip install transformers==4.48.2"
script_text.append("")
script_text.append(transformers_upgrade_cmd)
if OmegaConf.select(self.cfg, "recipes.model.model_type", default=None) == "llama_v4":
transformers_upgrade_cmd = "pip install transformers==4.51.3"
script_text.append("")
script_text.append(transformers_upgrade_cmd)
script_text.append("")
script_text.append(self._make_custom_call_string(stage_cfg_path))
return "\n".join(script_text)
@staticmethod
def save_stage_hydra_config(stage_cfg: OmegaConf, job_path: JobPaths, cfg: OmegaConf) -> Path:
"""
        Override of Training.save_stage_hydra_config that drops the parent's addition of extra keys in the k8s case.
Interpolate and save hydra config file for current stage
:param OmegaConf stage_cfg: current stage's hydra configuration
:param JobPaths job_path: JobPaths object
:param OmegaConf cfg: base config for job
        :return: path to the saved hydra config for the current stage
:rtype: Path
"""
_hydra_interpolation(stage_cfg)
cfg_save_path = job_path.config_file
omegaconf.OmegaConf.save(stage_cfg, cfg_save_path)
return cfg_save_path
def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]:
"""
        Custom run stage that invokes the entry script only
        [TODO] Make this compatible with the NeMo flow as well
"""
if get_container_type(self.cfg.get("container", None)) == "docker":
            logger.warning(
                "![WARNING] You're using a Docker container directly for a Slurm workload; we highly recommend using enroot instead"
            )
command_groups = [
[
# Launch container first
f"srun -l bash {stage_cfg_path.parents[0]}/launch_docker_container.sh",
f"srun -l bash {stage_cfg_path.parents[0]}/docker_exec_script.sh",
]
]
# There will be only a single command group
# enroot or conda/venv, no need to launch docker container
else:
command_groups = [[f"bash {stage_cfg_path.parents[0]}/train_script.sh"]]
return command_groups
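    # Example return value (illustrative): for an enroot or conda/venv run this is
    #   [["bash /path/to/<job folder>/train_script.sh"]]
    # while the docker path instead uses two srun commands that launch the container on every
    # node and then exec the train script inside it.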
def create_sm_jobs_script(self, job_folder):
full_recipe_path = Path(job_folder) / "recipe.yaml"
OmegaConf.save(config=self.cfg.get("training"), f=full_recipe_path)
sm_jobs_config_path = Path(job_folder) / "sm_jobs_config.yaml"
OmegaConf.save(config=self.cfg.cluster.get("sm_jobs_config"), f=sm_jobs_config_path)
script_src = Path(ROOT_DIR) / "template" / "sm_jobs.py"
script_dst = Path(job_folder) / "launch.py"
shutil.copy(script_src, script_dst)
# FIXME: Remove transformers requirement when container is updated to include the version
# required to run multi-modal.
if OmegaConf.select(self.cfg, "recipes.model.multi_modal", default=False):
reqs_filename = Path(job_folder) / "requirements.txt"
with open(reqs_filename, "w") as reqs_file:
reqs_file.write(f"transformers=={TRANSFORMERS_VERSION_FOR_MULTIMODAL}")
def make_sm_jobs_command(self):
"""
Make submit command for sm_jobs cluster type.
"""
instance_type = self.cfg.get("instance_type")
if instance_type is None:
raise ValueError("Expected instance_type to be set with sm_jobs cluster type")
sm_jobs_config = self.cfg.cluster.get("sm_jobs_config")
if sm_jobs_config is None:
raise ValueError("Expected sm_jobs_config to be set with sm_jobs cluster type")
if sm_jobs_config.get("output_path") is None:
raise ValueError("Expected output_path to be set with sm_jobs cluster type")
command = f"python launch.py --job_name {self.job_name} --instance_type {instance_type}"
command_groups = [["pushd $(dirname -- $0)", command, "popd"]]
return command_groups
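    # Example (illustrative, hypothetical values): with job_name "llama8b" and instance_type
    # "ml.p5.48xlarge" this returns
    #   [["pushd $(dirname -- $0)", "python launch.py --job_name llama8b --instance_type ml.p5.48xlarge", "popd"]]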
def run(self) -> str:
"""
Run current stage
"""
# Setup folders and datasets
self.setup_folder_and_data()
# Save stage hydra config
job_path = self.get_job_path()
# Identify if launching a trainium job
is_trainium = self.__class__.__name__ == "SMTrainingTrainiumRecipe"
is_custom = self.cfg.get("training_cfg") is not None
if not is_custom:
stage_cfg_path = SMTraining.save_stage_hydra_config(self.stage_cfg, job_path, self.cfg)
else:
stage_cfg_path = job_path.config_file
if self.cluster == "sm_jobs":
if is_custom:
raise RuntimeError("SM jobs launcher is not supported with custom training.")
cluster_parameters = {"job_name": self.job_name}
self.create_sm_jobs_script(job_path.folder)
command_groups = self.make_sm_jobs_command()
else:
# Make cluster parameters
cluster_parameters = self._make_cluster_parameters(self.cluster)
cluster_parameters["train_script_text"] = self._make_train_script_text(stage_cfg_path)
if get_container_type(self.cfg.container) == "docker":
cluster_parameters["launch_docker_container_text"] = self._make_launch_docker_container_text()
cluster_parameters["docker_exec_script_text"] = self._make_docker_exec_script_text(stage_cfg_path)
if get_container_type(self.cfg.container) != "enroot":
cluster_parameters.pop("container_mounts", None)
# if self.cfg.get("slurm_create_submission_file_only", None) is not None:
# cluster_parameters["slurm_create_submission_file_only"] = self.cfg.slurm_create_submission_file_only
cluster_parameters["hostfile"] = self._get_hostfile_location()
if is_trainium and self.get_cluster_type() == "bcm":
# Save temp training config file with string interpolations resolved so it can be
# copied into Neuron's package by the compute node(s) eventually selected by Slurm.
# NOTE: This file can't be removed. Multiple nodes may run the job asynchronously
# so there aren't any order guarantees nor an ideal moment to remove the file.
OmegaConf.save(self.cfg.training, self._temp_training_conf_file, True)
# Make k8s config file if necessary
if self.cluster == "k8s":
# The following two methods are overrides from the Training class. They require
# `template_root` but in our implementation we re-define it inside those methods.
# Therefore, `template_root` is just a sentinel so parent behavior is not broken.
sentinel_template_root = ""
self._make_k8s_spec_file(sentinel_template_root, cluster_parameters, job_path, stage_cfg_path)
self._copy_k8s_helm_chart(sentinel_template_root, job_path)
# k8s does not need command groups
command_groups = None
else:
command_groups = self.make_stage_command_groups(stage_cfg_path)
launcher = SMAutoLauncher(
folder=job_path.folder,
cluster=self.cluster,
**cluster_parameters,
)
job_id = launcher.launch(command_groups=command_groups)
if self.cluster == "bcm":
try:
self.telemetry.start(
self.cluster,
self.instance_type,
get_num_nodes(self.stage_cfg),
job_id=job_id,
container=self.cfg.get("container", None),
)
            except Exception:
pass
return job_id
def get_cluster_type(self) -> str:
"""
Get cluster type depending on whether configuration is custom or recipe
"""
# custom configurations have the `training_cfg` key
is_custom = self.cfg.get("training_cfg") is not None
cluster_type = None
if is_custom:
cluster_type = OmegaConf.select(self.cfg, "cluster.cluster_type")
else:
cluster_type = self.cfg.get("cluster_type")
if cluster_type is None:
raise AttributeError("`cluster_type` is not defined in the configuration file")
return cluster_type
def get_script_args_str(self, stage_cfg_path: Path) -> str:
"""
Based on https://github.com/NVIDIA/NeMo-Framework-Launcher/blob/23.11/launcher_scripts/nemo_launcher/core/stages.py#L608
"""
if self.cluster == "k8s":
return "--config-path=/config --config-name=config.yaml"
return f"--config-path={stage_cfg_path.parents[0]} --config-name={stage_cfg_path.name}"
def insert_git_token(self, repo_url_or_path: str, token: str) -> str:
"""
        Insert the git token into the git repo URL. Currently only GitHub repos are supported.
"""
if "github.com" in repo_url_or_path:
splitted_url = repo_url_or_path.split("github.com", 1)
            repo_url_or_path = splitted_url[0] + token + "@github.com" + splitted_url[1]
return repo_url_or_path
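    # Example (illustrative, hypothetical token): insert_git_token("https://github.com/org/repo.git", "<TOKEN>")
    # returns "https://<TOKEN>@github.com/org/repo.git".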
def _make_nemo_path_command(self) -> List[str]:
"""Extend nemo path to python path"""
# [TODO] clone the nemo/SFA/NxTT repo and handle point to the right path
return super()._make_nemo_path_command()
def _make_container_mounts_string(self) -> str:
"""
Make container mounting string based on hydra configurations
:return: container mounting string, e.g. "/path/to/A:/path/to/A,/path/to/B:/path/to/B,..."
:rtype: str
"""
def add_container_mounts(container_mounts):
mounts_str = ""
if container_mounts is not None:
assert isinstance(container_mounts, omegaconf.listconfig.ListConfig), "container_mounts must be a list."
for mount in container_mounts:
if mount is not None and isinstance(mount, str):
mounts_str += f",{mount}" if ":" in mount else f",{mount}:{mount}"
return mounts_str
cfg = self.cfg
base_results_dir = cfg.get("base_results_dir")
mounts_string = (
f"{self._launcher_scripts_path}:{self._launcher_scripts_path},{base_results_dir}:{base_results_dir}"
)
# mount volume only if inside a Hyperpod environment
hp_logs_dir = "/var/log/aws/clusters"
if Path(hp_logs_dir).is_dir():
mounts_string += f",{hp_logs_dir}:{hp_logs_dir}"
"""Start of SM change"""
container_mounts = cfg.cluster.get("container_mounts")
"""End of SM change"""
mounts_string += add_container_mounts(container_mounts)
# https://github.com/NVIDIA/NeMo-Framework-Launcher/blob/23.11/launcher_scripts/nemo_launcher/core/stages.py#L264
# We do not have data dir for custom launching
mounts_string = mounts_string.replace(",None:None", "")
if self._use_local_repo():
mounts_string += f",{self.cfg.git.repo_url_or_path}:{self.cfg.git.repo_url_or_path}"
return mounts_string
def generate_default_k8s_value_template(self, template_root, cluster_parameters, stage_cfg_path=None):
"""
Setting the general k8s configs that will be applicable for all device types and training methods
"""
with open(template_root / "values.yaml") as value_file:
values_template = OmegaConf.load(value_file)
values_template.image.trainingImage = cluster_parameters["container_image"]
values_template.trainingConfig.jobName = self.stage_cfg.run.name
# Cluster configs
values_template.trainingConfig.numEFADevices = self.num_efa_devices
if "pullPolicy" in cluster_parameters:
values_template.image.pullPolicy = cluster_parameters["pullPolicy"]
if "env_vars" in cluster_parameters:
values_template.trainingConfig.envVars = cluster_parameters["env_vars"]
if "restartPolicy" in cluster_parameters:
values_template.trainingConfig.restartPolicy = cluster_parameters["restartPolicy"]
if "cleanPodPolicy" in cluster_parameters:
values_template.trainingConfig.cleanPodPolicy = cluster_parameters["cleanPodPolicy"]
if "persistent_volume_claims" in cluster_parameters:
values_template.trainingConfig.persistentVolumeClaims = cluster_parameters["persistent_volume_claims"]
if "volumes" in cluster_parameters:
values_template.trainingConfig.volumes = cluster_parameters["volumes"]
if cluster_parameters.get("namespace", None) is not None:
values_template.trainingConfig.namespace = cluster_parameters["namespace"]
if cluster_parameters.get("annotations", None) is not None:
values_template.trainingConfig.annotations = cluster_parameters["annotations"]
if cluster_parameters.get("priority_class_name", None) is not None:
values_template.trainingConfig.priorityClassName = cluster_parameters["priority_class_name"]
if cluster_parameters.get("service_account_name") is not None:
values_template.trainingConfig.serviceAccountName = cluster_parameters["service_account_name"]
if cluster_parameters.get("custom_labels", None) is not None:
values_template.trainingConfig.customLabels = cluster_parameters["custom_labels"]
if cluster_parameters.get("label_selector", None) is not None:
values_template.trainingConfig.labelSelector = cluster_parameters["label_selector"]
values_template.trainingConfig.compile = OmegaConf.select(self.cfg, "recipes.run.compile", default=0)
if self._default_repo is not None:
values_template.trainingConfig.git.repo_url_or_path = self._default_repo
if self._default_branch is not None:
values_template.trainingConfig.git.branch = self._default_branch
# Git configs
if self.cfg.get("git", None) is not None:
if self.cfg.git.get("repo_url_or_path", None) is not None:
repo_url_or_path = str(self.cfg.git.repo_url_or_path)
                # We only support using a local repo path on Slurm; `bcm` is the NeMo launcher's name for a Slurm cluster
if not (repo_url_or_path.startswith("http") or repo_url_or_path.startswith("codecommit::")):
raise ValueError("local git repo path is only supported for slurm based cluster")
if self.cfg.git.get("token", None) is not None:
repo_url_or_path = self.insert_git_token(repo_url_or_path, self.cfg.git.token)
values_template.trainingConfig.git.repo_url_or_path = repo_url_or_path
if self.cfg.git.get("branch", None) is not None:
values_template.trainingConfig.git.branch = self.cfg.git.branch
if self.cfg.git.get("commit", None) is not None:
values_template.trainingConfig.git.commit = self.cfg.git.commit
if self.cfg.git.get("update_adapter", None) is not None:
values_template.trainingConfig.git.update_adapter = self.cfg.git.update_adapter
values_template.trainingConfig.device = self.device
values_template.trainingConfig.scriptArgs = self.get_script_args_str(stage_cfg_path)
values_template.trainingConfig.pre_script = self.stage_cfg.get("pre_script", [])
values_template.trainingConfig.post_script = self.stage_cfg.get("post_script", [])
return values_template
def write_value_template(self, values_template, job_path):
"""
        Write the value template to disk
"""
k8s_template_path = job_path.folder
k8s_template_file = Path(k8s_template_path / "k8s_template" / "values.yaml")
k8s_template_file.parent.mkdir(parents=True, exist_ok=True)
conf = OmegaConf.create(values_template)
OmegaConf.save(conf, k8s_template_file)
def update_stage_specific_k8s_values(self, values_template):
"""
        Update the k8s configs related to the current stage
"""
values_template.trainingConfig.ntasksPerNode = self.stage_cfg.trainer.devices
values_template.trainingConfig.nodes = self.stage_cfg.trainer.num_nodes
choice_model_type, _ = self.get_stage_config_choice()
if self.cfg.git.get("entry_script", None) is not None:
# Override with entry script provided by the customer
values_template.trainingConfig.scriptPath = self.cfg.git.entry_script
else:
values_template.trainingConfig.scriptPath = str(self._entry_script_path)
if OmegaConf.select(self.cfg, "recipes.model.multi_modal", default=False):
transformers_upgrade_cmd = "pip install transformers==4.45.2"
values_template.trainingConfig.pre_script.append(transformers_upgrade_cmd)
if OmegaConf.select(self.cfg, "recipes.model.model_type", default=False) == "deepseek_r1":
transformers_upgrade_cmd = "pip install transformers==4.48.2"
values_template.trainingConfig.pre_script.append(transformers_upgrade_cmd)
if OmegaConf.select(self.cfg, "recipes.model.model_type", default=None) == "llama_v4":
transformers_upgrade_cmd = "pip install transformers==4.51.3"
values_template.trainingConfig.pre_script.append(transformers_upgrade_cmd)
return values_template
# @override - available in Python 3.12 - `template_root` is required by parent implementation
def _make_k8s_spec_file(
self, template_root: str, cluster_parameters: Dict, job_path: JobPaths, stage_cfg_path=None
):
"""
Referring from https://github.com/NVIDIA/NeMo-Framework-Launcher/blob/23.11/launcher_scripts/nemo_launcher/core/stages.py#L669
Break the function into 3 parts so we can easily override in different stages
- Create general k8s configs that will be applicable for all device types and training methods
- Update stage specific k8s configs
- Write k8s configs to disk as value.yaml, which will be consumed by helm
"""
# Need to override the template_root to use our templates
        # [TODO] Currently hard-coded to the training stage
template_root: Path = ROOT_DIR / "launcher/nemo/k8s_templates/training"
values_template = self.generate_default_k8s_value_template(template_root, cluster_parameters, stage_cfg_path)
values_template = self.update_stage_specific_k8s_values(values_template)
self.write_value_template(values_template, job_path)
def _copy_k8s_helm_helper_configs(self, src_training_dir: Path, job_path: JobPaths):
"""
Copy helper Helm files into results directory
"""
# copy the Trainium and GPU config files
gpu_config = "train-script-gpu.yaml"
trn_config = "train-script-trn.yaml"
templates_path = Path(job_path.folder / "k8s_template" / "templates")
shutil.copy2(str(src_training_dir / gpu_config), str(templates_path / gpu_config))
shutil.copy2(str(src_training_dir / trn_config), str(templates_path / trn_config))
# @override - available in Python 3.12 - `template_root` is required by parent implementation
def _copy_k8s_helm_chart(self, template_root: str, job_path: JobPaths):
# Need to override the template_root to use our templates
        # [TODO] Currently hard-coded to the training stage
src_training_dir = ROOT_DIR / "launcher/nemo/k8s_templates/training"
super()._copy_k8s_helm_chart(str(src_training_dir), job_path)
self._copy_k8s_helm_helper_configs(src_training_dir, job_path)
def get_env_vars(self) -> Dict:
"""
Set up dictionary for environment variables
        By default, the EFA env variables are injected when doing multi-node training.
The environment variables from hydra config will be set inside the job scripts.
For Example:
        Set `env_vars.NVTE_BIAS_DROPOUT_FUSION=1` while calling the nemo_launcher launcher scripts,
`NVTE_BIAS_DROPOUT_FUSION=1` will be set while running the job.
:return: a dictionary of env vars while running the job.
:rtype: Dict
"""
env_vars = super().get_env_vars()
stage_cfg = self.stage_cfg
nodes = get_num_nodes(stage_cfg)
if int(nodes) > 1:
env_vars = set_multinode_envs(env_vars, self.instance_type)
return env_vars
class SMCustomTraining(SMTraining):
"""
    Base stage for custom training on SageMaker.
"""
@property
def _entry_script_path(self) -> Path:
return Path(self.stage_cfg.entry_script)
def setup_stage_vars(self, cfg):
"""Setup the stage vars, i.e. stage name and stage cfg"""
self.stage_name = "custom_training_sm"
self.stage_cfg = cfg.get("training_cfg")
def get_script_args_str(self, stage_cfg_path=None):
"""
        Gather all script args and join them into a single string
"""
arg_str = []
if self.stage_cfg.get("script_args", None) is not None:
            # script_args is a list of dicts mapping arg_name to arg_value
for arg in list(self.stage_cfg.script_args):
for key, val in arg.items():
arg_str.append(f"{key} {val} ")
return "".join(arg_str)
def update_stage_specific_k8s_values(self, values_template):
"""
        Custom-training-specific k8s values
"""
values_template.trainingConfig.ntasksPerNode = get_ntasks_per_node(self.stage_cfg)
values_template.trainingConfig.nodes = get_num_nodes(self.stage_cfg)
values_template.trainingConfig.scriptPath = self.stage_cfg.entry_script
values_template.trainingConfig.customScript = True
return values_template
def _copy_k8s_helm_chart(self, template_root: str, job_path: JobPaths):
# Need to override the template_root to use our templates
        # [TODO] Currently hard-coded to the training stage
src_training_dir = ROOT_DIR / "launcher/nemo/k8s_templates/training"
# For custom run, there is no need for training config files
# Only creating training.yaml, Chart.yaml
template_file = str(src_training_dir / "training.yaml")
chart_file = str(src_training_dir / "Chart.yaml")
training_path = Path(job_path.folder / "k8s_template" / "templates" / "training.yaml")
training_path.parent.mkdir(parents=True, exist_ok=True)
chart_path = Path(job_path.folder / "k8s_template" / "Chart.yaml")
shutil.copy2(template_file, training_path)
shutil.copy2(chart_file, chart_path)
self._copy_k8s_helm_helper_configs(src_training_dir, job_path)
def _make_cluster_parameters(self, cluster: str) -> Dict:
"""
        Make cluster-specific parameters for jobs on different clusters.
:param str cluster: i.e. `bcm`, `bcp`, `interactive`, etc.
:return: a dictionary of cluster parameters, e.g. `ntasks_per_node`
:rtype: Dict
"""
with omegaconf.open_dict(self.cfg):
# Patch self.cfg.cluster to align with
# https://github.com/NVIDIA/NeMo-Framework-Launcher/blob/23.11/launcher_scripts/nemo_launcher/core/stages.py#L312
origin_cluster = self.cfg.cluster
self.cfg.cluster = self.cfg.cluster.cluster_config
cluster_parameters = super()._make_cluster_parameters(cluster)
cluster_type = origin_cluster.get("cluster_type")
if cluster_type == "k8s":
env_vars = cluster_parameters.get("env_vars")
if env_vars and "SLURM_NTASKS_PER_NODE" in env_vars:
env_vars.pop("SLURM_NTASKS_PER_NODE")
self.cfg.cluster = origin_cluster
return cluster_parameters
class SMCustomTrainingGPU(SMCustomTraining):
"""
    Stage for custom training on GPU
"""
@property
def _cuda_visible_devices(self) -> str:
ntasks_per_node = get_ntasks_per_node(self.stage_cfg)
if ntasks_per_node is None:
ntasks_per_node = 8
return (
"CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7"
if ntasks_per_node == 8
else f"CUDA_VISIBLE_DEVICES={','.join(map(str, range(ntasks_per_node)))}"
)
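    # Example (illustrative): with run.ntasks_per_node=4 this yields "CUDA_VISIBLE_DEVICES=0,1,2,3";
    # when unset it defaults to all 8 devices.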
@property
def _set_ln_sm_margin(self) -> str:
return ""
@property
def _skip_ag_overlap(self) -> str:
return ""
class SMCustomTrainingCPU(SMCustomTrainingGPU):
"""
Stage for custom training on CPU
"""
def __init__(self, cfg):
super().__init__(cfg)
self.device = "cpu"
@property
def _cuda_visible_devices(self) -> str:
return ""
class SMCustomTrainingTrainium(SMCustomTraining):
"""
Stage for custom training on Trainium
"""
def __init__(self, cfg):
super().__init__(cfg)
self.device = "trainium"
def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]:
"""
Make the command groups for current stage
Command groups is a list of command group. A command group is defined as:
0. Command group is a list of command strings
1. Each command group occupies one bcprun, srun or bash
2. Each command group eventually has multiple commands connected by ";"
:param Path stage_cfg_path: path to interpolated and saved configuration
:return: command groups for current stage
:rtype: List[List[str]]
"""
def update_stage_specific_k8s_values(self, values_template):
"""
        Custom-training-specific k8s values for Trainium
"""
super().update_stage_specific_k8s_values(values_template)
values_template.trainingConfig.numNeuronDevices = get_num_accelerator_devices(self.instance_type)
return values_template