in syne_tune/backend/simulator_backend/simulator_backend.py [0:0]
def _process_events_until_now(self):
    """
    Drain the simulator event queue up to the current simulated time.

    The cutoff `time_keeper.time()` is read once. Events are then pulled
    one at a time with `self._simulator_state.next_until(cutoff)` until it
    returns `None`, and each is handled according to its type:

    * `StartEvent`: delegated to `_process_start_event`.
    * `CompleteEvent`: status and training end time are stored in the
      trial's entry of `_trial_dict`.
    * `StopEvent`: all events still queued for the trial are removed.
    * `OnTrialResultEvent`: a copy of the result, stamped with the event
      time under `ST_TUNER_TIME`, is queued in `_next_results_to_fetch`
      and appended to the trial's metrics in `_trial_dict`.

    :raises TypeError: If an event is of none of the types above
    """
    cutoff = self._time_keeper.time()
    while True:
        head = self._simulator_state.next_until(cutoff)
        if head is None:
            # Nothing (more) to process for this cutoff
            break
        time_event, event = head
        trial_id = event.trial_id

        if isinstance(event, StartEvent):
            self._debug_message('StartEvent', time=time_event, trial_id=trial_id)
            # Runs the training script and pushes an event for each result
            self._process_start_event(
                trial_id=trial_id, time_event=time_event)
            continue

        if isinstance(event, CompleteEvent):
            record = self._trial_dict[trial_id]
            final_status = event.status
            self._debug_message(
                'CompleteEvent', time=time_event, trial_id=trial_id,
                status=final_status)
            # `time_event` is used as an offset in seconds from the start
            # time stamp
            end_stamp = (
                self._time_keeper.start_time_stamp
                + timedelta(seconds=time_event))
            if not isinstance(record, TrialResult):
                # The trial never reported a result (this can happen if
                # it failed), so its entry is replaced by what
                # `add_results` returns, with empty metrics
                self._trial_dict[trial_id] = record.add_results(
                    metrics=[], status=final_status,
                    training_end_time=end_stamp)
            else:
                record.status = final_status
                record.training_end_time = end_stamp
            continue

        if isinstance(event, StopEvent):
            self._debug_message(
                'StopEvent', time=time_event, trial_id=trial_id)
            # Drop everything still queued for `trial_id`. That covers the
            # `CompleteEvent` pushed together with `StartEvent`, so it
            # cannot be confused with the second `CompleteEvent` pushed by
            # `_stop_trial`.
            self._simulator_state.remove_events(trial_id)
            continue

        if isinstance(event, OnTrialResultEvent):
            # Shallow copy, so the time stamp below is not written into
            # the dictionary held by the event
            result = copy.copy(event.result)
            self._debug_message(
                'OnTrialResultEvent', time=time_event, trial_id=trial_id,
                epoch=result.get('epoch'))  # `epoch` is for debugging only
            # Time stamps are attached to `result` right here. The other
            # back-ends only add them when results are written out.
            result[ST_TUNER_TIME] = time_event
            to_fetch = self._next_results_to_fetch
            if trial_id not in to_fetch:
                to_fetch[trial_id] = [result]
            else:
                to_fetch[trial_id].append(result)
            record = self._trial_dict[trial_id]
            if not isinstance(record, TrialResult):
                # First result for this trial
                self._trial_dict[trial_id] = record.add_results(
                    metrics=[result], status=Status.in_progress,
                    training_end_time=None)
            else:
                record.metrics.append(result)
            # Running total of results obtained for `trial_id`, even if
            # the trial is resumed multiple times
            self._last_metric_seen_index[trial_id] += 1
            continue

        raise TypeError(f"Event at time {time_event} of unknown type: {event}")