in chatlearn/runtime/engine.py [0:0]
def logging_summary(self, iteration=-1):
    """
    :meta private:
    """
    ## 1. model e2e time
    e2e_time_dict = super().logging_summary(iteration)
    # flatten timings into "<model name>/<timer name>" keys (e2e, forward_step, eval_step, and so on)
    model_time_dict = {}
    for model in self.remote_models:
        model_e2e_time_dict = e2e_time_dict.get(model.name, {})
        for key, value in model_e2e_time_dict.items():
            model_time_dict[f"{model.name}/{key}"] = value
    ## 2. episode time
    timer_names = ['sync_parameters']
    # timer_names before episode looping
    if iteration == -1 and self.evaluator and self.runtime_args.enable_eval_before_training:
        timer_names.append('evaluate')
    # timer_names in episode looping
    elif iteration >= 0:
        timer_names.extend(['episode', 'train'])
        if self.runtime_args.save_episode_interval and \
                (iteration + 1) % self.runtime_args.save_episode_interval == 0:
            timer_names.append('save_checkpoint')
        if self.evaluator is not None and \
                self.runtime_args.eval_episode_interval and \
                (iteration + 1) % self.runtime_args.eval_episode_interval == 0:
            timer_names.append('evaluate')
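    # Example: a training episode that hits both the save and eval intervals logs
    #   ['sync_parameters', 'episode', 'train', 'save_checkpoint', 'evaluate']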
    episode_str, episode_metrics = self.timers.log(names=timer_names, return_dict=True)
    log_str = f"{LOG_START} {self._name} episode summary, episode {iteration + 1} {episode_str}"
    logger.info(log_str)
    ## 3. log model e2e time and episode time
    episode_metrics.update(model_time_dict)
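    # episode_metrics now holds both the episode-level timers and the flattened per-model
    # timings, e.g. (hypothetical values) {'episode': 95.3, 'train': 60.1, 'policy/e2e': 31.2}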
self.metric_manager.log("engine/timer_summary", iteration + 1, episode_metrics)
    ## 4. log before episode looping
    if iteration == -1:
        if self.evaluator and self.runtime_args.enable_eval_before_training:
            prefix, evaluate_metrics = self.evaluator.get_and_clear_metrics()
            self.metric_manager.log(prefix, iteration + 1, evaluate_metrics)
        return
    ## 5. log in episode looping
    # Train metrics
    for model in self.remote_models:
        # all_metric_tuples collects results from the last rank of each model replica,
        # i.e. [rank n-1, rank 2n-1, ...]; each entry is a (prefix, metrics) tuple.
        # example 1: [[('vllm_inference', {'prompt_token_length': 108.5})], [('vllm_inference', {'prompt_token_length': 121.75})]]
        # example 2: [('', {})]
        # example 3: [('', {'train_reward_score': 0.78125}), ('', {'train_reward_score': 0.625})]
        all_metric_tuples = future.get(model.get_and_clear_metrics())
        if isinstance(all_metric_tuples[0], list):
            # flatten per-replica lists (example 1) into a single list of tuples
            all_metric_tuples_flattened = []
            for item in all_metric_tuples:
                all_metric_tuples_flattened += item
            all_metric_tuples = all_metric_tuples_flattened
        prefix = all_metric_tuples[0][0]
        last_rank_metrics = [metric_tuple[1] for metric_tuple in all_metric_tuples]
        model_metrics = map_reduce_metrics(last_rank_metrics)
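        # reduce the per-rank dicts into a single dict per model; e.g. example 3 above
        # would collapse to one 'train_reward_score' value (see the sketch after this
        # function, which assumes a simple per-key average)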
        self.metric_manager.log(prefix, iteration + 1, model_metrics)
    # Reward metrics
    if self._data_loader:
        # the data loader is a remote actor, so its metrics come back through a future
        prefix, train_reward_metrics = future.get(self._data_loader.get_and_clear_metrics.remote())
        self.metric_manager.log(prefix, iteration + 1, train_reward_metrics)
    # Evaluate metrics
    if self.evaluator:
        prefix, evaluate_metrics = self.evaluator.get_and_clear_metrics()
        self.metric_manager.log(prefix, iteration + 1, evaluate_metrics)
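
For intuition, here is a minimal sketch of the reduction applied to last_rank_metrics above, assuming map_reduce_metrics simply averages each metric key across ranks; the actual helper lives in ChatLearn's utilities and may reduce differently. map_reduce_metrics_sketch is a hypothetical stand-in, not the real function.

from collections import defaultdict
from typing import Dict, List

def map_reduce_metrics_sketch(rank_metrics: List[Dict[str, float]]) -> Dict[str, float]:
    """Hypothetical stand-in for map_reduce_metrics: average each key across ranks."""
    sums, counts = defaultdict(float), defaultdict(int)
    for metrics in rank_metrics:
        for key, value in metrics.items():
            sums[key] += value
            counts[key] += 1
    return {key: sums[key] / counts[key] for key in sums}

# Using example 3 from the comments above:
# [{'train_reward_score': 0.78125}, {'train_reward_score': 0.625}] -> {'train_reward_score': 0.703125}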