slurm_launcher.py
#!/usr/bin/env python3
"""
Nanotron Slurm Launcher
This script simplifies launching multi-node Nanotron training jobs on Slurm clusters.
It handles configuration generation, resource allocation, and job submission.
Usage:
python slurm_launcher.py --run my_experiment --model 1b [other options]
The script will:
1. Generate a Nanotron config based on your parameters
2. Create a Slurm job script with appropriate settings
3. Submit the job to the Slurm scheduler
4. Save configurations for reproducibility
"""
import argparse
import logging
import os
import subprocess
import tempfile
import time
from datetime import datetime
from typing import Optional
# Nanotron imports
from nanotron.config import (
AdamWOptimizerArgs,
CheckpointsArgs,
Config,
DataArgs,
DatasetStageArgs,
GeneralArgs,
LlamaConfig,
LoggingArgs,
LRSchedulerArgs,
ModelArgs,
NanosetDatasetsArgs, # noqa
OptimizerArgs,
ParallelismArgs,
PretrainDatasetsArgs, # noqa
ProfilerArgs,
RandomInit,
TokenizerArgs,
TokensArgs,
)
from nanotron.logging import human_format
logger = logging.getLogger(__name__)
# =============================================
# CONFIGURATION SECTION - MODIFY AS NEEDED
# =============================================
# Default paths - override with command line arguments if needed
DEFAULT_CONFIGS_PATH = "logs/configs"
DEFAULT_SLURM_LOGS_PATH = "logs/slurm_logs"
DEFAULT_SLURM_SCRIPTS_DIR = "logs/slurm_scripts"
DEFAULT_CHECKPOINTS_PATH = "checkpoints"
DEFAULT_RUN_TRAIN_SCRIPT = "run_train.py"
# Default model sizes - predefined configurations for common model sizes
MODEL_SIZES = {
# (layers, hidden, heads, kv_heads, ffn_size)
"160m": (12, 768, 12, 12, 3072), # ~160M params
"410m": (24, 1024, 16, 16, 4096), # ~410M params
# Small to medium models
"1b": (16, 2048, 16, 16, 5632), # ~1B params
"3b": (28, 3072, 32, 32, 8192), # ~3B params
# Standard sizes
"7b": (32, 4096, 32, 32, 11008), # ~7B params
"13b": (40, 5120, 40, 40, 13824), # ~13B params
# Large models
"30b": (60, 6656, 52, 52, 17920), # ~30B params
"70b": (80, 8192, 64, 8, 28672), # ~70B params (MQA)
# Custom model
"custom": (12, 192, 4, 4, 768),
}
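# Each tuple maps positionally onto LlamaConfig fields (see generate_model_config below):
#   layers -> num_hidden_layers, hidden -> hidden_size, heads -> num_attention_heads,
#   kv_heads -> num_key_value_heads, ffn_size -> intermediate_size.
# Vocabulary size is not part of these presets; it comes from --vocab-size.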
def parse_args():
"""Parse command line arguments for the Slurm launcher."""
parser = argparse.ArgumentParser(
description="Nanotron Slurm Launcher", formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
    # Experiment identification
parser.add_argument("--run", type=str, default="nanotron", help="Name for this experiment run")
# Slurm job configuration
slurm_group = parser.add_argument_group("Slurm Configuration")
slurm_group.add_argument("--gpus_per_node", type=int, default=8, help="Number of GPUs per node")
slurm_group.add_argument("--partition", type=str, default="hopper-prod", help="Slurm partition to use")
slurm_group.add_argument("--qos", type=str, default="normal", help="Slurm QOS to use")
slurm_group.add_argument("--time_limit", type=str, default=None, help="Time limit for the job (HH:MM:SS)")
slurm_group.add_argument("--email", type=str, default=None, help="Email for job notifications")
slurm_group.add_argument("--tmp_dir", type=str, default="/tmp", help="Temporary directory on compute nodes")
slurm_group.add_argument("--pre_launch_commands", type=str, default="", help="Commands to run before job launch")
slurm_group.add_argument("--extra_env", type=str, default="", help="Additional environment variables")
slurm_group.add_argument("--bench", type=str, default="", help="Benchmark csv path")
# Config file
parser.add_argument(
"--config",
type=str,
default=None,
help="Path to the Nanotron config file. If not provided, a config will be created automatically.",
)
# Model configuration
model_group = parser.add_argument_group("Model Configuration")
model_group.add_argument(
"--model",
type=str,
default="custom",
choices=MODEL_SIZES.keys(),
help="Predefined model size",
)
model_group.add_argument("--hidden-size", type=int, default=None, help="Hidden size (overrides model)")
model_group.add_argument("--intermediate-size", type=int, default=None, help="Intermediate size (overrides model)")
model_group.add_argument("--num-layers", type=int, default=None, help="Number of layers (overrides model)")
model_group.add_argument("--num-heads", type=int, default=None, help="Number of attention heads (overrides model)")
model_group.add_argument("--num-kv-heads", type=int, default=None, help="Number of KV heads (overrides model)")
model_group.add_argument("--vocab-size", type=int, default=65536, help="Vocabulary size (overrides model)")
model_group.add_argument("--seq", type=int, default=4096, help="Maximum sequence length")
# Training configuration
training_group = parser.add_argument_group("Training Configuration")
training_group.add_argument("--seed", type=int, default=42, help="Random seed for reproducibility")
training_group.add_argument("--steps", type=int, default=10000, help="Number of training steps")
training_group.add_argument("--mbs", type=int, default=2, help="Micro batch size")
training_group.add_argument("--acc", type=int, default=8, help="Gradient accumulation steps")
training_group.add_argument("--learning-rate", type=float, default=3e-4, help="Peak learning rate")
training_group.add_argument("--min-lr", type=float, default=3e-5, help="Minimum learning rate for decay")
training_group.add_argument("--weight-decay", type=float, default=0.01, help="Weight decay")
training_group.add_argument("--grad-clip", type=float, default=1.0, help="Gradient clipping")
training_group.add_argument("--warmup-steps", type=int, default=1000, help="Learning rate warmup steps")
# Parallelism strategy
parallel_group = parser.add_argument_group("Parallelism Configuration")
parallel_group.add_argument("--dp", type=int, default=8, help="Data parallelism (DP) degree")
parallel_group.add_argument("--pp", type=int, default=1, help="Pipeline parallelism (PP) degree")
parallel_group.add_argument("--tp", type=int, default=2, help="Tensor parallelism (TP) degree")
parallel_group.add_argument("--cp", type=int, default=1, help="Context parallelism degree")
parallel_group.add_argument("--ep", type=int, default=1, help="Expert parallelism degree")
parallel_group.add_argument("--zero", type=int, default=0, choices=[0, 1], help="ZeRO stage")
# Dataset configuration
data_group = parser.add_argument_group("Dataset Configuration")
data_group.add_argument("--dataset", type=str, default=None, help="Hugging Face dataset name or path")
data_group.add_argument("--text-column", type=str, default="text", help="Column name for text in the dataset")
data_group.add_argument(
"--tokenizer", type=str, default="robot-test/dummy-tokenizer-wordlevel", help="Tokenizer name or path"
)
# File paths
paths_group = parser.add_argument_group("File Paths")
paths_group.add_argument("--project", type=str, default="nanotron", help="Project name for logging")
paths_group.add_argument(
"--configs-path", type=str, default=DEFAULT_CONFIGS_PATH, help="Directory to save configuration files"
)
paths_group.add_argument(
"--slurm-logs-path", type=str, default=DEFAULT_SLURM_LOGS_PATH, help="Directory for Slurm output logs"
)
paths_group.add_argument(
"--checkpoints-path",
type=str,
default=DEFAULT_CHECKPOINTS_PATH,
help="Base directory for saving model checkpoints",
)
slurm_group.add_argument(
"--run-train-script",
type=str,
default=DEFAULT_RUN_TRAIN_SCRIPT,
help="Path to the training script (default: run_train.py)",
)
slurm_group.add_argument(
"--slurm-scripts-dir",
type=str,
default=DEFAULT_SLURM_SCRIPTS_DIR,
help="Directory to save generated Slurm scripts (set to None to disable)",
)
paths_group.add_argument(
"--save-interval", type=int, default=1000, help="Interval for saving checkpoints (in steps)"
)
paths_group.add_argument("--save-initial-state", action="store_true", help="Save initial state")
# Logging configuration
logging_group = parser.add_argument_group("Logging Configuration")
logging_group.add_argument("--enable-wandb", action="store_true", help="Enable logging to Weights & Biases")
logging_group.add_argument(
"--profiler_export_path",
type=str,
default=None,
help="Path to export the profiler tensorboard data. Use `tensorboard --logdir <path>` to view.",
)
logging_group.add_argument("--log-lvl", type=str, default="info", help="Log level")
logging_group.add_argument("--no-sanity", action="store_true", help="Ignore sanity checks")
# Execution control
parser.add_argument("--dry-run", action="store_true", help="Generate configs but don't submit job")
parser.add_argument("--show-logs", action="store_true", help="Show output of the job as it runs")
return parser.parse_args()
def generate_model_config(
model_size: str = "custom",
hidden_size: Optional[int] = None,
intermediate_size: Optional[int] = None,
num_hidden_layers: Optional[int] = None,
num_attention_heads: Optional[int] = None,
num_key_value_heads: Optional[int] = None,
vocab_size: Optional[int] = None,
max_position_embeddings: int = 4096,
) -> LlamaConfig:
"""
Generate a model configuration based on predefined sizes or custom parameters.
Args:
        model_size: Predefined size key from MODEL_SIZES (e.g. '160m', '1b', '7b', '70b', 'custom')
hidden_size: Custom hidden size (overrides model_size)
intermediate_size: Custom intermediate size (overrides model_size)
num_hidden_layers: Custom number of layers (overrides model_size)
num_attention_heads: Custom number of attention heads (overrides model_size)
num_key_value_heads: Custom number of KV heads (overrides model_size)
vocab_size: Custom vocabulary size (overrides model_size)
max_position_embeddings: Maximum sequence length
Returns:
LlamaConfig object with the specified parameters
"""
# Start with the base configuration for the requested size
config_params = MODEL_SIZES.get(model_size, MODEL_SIZES["custom"])
config_params = {
"num_hidden_layers": config_params[0],
"hidden_size": config_params[1],
"num_attention_heads": config_params[2],
"num_key_value_heads": config_params[3],
"intermediate_size": config_params[4],
}
# Override with any explicitly specified parameters
if hidden_size is not None:
config_params["hidden_size"] = hidden_size
if intermediate_size is not None:
config_params["intermediate_size"] = intermediate_size
if num_hidden_layers is not None:
config_params["num_hidden_layers"] = num_hidden_layers
if num_attention_heads is not None:
config_params["num_attention_heads"] = num_attention_heads
if num_key_value_heads is not None:
config_params["num_key_value_heads"] = num_key_value_heads
if vocab_size is not None:
config_params["vocab_size"] = vocab_size
# Create the model configuration
model_config = LlamaConfig(
bos_token_id=1,
eos_token_id=2,
hidden_act="silu",
initializer_range=0.02,
max_position_embeddings=max_position_embeddings,
pretraining_tp=1,
rms_norm_eps=1e-05,
rope_scaling=None,
tie_word_embeddings=False,
use_cache=True,
**config_params,
)
return model_config
def launch_slurm_job(launch_file_contents: str, *args) -> str:
"""
Save a sbatch script to a temporary file and submit it to Slurm.
Args:
launch_file_contents: Contents of the sbatch script
*args: Additional arguments to pass to the sbatch command
Returns:
Job ID of the submitted Slurm job
"""
with tempfile.NamedTemporaryFile("w") as f:
f.write(launch_file_contents)
f.flush()
return subprocess.check_output(["sbatch", *args, f.name]).decode("utf-8").split()[-1]
def create_nanotron_config(args) -> Config:
"""
Create a Nanotron configuration object based on the provided arguments.
Args:
args: Command line arguments
Returns:
Nanotron Config object
"""
# Generate model configuration
model_config = generate_model_config(
model_size=args.model,
hidden_size=args.hidden_size,
intermediate_size=args.intermediate_size,
num_hidden_layers=args.num_layers,
num_attention_heads=args.num_heads,
num_key_value_heads=args.num_kv_heads,
vocab_size=args.vocab_size,
max_position_embeddings=args.seq,
)
# Calculate number of parameters for logging
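    # Approximation (Llama-style): ~2*V*h for the input embedding and LM head, plus per layer
    # ~3*h*ffn for the gated MLP and ~4*h^2 for the attention projections (exact only when
    # num_key_value_heads == num_attention_heads).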
num_params = human_format(
model_config.vocab_size * model_config.hidden_size * 2
+ model_config.num_hidden_layers
* (
3 * model_config.hidden_size * model_config.intermediate_size
+ 4 * model_config.hidden_size * model_config.hidden_size
)
).replace(".", "p")
print(f"Model has {num_params} parameters")
# Use user-provided parallelism directly
parallelism = ParallelismArgs(
dp=args.dp,
pp=args.pp,
tp=args.tp,
context_parallel_size=args.cp,
expert_parallel_size=args.ep,
pp_engine="1f1b",
tp_mode="REDUCE_SCATTER",
tp_linear_async_communication=True,
recompute_layer=False,
)
# Define tokens configuration
tokens = TokensArgs(
sequence_length=args.seq,
train_steps=args.steps,
micro_batch_size=args.mbs,
batch_accumulation_per_replica=args.acc,
)
# Calculate global batch size for logging
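    # (in tokens per optimizer step): DP replicas x grad-accumulation steps x micro-batch size
    # x sequence length, scaled by the context/expert parallel sizes configured above.
    # With the defaults (dp=8, acc=8, mbs=2, seq=4096, cp=ep=1) that is 524,288 tokens/step
    # (~0.52M), i.e. ~5.2B tokens over 10,000 steps.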
gbs = (
parallelism.dp
* tokens.batch_accumulation_per_replica
* tokens.micro_batch_size
* tokens.sequence_length
* parallelism.context_parallel_size
* parallelism.expert_parallel_size
)
total_tokens = gbs * args.steps
print(f"GBS: {(gbs)/1e6:.2f}M, total training tokens: {(total_tokens)/1e9:.2f}B")
# Configure learning rate schedule
lr_scheduler = LRSchedulerArgs(
learning_rate=args.learning_rate,
lr_warmup_steps=args.warmup_steps,
lr_warmup_style="linear",
lr_decay_style="cosine",
min_decay_lr=args.min_lr,
)
# Configure optimizer
optimizer = OptimizerArgs(
zero_stage=args.zero,
weight_decay=args.weight_decay,
clip_grad=args.grad_clip,
accumulate_grad_in_fp32=True,
learning_rate_scheduler=lr_scheduler,
optimizer_factory=AdamWOptimizerArgs(
adam_eps=1e-08,
adam_beta1=0.9,
adam_beta2=0.95,
torch_adam_is_fused=True,
),
)
# Configure datasets
data_stages = [
DatasetStageArgs(
name="Stable Training Stage",
start_training_step=1,
data=DataArgs(
# For pretraining:
# dataset=PretrainDatasetsArgs(
# hf_dataset_or_datasets=args.dataset,
# text_column_name=args.text_column,
# ),
                # When using a Nanoset, the configured vocab size must be at least as large as
                # that of the tokenizer used to tokenize the dataset
dataset=NanosetDatasetsArgs(
dataset_folder="/fsx/loubna/tokenized_for_exps/mcf-dataset", # 1.4T tokens
),
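                # NOTE: the dataset_folder above is a cluster-specific path; replace it with your
                # own pre-tokenized Nanoset. The --dataset/--text-column CLI flags only take
                # effect with the PretrainDatasetsArgs block above.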
# For SFT (uncomment to use):
# dataset=SFTDatasetsArgs(
# hf_dataset_or_datasets=args.dataset,
# hf_dataset_splits="train",
# debug_max_samples=1000,
# ),
seed=args.seed,
),
),
]
# Configure checkpointing
os.makedirs(args.checkpoints_path, exist_ok=True)
checkpoints = CheckpointsArgs(
checkpoints_path=os.path.join(args.checkpoints_path, args.run),
checkpoint_interval=args.save_interval,
save_initial_state=args.save_initial_state,
)
# Create the final config
config = Config(
general=GeneralArgs(
project=args.project,
run=args.run,
seed=args.seed,
ignore_sanity_checks=args.no_sanity,
benchmark_csv_path=args.bench,
),
checkpoints=checkpoints,
parallelism=parallelism,
model=ModelArgs(init_method=RandomInit(std=0.025), model_config=model_config),
tokenizer=TokenizerArgs(args.tokenizer),
optimizer=optimizer,
logging=LoggingArgs(log_level=args.log_lvl, log_level_replica=args.log_lvl, iteration_step_info_interval=1),
tokens=tokens,
data_stages=data_stages,
profiler=ProfilerArgs(profiler_export_path=args.profiler_export_path)
if args.profiler_export_path is not None
else None,
)
return config
def create_slurm_script(
config_path: str,
args,
dp: int,
pp: int,
tp: int,
cp: int,
ep: int,
run_train_script: str = "run_train.py",
) -> str:
"""
Create a Slurm job submission script.
    Args:
        config_path: Path to the Nanotron config YAML file
        args: Command line arguments
        dp, pp, tp, cp, ep: Parallelism degrees used to size the job
        run_train_script: Path to the training entry point passed to torchrun
Returns:
Contents of the Slurm script as a string
"""
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
logs_path = os.path.join(args.slurm_logs_path, args.run.replace(" ", "_"))
os.makedirs(logs_path, exist_ok=True)
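    # The job needs dp*pp*tp*cp*ep GPUs in total; cap GPUs per node at that number for small
    # jobs, then derive the node count (the assert below guarantees every node is fully used).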
    total_gpus = dp * pp * tp * cp * ep
    gpus_per_node = min(args.gpus_per_node, total_gpus)
    assert total_gpus % gpus_per_node == 0, f"Total GPUs ({total_gpus}) must be divisible by gpus_per_node ({gpus_per_node})"
    nodes = total_gpus // gpus_per_node
# Ensure config_path is a full path
if not os.path.isabs(config_path):
config_path = os.path.abspath(config_path)
script = f"""#!/bin/bash
#SBATCH --job-name={args.run}
#SBATCH --nodes={nodes}
#SBATCH --ntasks-per-node=1          # crucial - only 1 task per node; torchrun spawns the per-GPU processes
#SBATCH --cpus-per-task=60 # CPU cores per task
#SBATCH --exclusive # Exclusive use of nodes
#SBATCH --gpus-per-node={gpus_per_node}
#SBATCH --partition={args.partition}
#SBATCH --output={logs_path}/{timestamp}-%x-%j.out
#SBATCH --qos={args.qos}
#SBATCH --wait-all-nodes=1           # do not begin execution until all allocated nodes are ready
{f"#SBATCH --time={args.time_limit}" if args.time_limit else ""}
"""
if args.email:
script += f"""#SBATCH --mail-type=BEGIN,END,FAIL
#SBATCH --mail-user={args.email}
"""
script += f"""
set -x -e
echo "START TIME: $(date)"
secs_to_human() {{
echo "$(( ${{1}} / 3600 )):$(( (${{1}} / 60) % 60 )):$(( ${{1}} % 60 ))"
}}
start=$(date +%s)
echo "$(date -d @${{start}} "+%Y-%m-%d %H:%M:%S"): ${{SLURM_JOB_NAME}} start id=${{SLURM_JOB_ID}}\\n"
# Get the actual slurm script path from the environment
echo "Slurm script path: $(scontrol show job $SLURM_JOB_ID | grep -oP 'Command=\\K[^ ]+')"
# SLURM setup
export HOSTNAMES=`scontrol show hostnames "$SLURM_JOB_NODELIST"`
export MASTER_ADDR=$(scontrol show hostnames "$SLURM_JOB_NODELIST" | head -n 1)
export MASTER_PORT=6000
export COUNT_NODE=`scontrol show hostnames "$SLURM_JOB_NODELIST" | wc -l`
export TMPDIR={args.tmp_dir}
export CUDA_DEVICE_MAX_CONNECTIONS=1
{args.extra_env}
echo "Running on $COUNT_NODE nodes: $HOSTNAMES"
# Calculate total number of processes
export NNODES=$SLURM_NNODES
export GPUS_PER_NODE={gpus_per_node}
export WORLD_SIZE=$(($NNODES * $GPUS_PER_NODE))
# Set some environment variables for better distributed training
# export NCCL_DEBUG=WARN # INFO, WARN
# export NCCL_DEBUG_SUBSYS=ALL
# export CUDA_LAUNCH_BLOCKING=1
# export TORCH_NCCL_ASYNC_ERROR_HANDLING=1
# export TORCH_DISTRIBUTED_DEBUG=DETAIL
# Nanotron specific
{"export NANOTRON_BENCHMARK=1" if args.bench else ""}
{"# " if args.enable_wandb else ""}export WANDB_MODE=disabled
# export ENABLE_TIMERS=1
# export DEBUG_CPU=1
CMD="{run_train_script} --config-file {config_path}"
# echo nvcc version and assert we use cuda 12.4
echo "NVCC version: $(nvcc --version)"
if ! nvcc --version | grep -q "12.4"; then
echo "ERROR: CUDA 12.4 is required to avoid dataloader issues"
exit 1
fi
# Log system information
echo "PyTorch version: $(python -c 'import torch; print(torch.__version__)')"
echo "Is debug build: $(python -c 'import torch; print(torch.version.debug)')"
echo "CUDA used to build PyTorch: $(python -c 'import torch; print(torch.version.cuda)')"
echo "ROCM used to build PyTorch: $(python -c 'import torch; print(torch.version.hip)')"
echo "PATH: $PATH"
# Log environment variables
echo "Environment variables:"
printenv | sort
# Log python path
echo "Python path: $(which python)"
# Log torchrun path
echo "Torchrun path: $(which torchrun)"
# Log installed Python packages
echo "Installed Python packages:"
python -m pip freeze
# Log GPU information
nvidia-smi
LAUNCHER="torchrun \\
--nproc_per_node {gpus_per_node} \\
--nnodes $COUNT_NODE \\
--rdzv_backend c10d \\
--rdzv_endpoint $MASTER_ADDR:$MASTER_PORT \\
--max_restarts 0 \\
--tee 3 \\
"
{args.pre_launch_commands}
srun -u bash -c "$LAUNCHER $CMD"
echo "END TIME: $(date)"
end=$(date +%s)
elapsed=$((end - start))
echo "Total training time: $(secs_to_human $elapsed)"
"""
return script
def tail_output_file(output_file: str):
"""Tail the output file when available."""
while not os.path.exists(output_file):
time.sleep(1)
    subprocess.run(["tail", "-f", output_file])
def main():
"""Main entry point for the Slurm launcher."""
args = parse_args()
# Create directories if they don't exist
os.makedirs(args.configs_path, exist_ok=True)
os.makedirs(args.slurm_logs_path, exist_ok=True)
# Create Nanotron config if not provided
if args.config is None:
config = create_nanotron_config(args)
dp, pp, tp, cp, ep = (args.dp, args.pp, args.tp, args.cp, args.ep)
else:
print(f"🔍 Loading config from {args.config}")
config = Config.load_from_yaml(args.config)
dp = config.parallelism.dp
pp = config.parallelism.pp
tp = config.parallelism.tp
cp = config.parallelism.context_parallel_size
ep = config.parallelism.expert_parallel_size
# bench
if args.bench:
config.general.benchmark_csv_path = args.bench
# Save config to YAML file
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
run_name = args.run.replace(" ", "_")
config_dir = os.path.join(args.configs_path, run_name)
os.makedirs(config_dir, exist_ok=True)
config_path = os.path.join(config_dir, f"{timestamp}-{run_name}.yaml")
config.save_as_yaml(config_path)
print(f"💾 Config saved to {config_path}")
config.print_config_details()
# Create Slurm script
slurm_script = create_slurm_script(config_path, args, dp, pp, tp, cp, ep, args.run_train_script)
# Save Slurm script if requested
if args.slurm_scripts_dir is not None:
os.makedirs(args.slurm_scripts_dir, exist_ok=True)
slurm_script_path = os.path.join(args.slurm_scripts_dir, f"{timestamp}-{run_name}.sh")
with open(slurm_script_path, "w") as f:
f.write(slurm_script)
print(f"💾 Slurm script saved to {slurm_script_path}")
# Either submit the job or just print the script (dry run)
if args.dry_run:
print("DRY RUN - Job script:")
print(slurm_script)
print(f"🔍 Would submit job with config from {config_path}")
else:
job_id = launch_slurm_job(slurm_script)
print(f"🚀 Slurm job submitted with JOBID: {job_id}")
print(
f"🔍 Logs will be available at: {os.path.join(args.slurm_logs_path, run_name, f'{timestamp}-{run_name}-{job_id}.out')}"
)
# Tail output file when available
if args.show_logs:
tail_output_file(os.path.join(args.slurm_logs_path, run_name, f"{timestamp}-{run_name}-{job_id}.out"))
return 0
if __name__ == "__main__":
main()