in log_analyzer/log.py [0:0]
def _analyze_stage_log(comm_log: List[Dict], stage: str, comm_info: Dict[str, Dict]):
    """Fold one batch of log records for ``stage`` into ``comm_info`` in place.

    Every call adds one to ``comm_info[stage]["count"]``, creating the stage
    entry first when it does not exist yet. Each record whose ``comm_type``
    differs from ``CommType.computation`` is then merged into two tables
    stored under the stage entry:

    * ``"comm_type_info"`` -- rows keyed by ``(comm_type, comm_group)``;
      ``count`` and ``msg_size`` are summed.
    * ``"detailed_comm_type_info"`` -- rows keyed by
      ``(comm_type, comm_group, msg_size)``; ``count`` is summed.

    In both tables each row also collects the record's ``_elapsed_time`` and
    ``busbw`` values in per-row lists, one element per merged record.

    Args:
        comm_log: Records to merge. NOTE(review): despite the ``List[Dict]``
            annotation, records are read through attribute access
            (``.comm_type``) and their ``__dict__`` -- confirm the intended
            element type.
        stage: Key of the stage entry inside ``comm_info``.
        comm_info: Accumulator mapping stage name to its statistics; mutated.

    Returns:
        None. All results are written into ``comm_info``.
    """

    def _accumulate(table, fields, group_by, summed, timings, bandwidths):
        """Merge one record's ``fields`` into the ``table`` row chosen by ``group_by``."""
        row_key = tuple(fields[name] for name in group_by)
        # Timings come before bandwidths so rows keep that field order.
        series = [*timings, *bandwidths]
        if row_key not in table:
            table[row_key] = {
                **{name: 0 for name in summed},
                **{name: [] for name in series},
            }
        row = table[row_key]
        for name in summed:
            row[name] += fields[name]
        for name in series:
            row[name].append(fields[name])

    stage_entry = comm_info.setdefault(
        stage,
        {
            "count": 0,
            "comm_type_info": {},
            "detailed_comm_type_info": {},
        },
    )
    stage_entry["count"] += 1

    # (target table, grouping fields, summed fields) for each aggregation level.
    plans = (
        (
            stage_entry["comm_type_info"],
            ["comm_type", "comm_group"],
            ["count", "msg_size"],
        ),
        (
            stage_entry["detailed_comm_type_info"],
            ["comm_type", "comm_group", "msg_size"],
            ["count"],
        ),
    )

    for record in comm_log:
        if record.comm_type != CommType.computation:
            fields = record.__dict__
            for table, group_by, summed in plans:
                _accumulate(
                    table,
                    fields,
                    group_by,
                    summed,
                    ["_elapsed_time"],
                    ["busbw"],
                )