in dora/shep.py [0:0]
def _submit(self, job_array: _JobArray):
    """Submit every sheep of `job_array` through a single submitit executor.

    When the array holds more than one sheep, all of them are submitted
    inside one `executor.batch()` context and share a submitit folder under
    `self._arrays`; a lone sheep is submitted on its own, using its XP's own
    submitit folder. After submission, each sheep gets its job pickled to
    `sheep._job_file`, a symlink from `self._by_id / job_id` to its XP folder,
    and its `_latest_submitit` link refreshed.

    Args:
        job_array: provides `sheeps` (the jobs to schedule) and the
            `slurm_config` forwarded to `self._get_submitit_executor`.
            Nothing happens if `sheeps` is empty.

    Raises:
        AssertionError: if the sheeps do not all share the same
            `xp.dora.git_save` value, or if an existing per-XP link to the
            array folder resolves to a different folder.
    """
    sheeps = job_array.sheeps
    slurm_config = job_array.slurm_config
    if not sheeps:
        return

    is_array = len(sheeps) > 1
    first = sheeps[0]
    self.main.init_xp(first.xp)
    use_git_save = first.xp.dora.git_save
    assert all(other.xp.dora.git_save == use_git_save for other in sheeps), \
        "All jobs inside an array must have the same value for git_save."

    # Job name and submitit folder: an array is identified by a signature
    # derived from the sorted signatures of all its XPs, a single job by
    # the signature of its XP.
    if is_array:
        name_sig = _get_sig(sorted(sheep.xp.sig for sheep in sheeps))
        name = self.main.name + "_array_" + name_sig
        submitit_folder = self._arrays / name
    else:
        name_sig = first.xp.sig
        name = self.main.name + "_" + name_sig
        submitit_folder = first.xp._xp_submitit
    submitit_folder.mkdir(exist_ok=True)

    for sheep in sheeps:
        xp = sheep.xp
        self.main.init_xp(xp)
        # Remove any rendezvous file left in place before (re)submitting.
        if xp.rendezvous_file.exists():
            xp.rendezvous_file.unlink()

    executor = self._get_submitit_executor(name, submitit_folder, slurm_config)
    jobs: tp.List[submitit.Job] = []
    if use_git_save and self._existing_git_clone is None:
        # The clone is created once and then reused by later submissions.
        self._existing_git_clone = git_save.get_new_clone(self.main.dora)

    # NOTE(review): presumably `_enter_orphan` guards against jobs being
    # scheduled without their job file being written -- confirm.
    with self._enter_orphan(name):
        with ExitStack() as stack:
            if use_git_save:
                assert self._existing_git_clone is not None
                stack.enter_context(git_save.enter_clone(self._existing_git_clone))
            if is_array:
                stack.enter_context(executor.batch())
            for sheep in sheeps:
                if use_git_save:
                    assert self._existing_git_clone is not None
                    git_save.assign_clone(sheep.xp, self._existing_git_clone)
                jobs.append(executor.submit(_SubmitItTarget(), self.main, sheep.xp.argv))

        # Now we can access jobs (the batch context, if any, has exited).
        for sheep, job in zip(sheeps, jobs):
            # See comment in `Sheep.state` function above for storing all jobs in the array.
            # The file is closed explicitly so that the pickle is fully
            # written to disk as soon as this statement completes.
            with open(sheep._job_file, "wb") as job_file:
                pickle.dump((job, jobs), job_file)
            logger.debug("Created job with id %s", job.job_id)
            sheep.job = job  # type: ignore
            sheep._other_jobs = jobs  # type: ignore

            link = self._by_id / job.job_id
            link.symlink_to(sheep.xp.folder.resolve())

            if is_array:
                # We link the array submitit folder to be sure
                # we keep a history of all arrays the XP was in.
                submitit_link = sheep.xp.folder / submitit_folder.name
                if submitit_link.exists():
                    assert submitit_link.resolve() == submitit_folder.resolve()
                else:
                    submitit_link.symlink_to(submitit_folder)

            latest = sheep.xp._latest_submitit
            # `exists()` follows symlinks and is False for a dangling link,
            # in which case `symlink_to` would fail: also check `is_symlink()`.
            if latest.is_symlink() or latest.exists():
                latest.unlink()
            latest.symlink_to(submitit_folder)

            # Distinct local: `name` above is the scheduler job name.
            xp_name = self.main.get_name(sheep.xp)
            self.log(f"Scheduled job {job.job_id} for sheep {sheep.xp.sig}/{xp_name}")