launcher/nemo/slurm_launcher.py (93 lines of code) (raw):
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import os
import shutil
from pathlib import Path
from typing import Any, List, Union
import nemo_launcher.utils.job_utils as job_utils
from nemo_launcher.core.launchers import SlurmLauncher
from nemo_launcher.core.logger import logger
# Debug switch read once at import time from the NEMO_LAUNCHER_DEBUG environment
# variable. It is on when the value is "true", "t" or "1" (case-insensitive) and
# off when the variable is unset or holds anything else. When on, the launcher in
# this module skips the `srun` availability check and does not submit the job.
NEMO_LAUNCHER_DEBUG = os.environ.get("NEMO_LAUNCHER_DEBUG", "False").lower() in {"true", "t", "1"}
class SMJobPaths(job_utils.JobPaths):
    """
    Job paths extended with the extra scripts generated by our launcher.

    On top of the paths provided by the base class, this exposes the location of
    train_script.sh (the extra entry script) and of the two optional docker helper
    scripts. All of them live directly in the job folder (`self._folder`, which is
    supplied by the base class).
    """

    @property
    def train_script_file(self) -> Path:
        """Path of the train_script.sh entry script inside the job folder."""
        return self._folder / "train_script.sh"

    @property
    def launch_docker_container_file(self) -> Path:
        """Path of the launch_docker_container.sh script inside the job folder."""
        return self._folder / "launch_docker_container.sh"

    @property
    def docker_exec_script_file(self) -> Path:
        """Path of the docker_exec_script.sh script inside the job folder."""
        return self._folder / "docker_exec_script.sh"
