in src/datatrove/executor/slurm.py [0:0]
def launch_job(self) -> None:
    """
    Takes care of creating a sbatch script for this pipeline and launching it.

    Steps:
        1. Launch any not-yet-launched dependency executor and record its slurm job id.
        2. If every task rank already completed, skip submission (job_id is set to -1
           as a "nothing to run" sentinel also checked by dependents).
        3. Pickle this executor and save the list of incomplete ranks so the slurm
           tasks can reload them, write the sbatch script, then submit one or more
           job arrays (chunked by max_array_size) via sbatch.

    Returns:
        None. Side effects: writes executor.pik, ranks_to_run.json and
        launch_script.slurm to the logging dir; sets self.job_id.
    """
    # NOTE(review): assert is stripped under `python -O`; kept as-is to preserve behavior.
    assert not self.depends or (isinstance(self.depends, SlurmPipelineExecutor)), (
        "depends= must be a SlurmPipelineExecutor"
    )
    if self.depends:
        # take care of launching any unlaunched dependencies and getting their slurm job ids
        if not self.depends.job_id:
            logger.info(f'Launching dependency job "{self.depends.job_name}"')
            self.depends.launch_job()
        # -1 means the dependency had nothing left to run, so there is no job to wait on
        if self.depends.job_id != -1:
            self.depends_job_id = self.depends.job_id
        self.depends = None  # avoid pickling the entire dependency and possibly its dependencies
    ranks_to_run = self.get_incomplete_ranks()
    if len(ranks_to_run) == 0:
        # sentinel: dependents check job_id == -1 to know there is nothing to depend on
        logger.info(f"Skipping launch of {self.job_name} as all {self.tasks} tasks have already been completed.")
        self.job_id = -1
        return
    # snapshot the executor state at launch time (depends was already cleared above)
    executor = deepcopy(self)
    # pickle. The slurm job will load the executor from this pik file
    with self.logging_dir.open("executor.pik", "wb") as executor_f:
        # fmode=CONTENTS_FMODE: dill serializes file handles by content — TODO confirm intent
        dill.dump(executor, executor_f, fmode=CONTENTS_FMODE)
    self.save_executor_as_json()
    with self.logging_dir.open("ranks_to_run.json", "w") as ranks_to_run_file:
        # we actually save this (only once) to avoid race conditions
        json.dump(ranks_to_run, ranks_to_run_file)
    # slurm caps array sizes; split the remaining tasks into ceil(n / tasks_per_job) array
    # elements and cap each submission at max_array_size (-1 means "no cap")
    nb_jobs_to_launch = math.ceil(len(ranks_to_run) / self.tasks_per_job)
    max_array = min(nb_jobs_to_launch, self.max_array_size) if self.max_array_size != -1 else nb_jobs_to_launch
    # create the actual sbatch script
    srun_args_str = " ".join([f"--{k}={v}" for k, v in self.srun_args.items()]) if self.srun_args else ""
    launch_file_contents = self.get_launch_file_contents(
        self.get_sbatch_args(max_array),
        # use "-n 1" for each srun command to enforce that only one task will be launched.
        # Some setting may lead to two tasks, see https://groups.google.com/g/slurm-users/c/L4nCXtZLlTo
        f"srun {srun_args_str} -l -n 1 launch_pickled_pipeline {self.logging_dir.resolve_paths('executor.pik')}",
    )
    # save it
    with self.logging_dir.open("launch_script.slurm", "w") as launchscript_f:
        launchscript_f.write(launch_file_contents)
    logger.info(
        f"Launching Slurm job {self.job_name} ({len(ranks_to_run)} tasks) with launch script "
        f'"{self.logging_dir.resolve_paths("launch_script.slurm")}"'
    )
    # launch (possibly multiple) jobs: when nb_jobs_to_launch exceeds the array cap we
    # submit the same script several times, RUN_OFFSET telling each submission which
    # chunk of ranks_to_run it owns
    launched_jobs = 0
    while launched_jobs * max_array < nb_jobs_to_launch:
        # optionally stagger submissions to avoid hammering the scheduler
        if launched_jobs and self.max_array_launch_parallel and self.stagger_max_array_jobs > 0:
            time.sleep(self.stagger_max_array_jobs)
        args = [f"--export=ALL,RUN_OFFSET={launched_jobs}"]
        # self.dependency presumably formats depends_job_id into a slurm --dependency
        # spec (e.g. "afterok:<id>") — verify against its definition
        if self.dependency:
            args.append(f"--dependency={self.dependency}")
        self.job_id = launch_slurm_job(launch_file_contents, self.job_id_retriever, *args)
        launched_jobs += 1
    # only the id of the last submitted array is kept
    logger.info(f"Slurm job launched successfully with (last) id={self.job_id}.")
    self.launch_merge_stats()