in syne_tune/tuner.py [0:0]
def run(self):
"""
Launches the tuning.
:return: the tuning status when finished
"""
try:
logger.info(f"results of trials will be saved on {self.tuner_path}")
if self.tuning_status is None:
self.tuning_status = TuningStatus(metric_names=self.scheduler.metric_names())
# prints the status every print_update_interval seconds
self.status_printer = RegularCallback(
call_seconds_frequency=self.print_update_interval,
callback=lambda tuning_status: logger.info("tuning status (last metric is reported)\n" + str(tuning_status)),
)
# saves the tuner every results_update_interval seconds
self.tuner_saver = RegularCallback(
callback=lambda tuner: tuner.save(),
call_seconds_frequency=self.results_update_interval
)
self.metadata[ST_TUNER_START_TIMESTAMP] = time.time()
for callback in self.callbacks:
callback.on_tuning_start(self)
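# create the directory where results are stored and persist the tuner metadata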
self.tuner_path.mkdir(exist_ok=True, parents=True)
self._save_metadata()
done_trials_statuses = OrderedDict()
# `running_trials_ids` contains the ids of all trials currently running,
# whether they were started from scratch or resumed from a paused state
running_trials_ids = set()
search_space_exhausted = False
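# main loop: process new results, persist the tuner state and schedule new trials
# until the stop condition is met or the search space is exhausted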
while not self._stop_condition():
for callback in self.callbacks:
callback.on_loop_start()
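# collect results reported since the last iteration; `new_done_trial_statuses` contains
# the trials that are no longer running (e.g., completed or paused)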
new_done_trial_statuses, new_results = self._process_new_results(
running_trials_ids=running_trials_ids,
)
if new_results:
# Save tuner state only if there have been new results
self.tuner_saver(tuner=self)
# update the list of done trials and remove those from `running_trials_ids`
# Note: It is important to update `running_trials_ids` before
# calling `_schedule_new_tasks`.
# Otherwise, a trial can be registered as paused in
# `_process_new_results` and immediately be resumed in
# `_schedule_new_tasks`. If `new_done_trial_statuses` were only subtracted
# from `running_trials_ids` afterwards, this trial would be removed from
# `running_trials_ids` even though it is running. Also, its status would
# remain paused, because the next call of `_process_new_results` only
# considers trials in `running_trials_ids`.
done_trials_statuses.update(new_done_trial_statuses)
running_trials_ids.difference_update(new_done_trial_statuses.keys())
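# once the scheduler has no further configurations to suggest, wait for the trials
# still running to finish instead of scheduling new ones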
if search_space_exhausted:
# if the search space is exhausted, we loop until the running trials are done or until the
# stop condition is reached
if len(running_trials_ids) > 0:
logger.debug(f"Search space exhausted, waiting for completion of running trials "
f"{running_trials_ids}")
self._sleep()
else:
break
else:
try:
self._schedule_new_tasks(running_trials_ids=running_trials_ids)
except StopIteration:
logger.info("Tuning is finishing as the whole search space got exhausted.")
search_space_exhausted = True
self.status_printer(self.tuning_status)
for callback in self.callbacks:
callback.on_loop_end()
finally:
# graceful termination block, called when the tuner reaches its stop condition, when an error
# happens, or when the job gets interrupted (which can happen on spot instances or when a SIGINT
# signal is sent with Ctrl+C).
# the block displays the best configuration found and stops trials that may still be running.
print_best_metric_found(
tuning_status=self.tuning_status,
metric_names=self.scheduler.metric_names(),
mode=self.scheduler.metric_mode(),
)
logger.info("Tuner finished, stopping trials that may still be running.")
self.backend.stop_all()
# mark all jobs as stopped in the tuning status without querying the backend,
# since we know that all trials were stopped
self.tuning_status.mark_running_job_as_stopped()
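# persist the final tuner state and notify callbacks that the tuning has ended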
self.save()
for callback in self.callbacks:
callback.on_tuning_end()
# in case too many errors were triggered, show the log of the last failed job and terminate with an error
if self.tuning_status.num_trials_failed > self.max_failures:
self._handle_failure(done_trials_statuses=done_trials_statuses)