def poll_and_process_results()

in ax/service/scheduler.py [0:0]


    def poll_and_process_results(self, poll_all_trial_statuses: bool = False) -> bool:
        """Takes the following actions:
            1. Poll trial runs for their statuses
            2. If any experiment metrics are available while running,
               fetch data for running trials
            3. Determine which trials should be early stopped
            4. Early-stop those trials
            5. Update the experiment with the new trial statuses
            6. Fetch the data for newly completed trials

        Returns:
            A boolean representing whether any trial evaluations completed
            of have been marked as failed or abandoned, changing the number of
            currently running trials.
        """
        self._sleep_if_too_early_to_poll()

        updated_any_trial = False  # Whether any trial updates were performed.
        prev_completed_trial_idcs = set(
            self.experiment.trial_indices_by_status[TrialStatus.COMPLETED]
        )

        # 1. Poll trial statuses
        new_status_to_trial_idcs = self.poll_trial_status(
            poll_all_trial_statuses=poll_all_trial_statuses
        )

        # Note: We could use `new_status_to_trial_idcs[TrialStatus.Running]`
        # for the running_trial_indices, but we don't enforce
        # that users return the status of trials that are not being updated.
        # Thus, if a trial was running in the last poll and is still running
        # in this poll, it might not appear in new_status_to_trial_idcs.
        # Instead, to get the list of all currently running trials at this
        # point in time, we look at self.running_trials, which contains trials
        # that were running in the last poll, and we exclude trials that were
        # newly terminated in this poll.
        terminated_trial_idcs = {
            index
            for status, indices in new_status_to_trial_idcs.items()
            if status.is_terminal
            for index in indices
        }
        running_trial_indices = {
            trial.index
            for trial in self.running_trials
            if trial.index not in terminated_trial_idcs
        }

        # 2. If any experiment metrics are available while running,
        #    fetch data for running trials
        if any(
            m.is_available_while_running() for m in self.experiment.metrics.values()
        ):
            # NOTE: Metrics that are *not* available_while_running will be skipped
            # in fetch_trials_data
            idcs = make_indices_str(indices=running_trial_indices)
            self.logger.info(
                f"Fetching data for trials: {idcs} because some metrics "
                "on experiment are available while trials are running."
            )
            self.experiment.fetch_trials_data(
                trial_indices=running_trial_indices,
                overwrite_existing_data=True,
            )

        # 3. Determine which trials to stop early
        stop_trial_info = self.should_stop_trials_early(
            trial_indices=running_trial_indices
        )

        # 4. Stop trials early
        self.stop_trial_runs(
            trials=[self.experiment.trials[trial_idx] for trial_idx in stop_trial_info],
            reasons=list(stop_trial_info.values()),
        )

        # 5. Update trial statuses on the experiment
        new_status_to_trial_idcs = self._update_status_dict(
            status_dict=new_status_to_trial_idcs,
            updating_status_dict={TrialStatus.EARLY_STOPPED: set(stop_trial_info)},
        )
        updated_trials = []
        for status, trial_idcs in new_status_to_trial_idcs.items():
            if status.is_candidate or status.is_deployed:
                # No need to consider candidate, staged or running trials here (none of
                # these trials should actually be candidates, but we can filter on that)
                continue

            if len(trial_idcs) > 0:
                idcs = make_indices_str(indices=trial_idcs)
                self.logger.info(f"Retrieved {status.name} trials: {idcs}.")
                updated_any_trial = True

            # Update trial statuses and record which trials were updated.
            trials = self.experiment.get_trials_by_indices(trial_idcs)
            for trial in trials:
                trial.mark_as(status=status, unsafe=True)

            # 6. Fetch data for newly completed trials
            if status.is_completed:
                newly_completed = trial_idcs - prev_completed_trial_idcs
                # Fetch the data for newly completed trials; this will cache the data
                # for all metrics. By pre-caching the data now, we remove the need to
                # fetch it during candidate generation.
                idcs = make_indices_str(indices=newly_completed)
                self.logger.info(f"Fetching data for trials: {idcs}.")
                self.experiment.fetch_trials_data(trial_indices=newly_completed)

            updated_trials.extend(trials)

        if not updated_any_trial:  # Did not update anything, nothing to save.
            return False

        self.logger.debug(f"Updating {len(updated_trials)} trials in DB.")
        self._save_or_update_trials_in_db_if_possible(
            experiment=self.experiment,
            trials=updated_trials,
        )
        return updated_any_trial