class SMSlurmLauncher(SlurmLauncher):
    """
    Launcher for SM training jobs using slurm.

    Unlike the NeMo slurm launcher, which uses PyTorch Lightning to handle
    torch.distributed, this launcher launches the job using `torchrun`. It creates a
    separate train_script.sh with the proper `torchrun` distributed args prepared.
    See the `_make_train_script_text` function in stage.py for more details.
    """

    def __init__(self, folder: Union[Path, str], job_name: str, **kwargs: Any) -> None:
        """
        Extract the SM-specific options from ``kwargs`` and keep the rest as job parameters.

        Args:
            folder: Job folder, forwarded to the base launcher initializer.
            job_name: Job name, forwarded to the base launcher initializer and to
                ``_update_parameters``.
            **kwargs: Must contain ``train_script_text`` and ``hostfile``. May contain
                ``ntasks_per_node`` (default 8), ``launch_docker_container_text``,
                ``docker_exec_script_text``, ``slurm_create_submission_file_only``
                (default False) and ``slurm_docker_cfg`` (discarded). Whatever remains
                is passed to ``_update_parameters``.

        Raises:
            ValueError: If ``train_script_text`` or ``hostfile`` is missing.
            RuntimeError: If ``srun`` is not found on PATH while neither debug mode
                nor ``slurm_create_submission_file_only`` is enabled.
        """
        # We need to handle this ntasks_per_node specifically
        # Since we are using torchrun to launch custom jobs, we can not use ntasks_per_node in sbatch command
        self.ntasks_per_node = kwargs.pop("ntasks_per_node", 8)

        # The checks are interleaved with the pops on purpose: the error messages
        # print whatever is still left in kwargs at that point.
        if "train_script_text" not in kwargs:
            raise ValueError(f"Missing train_script_text from launcher kwargs {kwargs}")
        self.train_script_text = kwargs.pop("train_script_text")

        self.launch_docker_container_text = kwargs.pop("launch_docker_container_text", None)
        self.docker_exec_script_text = kwargs.pop("docker_exec_script_text", None)
        self.slurm_create_submission_file_only = kwargs.pop("slurm_create_submission_file_only", False)

        if "hostfile" not in kwargs:
            raise ValueError(f"Missing hostfile from launcher kwargs {kwargs}")
        self.hostfile = kwargs.pop("hostfile")

        # Not an sbatch parameter: drop it so it is not forwarded to _update_parameters.
        kwargs.pop("slurm_docker_cfg", None)

        # super(SlurmLauncher, self) starts the lookup *after* SlurmLauncher, so
        # SlurmLauncher.__init__ itself is skipped and the remaining setup is done here.
        # NOTE(review): presumably this mirrors the upstream initializer, with the srun
        # check relaxed by slurm_create_submission_file_only - confirm against upstream.
        super(SlurmLauncher, self).__init__(folder, job_name)
        self.parameters = {}
        self._update_parameters(job_name=job_name, **kwargs)

        if shutil.which("srun") is None and not NEMO_LAUNCHER_DEBUG and not self.slurm_create_submission_file_only:
            raise RuntimeError('Could not detect "srun", are you indeed on a slurm cluster?')

    def _make_train_script_file(self):
        """
        Create the custom train_script.sh in the job folder.

        Optionally also creates launch_docker_container.sh (used to launch the docker
        container on every node) and docker_exec_script.sh, when their text was given.
        The job folder is created first if it does not exist yet.
        """
        job_paths = SMJobPaths(folder=self.folder, job_name=self.job_name)
        job_paths.folder.mkdir(parents=True, exist_ok=True)

        # The train script is mandatory and always written.
        with job_paths.train_script_file.open("w") as f:
            f.write(self.train_script_text)

        # The docker helper scripts are written only when their text was provided.
        if self.launch_docker_container_text is not None:
            with job_paths.launch_docker_container_file.open("w") as f:
                f.write(self.launch_docker_container_text)
        if self.docker_exec_script_text is not None:
            with job_paths.docker_exec_script_file.open("w") as f:
                f.write(self.docker_exec_script_text)

    def launch(self, command_groups: List[List[str]]) -> str:
        """
        Write the job scripts and the submission file, then submit the job unless told not to.

        Args:
            command_groups: Command groups forwarded to ``_make_submission_file``.

        Returns:
            The job id returned by ``_submit_command``, or an empty string when the
            submission is skipped (debug mode or ``slurm_create_submission_file_only``).
        """
        # Create the custom train_script.sh before launching the real job
        self._make_train_script_file()

        # Same as upstream, but exposing extra control for submission through slurm_create_submission_file_only
        submission_file_path = self._make_submission_file(command_groups)
        logger.info(f"Job {self.job_name} submission file created at '{submission_file_path}'")

        job_id = ""
        if not NEMO_LAUNCHER_DEBUG and not self.slurm_create_submission_file_only:
            job_id = self._submit_command(submission_file_path)
            if job_id:
                logger.info(f"Job {self.job_name} submitted with Job ID {job_id}")
                # Record the job id in launcher.log inside the job folder.
                with open(self.folder / "launcher.log", "w") as f:
                    f.write(f"Submitted batch job {job_id}")
        else:
            # Submission skipped: tell the user how to submit the generated file manually.
            logger.info(f"To submit your job on Slurm, run `sbatch {submission_file_path}`")
        return job_id

    def _make_submission_file_text(self, command_groups: List[List[str]]) -> str:
        """
        Build the text of the sbatch submission file.

        The submission file will be responsible for the following
            - Handle sbatch config (implemented in upstream)
            - Handle env variables (implemented in upstream)
            - Handle storing distribution information which will be consumed by train_script.sh
            - Call train_script.sh with proper srun command

        The upstream text is taken as-is and a step that writes the sorted host names
        into ``self.hostfile`` is inserted right before its first ``# command`` line.
        When a docker launch script was provided, everything from that ``# command``
        line onward is replaced by the commands of the first command group.

        Args:
            command_groups: Command groups forwarded to the upstream implementation;
                ``command_groups[0]`` is also used directly in the docker case.

        Returns:
            The full submission file content as a single newline-joined string.

        Raises:
            AssertionError: If the upstream text does not start with ``#!/bin/bash``
                or contains no line starting with ``# command``.
        """
        sbatch_lines = super()._make_submission_file_text(command_groups).split("\n")

        # Raised explicitly rather than with `assert`, so the checks survive
        # `python -O`. Without them a missing marker would leave command_idx as
        # None, and the slices below would silently duplicate the whole script.
        if sbatch_lines[0] != "#!/bin/bash":
            raise AssertionError(sbatch_lines[0])

        command_idx = next(
            (idx for idx, line in enumerate(sbatch_lines) if line.startswith("# command")),
            None,
        )
        if command_idx is None:
            raise AssertionError(f"Can not find command in the submission file str: {sbatch_lines}")

        distributed_strs = [
            "",
            "# Prepare distributed files",
            f'srun -l bash -c "scontrol show hostnames | sort > {self.hostfile}"',
            "",
        ]

        header = sbatch_lines[:command_idx] + distributed_strs
        if self.launch_docker_container_text is None:
            # Keep the upstream command section untouched.
            trailer = sbatch_lines[command_idx:]
        else:
            # Docker case: the upstream command section is dropped in favour of the
            # raw commands of the first command group.
            trailer = command_groups[0]
        return "\n".join(header + trailer)