def _process_events_until_now()

in syne_tune/backend/simulator_backend/simulator_backend.py [0:0]


    def _process_events_until_now(self):
        """
        Drains the simulator event queue up to the current simulated time.

        The current time is read once from `self._time_keeper.time()`. Events
        are then fetched one by one with
        `self._simulator_state.next_until(...)` until it returns `None`, and
        each one is dispatched on its type:

        - `StartEvent`: delegates to `_process_start_event`
        - `CompleteEvent`: stores the final status and training end time for
          the trial in `self._trial_dict`
        - `StopEvent`: removes all remaining events of the trial from the
          simulator state
        - `OnTrialResultEvent`: copies the result, stamps it with
          `ST_TUNER_TIME`, queues it in `self._next_results_to_fetch`, adds
          it to the trial's entry in `self._trial_dict`, and increments
          `self._last_metric_seen_index` for the trial

        :raises TypeError: If an event is of none of the types above
        """
        horizon = self._time_keeper.time()
        while True:
            upcoming = self._simulator_state.next_until(horizon)
            if upcoming is None:
                # Nothing left to process up to `horizon`
                break
            time_event, event = upcoming
            trial_id = event.trial_id
            if isinstance(event, StartEvent):
                self._debug_message(
                    'StartEvent', time=time_event, trial_id=trial_id)
                # Runs the training script, pushing one event per result
                self._process_start_event(
                    trial_id=trial_id, time_event=time_event)
            elif isinstance(event, CompleteEvent):
                record = self._trial_dict[trial_id]
                final_status = event.status
                self._debug_message(
                    'CompleteEvent', time=time_event, trial_id=trial_id,
                    status=final_status)
                # `time_event` is in seconds relative to the time keeper's
                # start time stamp
                ended_at = self._time_keeper.start_time_stamp + timedelta(
                    seconds=time_event)
                if not isinstance(record, TrialResult):
                    # The trial did not report any results so far (this can
                    # happen if it failed): convert the entry into one with
                    # an empty list of metrics
                    self._trial_dict[trial_id] = record.add_results(
                        metrics=[], status=final_status,
                        training_end_time=ended_at)
                else:
                    record.status = final_status
                    record.training_end_time = ended_at
            elif isinstance(event, StopEvent):
                self._debug_message(
                    'StopEvent', time=time_event, trial_id=trial_id)
                # Drop everything still queued for `trial_id`. The
                # `CompleteEvent` pushed along with `StartEvent` is among
                # them, so it cannot be mixed up with the 2nd `CompleteEvent`
                # which `_stop_trial` pushes.
                self._simulator_state.remove_events(trial_id)
            elif isinstance(event, OnTrialResultEvent):
                # Shallow copy, so that the dict held by the event is not
                # modified by the timestamp added below
                reported = copy.copy(event.result)
                self._debug_message(
                    'OnTrialResultEvent', time=time_event, trial_id=trial_id,
                    epoch=reported.get('epoch'))
                # The timestamp is attached to the result right here. Other
                # back-ends only add timestamps once results are written out.
                reported[ST_TUNER_TIME] = time_event
                self._next_results_to_fetch.setdefault(
                    trial_id, []).append(reported)
                record = self._trial_dict[trial_id]
                if not isinstance(record, TrialResult):
                    # First result for this trial: convert the entry into
                    # one which carries metrics
                    self._trial_dict[trial_id] = record.add_results(
                        metrics=[reported], status=Status.in_progress,
                        training_end_time=None)
                else:
                    record.metrics.append(reported)
                # Total number of results received for `trial_id`, counted
                # across all resumes of the trial
                self._last_metric_seen_index[trial_id] += 1
            else:
                raise TypeError(f"Event at time {time_event} of unknown type: {event}")