in dora/grid.py [0:0]
def run_grid(main: DecoratedMain, explorer: Explorer, grid_name: str,
             rules: SubmitRules = SubmitRules(), slurm: tp.Optional[SlurmConfig] = None,
             args: RunGridArgs = RunGridArgs()) -> tp.List[Sheep]:
    """
    Run a grid search. This is the API underlying the `dora grid` command,
    so that it can also be used from a notebook.
    You can also provide patterns (`args.patterns`) to filter the XPs that
    are handled and displayed.

    Args:
        main (DecoratedMain): main training function, decorated with Dora.
        explorer (Explorer): explorer instance that will define the XPs to launch.
        grid_name (str): this must be a unique name for the grid.
        rules (SubmitRules): see `dora.conf.SubmitRules`, these define the
            rules for rescheduling failed XPs etc.
        slurm (SlurmConfig or None): if provided, this will override
            the default Slurm config defined by the `main` argument.
        args (RunGridArgs): run options: filtering patterns, the
            clear / cancel / dry_run / init switches, the folder / log / tail
            actions (each an index into the filtered XPs), and the monitoring
            settings (monitor, interval, silent, jupyter).

    Returns:
        A list of `dora.shep.Sheep`.
    """
    assert isinstance(explorer, Explorer)
    if slurm is None:
        slurm = main.get_slurm_config()

    # One symlink per XP of this grid is kept in this folder (see below).
    grid_folder = main.dora.dir / main.dora.grids / grid_name
    grid_folder.mkdir(exist_ok=True, parents=True)

    herd = Herd()
    shepherd = Shepherd(main, log=log)
    if main._slow:
        # The launcher is given a process pool; `herd.complete()` must run
        # before the pool is shut down at the end of the `with` block.
        with ProcessPoolExecutor(4) as pool:
            launcher = Launcher(shepherd, slurm, herd, pool=pool)
            explorer(launcher)
            herd.complete()
    else:
        launcher = Launcher(shepherd, slurm, herd)
        explorer(launcher)

    shepherd.update()
    sheeps = list(herd.sheeps.values())
    sheeps = _filter_grid_sheeps(args.patterns, main, sheeps)

    if args.clear:
        # Restart the selected XPs from scratch: cancel their jobs and
        # delete their folders.
        if args.dry_run:
            fatal("--dry_run is incompatible with --clear.")
        log(f"You are about to restart {len(sheeps)} experiments from the grid {grid_name} "
            "from scratch. This cannot be reverted.")
        if args._from_commandline:
            repl = input("Confirm [yN]: ")
            if repl.lower() != "y":
                fatal("Abort...")
        log("Canceling all current jobs...")
        for sheep in sheeps:
            if sheep.job is not None:
                shepherd.cancel_lazy(sheep.job)
        shepherd.commit()
        log("Deleting XP folders...")
        for sheep in sheeps:
            if sheep.xp.folder.exists():
                reliable_rmtree(sheep.xp.folder)
            sheep.job = None

    # Entries of the grid folder whose name is not the signature of an XP
    # currently in the grid belong to XPs removed from the grid: their links
    # are scheduled for removal and their jobs for cancellation.
    to_unlink = []
    old_sheeps = []
    for child in grid_folder.iterdir():
        if child.name not in herd.sheeps:
            to_unlink.append(child)
            try:
                old_sheep = shepherd.get_sheep_from_sig(child.name)
            except Exception as error:
                log(f"Error when trying to load old sheep {child.name}: {error}")
                # We fallback on manually loading the job file.
                job_file = child / main.dora.shep.job_file
                job = try_load(job_file)
                if job is not None:
                    log(f"Canceling job {job.job_id} from unloadable sheep {child.name}.")
                    shepherd.cancel_lazy(job)
            else:
                assert old_sheep is not None
                old_sheeps.append(old_sheep)

    shepherd.update()  # Update all job status
    if not args.cancel:
        # Submissions are grouped by the job arrays recorded in the herd;
        # an array reduced to a single XP (after filtering) is submitted alone.
        sheep_map = {sheep.xp.sig: sheep for sheep in sheeps}
        for job_array in herd.job_arrays:
            array_sheeps = [sheep_map[sig] for sig in job_array if sig in sheep_map]
            if not array_sheeps:
                continue
            first = array_sheeps[0]
            # The Slurm config of the first XP is used for the whole array.
            array_slurm = herd.slurm_configs[first.xp.sig]
            if len(array_sheeps) == 1:
                shepherd.maybe_submit_lazy(first, array_slurm, rules)
            else:
                with shepherd.job_array(array_slurm):
                    for sheep in array_sheeps:
                        shepherd.maybe_submit_lazy(sheep, array_slurm, rules)

    for old_sheep in old_sheeps:
        if not old_sheep.is_done():
            assert old_sheep.job is not None
            shepherd.cancel_lazy(old_sheep.job)
            name = main.get_name(old_sheep.xp)
            log(f"Canceling job {old_sheep.job.job_id} for no longer required "
                f"sheep {old_sheep.xp.sig}/{name}")

    if args.cancel:
        for sheep in sheeps:
            if not sheep.is_done():
                assert sheep.job is not None
                name = main.get_name(sheep.xp)
                log(f"Canceling job {sheep.job.job_id} for sheep {sheep.xp.sig}/{name}")
                shepherd.cancel_lazy(sheep.job)

    # Only here are the lazily scheduled submissions/cancellations committed
    # and the grid folder links updated; a dry run skips all of it.
    if not args.dry_run:
        for sheep in sheeps:
            link = grid_folder / sheep.xp.sig
            if link.exists() or link.is_symlink():
                assert link.is_symlink() and link.resolve() == sheep.xp.folder
            else:
                link.symlink_to(sheep.xp.folder)

        shepherd.commit()
        for child in to_unlink:
            child.unlink()

    if args.init:
        for sheep in sheeps:
            main.init_xp(sheep.xp)

    if args.cancel:
        return sheeps

    if not sheeps:
        log("No sheep to handle.")
        return sheeps

    # folder / log / tail: each holds an index into the (filtered) sheeps.
    actions = [action for action in [args.folder, args.log, args.tail] if action is not None]
    if actions:
        if not args._from_commandline:
            raise RuntimeError("The folder, log, and tail "
                               "flags are only supported from the command line.")
        assert len(actions) == 1
        index = actions[0]
        try:
            sheep = sheeps[index]
        except IndexError:
            # Report the index actually used: with --log / --tail,
            # `args.folder` is None.
            fatal(f"Invalid index {index}")
        name = main.get_name(sheep.xp)
        if args.folder is not None:
            print(sheep.xp.folder)
        elif args.tail is not None:
            if not sheep.log.exists():
                fatal(f"Log {sheep.log} does not exist")
            # Replaces the current process with `tail`.
            os.execvp("tail", ["tail", "-n", "200", "-f", sheep.log])
        else:
            if not sheep.log.exists():
                fatal(f"Log file does not exist for sheep {name}.")
            try:
                # Close the log file even if stdout goes away mid-copy.
                with open(sheep.log) as log_file:
                    shutil.copyfileobj(log_file, sys.stdout)
            except BrokenPipeError:
                pass
        return sheeps

    maybe_print: tp.Callable
    if args.silent:
        maybe_print = no_print
    else:
        maybe_print = print
    maybe_print(f"Monitoring Grid {grid_name}")
    while True:
        if args.jupyter and not args.silent:
            from IPython import display
            display.clear_output(wait=True)
        shepherd.update()
        if monitor(args, main, explorer, sheeps, maybe_print):
            # All jobs finished or failed, stop monitoring
            break
        if not args.monitor:
            break
        sleep = int(args.interval * 60)
        maybe_print()
        # Countdown until the next refresh, rewritten in place with '\r'.
        for ela in range(sleep):
            out = f'Next update in {sleep - ela:.0f} seconds '
            if sleep - ela < 10:
                out = colorize(out, '31')
            maybe_print(out, end='\r')
            time.sleep(1)
        # Blank out the countdown line.
        maybe_print(' ' * 60)
    return sheeps