in ax/service/scheduler.py [0:0]
def poll_and_process_results(self, poll_all_trial_statuses: bool = False) -> bool:
    """Poll trial runs and apply any resulting status changes to the experiment.

    Takes the following actions:
        1. Poll trial runs for their statuses.
        2. If any experiment metrics are available while running,
           fetch data for running trials.
        3. Determine which trials should be early stopped.
        4. Early-stop those trials.
        5. Update the experiment with the new trial statuses.
        6. Fetch the data for newly completed trials.

    Updated trials are then saved via ``_save_or_update_trials_in_db_if_possible``.

    Args:
        poll_all_trial_statuses: Forwarded unchanged to ``poll_trial_status``.

    Returns:
        A boolean representing whether any trial evaluations completed
        or have been marked as failed, abandoned or early-stopped (i.e. moved
        to any status that is neither candidate nor deployed), changing the
        number of currently running trials.
    """
    self._sleep_if_too_early_to_poll()
    updated_any_trial = False  # Whether any trial updates were performed.
    # Snapshot taken *before* statuses are updated, so that step 6 can tell
    # which completed trials are new in this poll.
    prev_completed_trial_idcs = set(
        self.experiment.trial_indices_by_status[TrialStatus.COMPLETED]
    )

    # 1. Poll trial statuses
    new_status_to_trial_idcs = self.poll_trial_status(
        poll_all_trial_statuses=poll_all_trial_statuses
    )

    # Note: We could use `new_status_to_trial_idcs[TrialStatus.RUNNING]`
    # for the running_trial_indices, but we don't enforce
    # that users return the status of trials that are not being updated.
    # Thus, if a trial was running in the last poll and is still running
    # in this poll, it might not appear in new_status_to_trial_idcs.
    # Instead, to get the list of all currently running trials at this
    # point in time, we look at self.running_trials, which contains trials
    # that were running in the last poll, and we exclude trials that were
    # newly terminated in this poll.
    terminated_trial_idcs = {
        index
        for status, indices in new_status_to_trial_idcs.items()
        if status.is_terminal
        for index in indices
    }
    running_trial_indices = {
        trial.index
        for trial in self.running_trials
        if trial.index not in terminated_trial_idcs
    }

    # 2. If any experiment metrics are available while running,
    # fetch data for running trials. Skip entirely when nothing is running,
    # so we neither issue an empty fetch nor log an empty index list.
    if running_trial_indices and any(
        m.is_available_while_running() for m in self.experiment.metrics.values()
    ):
        # NOTE: Metrics that are *not* available_while_running will be skipped
        # in fetch_trials_data
        idcs = make_indices_str(indices=running_trial_indices)
        self.logger.info(
            f"Fetching data for trials: {idcs} because some metrics "
            "on experiment are available while trials are running."
        )
        self.experiment.fetch_trials_data(
            trial_indices=running_trial_indices,
            overwrite_existing_data=True,
        )

    # 3. Determine which trials to stop early
    stop_trial_info = self.should_stop_trials_early(
        trial_indices=running_trial_indices
    )

    # 4. Stop trials early. Trials and reasons are built from the same dict,
    # so their orders line up.
    self.stop_trial_runs(
        trials=[self.experiment.trials[trial_idx] for trial_idx in stop_trial_info],
        reasons=list(stop_trial_info.values()),
    )

    # 5. Update trial statuses on the experiment
    new_status_to_trial_idcs = self._update_status_dict(
        status_dict=new_status_to_trial_idcs,
        updating_status_dict={TrialStatus.EARLY_STOPPED: set(stop_trial_info)},
    )
    updated_trials = []
    for status, trial_idcs in new_status_to_trial_idcs.items():
        if status.is_candidate or status.is_deployed:
            # No need to consider candidate, staged or running trials here (none of
            # these trials should actually be candidates, but we can filter on that)
            continue
        if not trial_idcs:
            # Nothing moved into this status during this poll.
            continue

        idcs = make_indices_str(indices=trial_idcs)
        self.logger.info(f"Retrieved {status.name} trials: {idcs}.")
        updated_any_trial = True

        # Update trial statuses and record which trials were updated.
        # `unsafe=True` is passed through to `mark_as` unchanged from the
        # original; presumably it bypasses status-transition validation —
        # NOTE(review): confirm against `mark_as`.
        trials = self.experiment.get_trials_by_indices(trial_idcs)
        for trial in trials:
            trial.mark_as(status=status, unsafe=True)

        # 6. Fetch data for newly completed trials
        if status.is_completed:
            newly_completed = trial_idcs - prev_completed_trial_idcs
            # Fetch the data for newly completed trials; this will cache the data
            # for all metrics. By pre-caching the data now, we remove the need to
            # fetch it during candidate generation. Trials that were already
            # completed before this poll are skipped, and we don't issue (or log)
            # an empty fetch when there are none.
            if newly_completed:
                idcs = make_indices_str(indices=newly_completed)
                self.logger.info(f"Fetching data for trials: {idcs}.")
                self.experiment.fetch_trials_data(trial_indices=newly_completed)

        updated_trials.extend(trials)

    if not updated_any_trial:  # Did not update anything, nothing to save.
        return False

    self.logger.debug(f"Updating {len(updated_trials)} trials in DB.")
    self._save_or_update_trials_in_db_if_possible(
        experiment=self.experiment,
        trials=updated_trials,
    )
    return updated_any_trial