def launch_job()

in src/datatrove/executor/slurm.py


    def launch_job(self):
        """
            Takes care of creating a sbatch script for this pipeline and launching it.
        Returns:

        """
        assert not self.depends or (isinstance(self.depends, SlurmPipelineExecutor)), (
            "depends= must be a SlurmPipelineExecutor"
        )
        if self.depends:
            # take care of launching any unlaunched dependencies and getting their slurm job ids
            if not self.depends.job_id:
                logger.info(f'Launching dependency job "{self.depends.job_name}"')
                self.depends.launch_job()
            if self.depends.job_id != -1:
                self.depends_job_id = self.depends.job_id
            self.depends = None  # avoid pickling the entire dependency and possibly its dependencies

        ranks_to_run = self.get_incomplete_ranks()
        if len(ranks_to_run) == 0:
            logger.info(f"Skipping launch of {self.job_name} as all {self.tasks} tasks have already been completed.")
            self.job_id = -1
            return

        executor = deepcopy(self)

        # pickle the executor. The slurm job will load it from this .pik file
        with self.logging_dir.open("executor.pik", "wb") as executor_f:
            dill.dump(executor, executor_f, fmode=CONTENTS_FMODE)
        self.save_executor_as_json()

        with self.logging_dir.open("ranks_to_run.json", "w") as ranks_to_run_file:
            # we actually save this (only once) to avoid race conditions
            json.dump(ranks_to_run, ranks_to_run_file)
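            # the launched tasks later read this file back to map their slurm array index to an actual pipeline rank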

        nb_jobs_to_launch = math.ceil(len(ranks_to_run) / self.tasks_per_job)
        max_array = min(nb_jobs_to_launch, self.max_array_size) if self.max_array_size != -1 else nb_jobs_to_launch
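        # SLURM caps the size of a single job array (MaxArraySize), so when more array elements are needed than
        # max_array_size allows, the same launch script is submitted several times in the loop below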

        # create the actual sbatch script
        srun_args_str = " ".join([f"--{k}={v}" for k, v in self.srun_args.items()]) if self.srun_args else ""
        launch_file_contents = self.get_launch_file_contents(
            self.get_sbatch_args(max_array),
            # use "-n 1" for each srun command to enforce that only one task will be launched.
            # Some settings may lead to two tasks being launched, see https://groups.google.com/g/slurm-users/c/L4nCXtZLlTo
            f"srun {srun_args_str} -l -n 1 launch_pickled_pipeline {self.logging_dir.resolve_paths('executor.pik')}",
        )
        # save it
        with self.logging_dir.open("launch_script.slurm", "w") as launchscript_f:
            launchscript_f.write(launch_file_contents)
        logger.info(
            f"Launching Slurm job {self.job_name} ({len(ranks_to_run)} tasks) with launch script "
            f'"{self.logging_dir.resolve_paths("launch_script.slurm")}"'
        )

        # launch (possibly multiple) jobs
        launched_jobs = 0
        while launched_jobs * max_array < nb_jobs_to_launch:
            if launched_jobs and self.max_array_launch_parallel and self.stagger_max_array_jobs > 0:
                time.sleep(self.stagger_max_array_jobs)
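            # RUN_OFFSET tells each submission which chunk of ranks_to_run its SLURM_ARRAY_TASK_IDs map to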
            args = [f"--export=ALL,RUN_OFFSET={launched_jobs}"]
            if self.dependency:
                args.append(f"--dependency={self.dependency}")
            self.job_id = launch_slurm_job(launch_file_contents, self.job_id_retriever, *args)
            launched_jobs += 1
        logger.info(f"Slurm job launched successfully with (last) id={self.job_id}.")
        self.launch_merge_stats()
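
launch_job() is normally not called directly: it is typically invoked via SlurmPipelineExecutor.run() when the executor is created outside of a SLURM job. Below is a minimal sketch of such a setup; the pipeline step, paths and SLURM parameters are illustrative placeholders, not values taken from this file.

    from datatrove.executor.slurm import SlurmPipelineExecutor
    from datatrove.pipeline.readers import JsonlReader

    executor = SlurmPipelineExecutor(
        pipeline=[JsonlReader("s3://my-bucket/raw/")],  # hypothetical input location
        tasks=100,                            # total number of pipeline ranks
        tasks_per_job=1,                      # ranks packed into each slurm array element
        time="01:00:00",
        partition="cpu",                      # hypothetical partition name
        logging_dir="/scratch/logs/my_job",   # hypothetical logging path
        job_name="my_job",
    )
    executor.run()  # outside of slurm, this ends up calling launch_job()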