in ax/service/scheduler.py [0:0]
def wait_for_completed_trials_and_report_results(self) -> Dict[str, Any]:
    """Continuously poll for successful trials, with limited exponential
    backoff, and process the results. Stop once at least one successful
    trial has been found. This function can be overridden to a different
    waiting function as needed; it must call `poll_and_process_results`
    to ensure that trials that completed their evaluation are appropriately
    marked as 'COMPLETED' in Ax.

    The polling interval starts at `options.init_seconds_between_polls` and
    is multiplied by `options.seconds_between_polls_backoff_factor` after
    every sleep. If `options.early_stopping_strategy` is set, its
    `seconds_between_polls` is used instead and no backoff is applied.
    Waiting ends when there are no pending trials, when
    `poll_and_process_results` returns a truthy value, or when the total
    time slept exceeds `MAX_SECONDS_BETWEEN_REPORTS`.

    Returns:
        Results of the optimization so far, represented as a dict. The
        contents of the dict depend on the implementation of
        `report_results` in the given `Scheduler` subclass.

    Raises:
        ValueError: If neither `init_seconds_between_polls` nor an early
            stopping strategy is specified in the scheduler options.
    """
    options = self.options
    early_stopping_strategy = options.early_stopping_strategy
    init_seconds_between_polls = options.init_seconds_between_polls

    if init_seconds_between_polls is None and early_stopping_strategy is None:
        raise ValueError(  # pragma: no cover
            "Default `wait_for_completed_trials_and_report_results` in base "
            "`Scheduler` relies on non-null `init_seconds_between_polls` scheduler "
            "option or for an EarlyStoppingStrategy to be specified."
        )
    if (
        init_seconds_between_polls is not None
        and early_stopping_strategy is not None
    ):
        # `Logger.warn` is a deprecated alias (removed in Python 3.13);
        # `Logger.warning` is the supported spelling.
        self.logger.warning(
            "Both `init_seconds_between_polls` and `early_stopping_strategy` "
            "supplied. `init_seconds_between_polls="
            f"{init_seconds_between_polls}` will be overridden by "
            "`early_stopping_strategy.seconds_between_polls="
            f"{early_stopping_strategy.seconds_between_polls}` and "
            "polling will take place at a constant rate."
        )

    seconds_between_polls = init_seconds_between_polls
    backoff_factor = options.seconds_between_polls_backoff_factor
    if early_stopping_strategy is not None:
        seconds_between_polls = early_stopping_strategy.seconds_between_polls
        # Do not backoff with early stopping, a constant heartbeat is preferred
        backoff_factor = 1

    total_seconds_elapsed = 0
    # `poll_and_process_results` is only invoked while trials are pending
    # (short-circuit `and`), and is called once per loop iteration.
    while len(self.pending_trials) > 0 and not self.poll_and_process_results():
        if total_seconds_elapsed > MAX_SECONDS_BETWEEN_REPORTS:
            # If maximum wait time reached, check the stopping criterion
            # again and re-attempt scheduling more trials.
            break

        # Log whole seconds once the interval is longer than 2 seconds;
        # shorter intervals are logged unrounded.
        log_seconds = (
            int(seconds_between_polls)
            if seconds_between_polls > 2
            else seconds_between_polls
        )
        self.logger.info(
            f"Waiting for completed trials (for {log_seconds} sec, "
            f"currently running trials: {len(self.running_trials)})."
        )
        sleep(seconds_between_polls)
        total_seconds_elapsed += seconds_between_polls
        seconds_between_polls *= backoff_factor

    return self.report_results()