def _analyze_stage_log()

in log_analyzer/log.py [0:0]


def _analyze_stage_log(comm_log: List[Dict], stage: str, comm_info: Dict[str, Dict]):
    def __update_info(
        info_dict,
        log,
        primary_key: List[str],
        agg_key: List[str],
        performance_key: List[str],
        busbw_key: List[str],
    ):
        primary_key = tuple(log[key] for key in primary_key)
        if primary_key not in info_dict:
            info_dict[primary_key] = dict((key, 0) for key in agg_key)
            info_dict[primary_key].update(dict((key, []) for key in performance_key))
            info_dict[primary_key].update(dict((key, []) for key in busbw_key))
        for key in agg_key:
            info_dict[primary_key][key] += log[key]
        for key in performance_key:
            info_dict[primary_key][key].append(log[key])
        for key in busbw_key:
            info_dict[primary_key][key].append(log[key])

    if stage not in comm_info:
        comm_info[stage] = {
            "count": 0,
            "comm_type_info": {},
            "detailed_comm_type_info": {},
        }
    comm_info[stage]["count"] += 1
    # key: comm_type, value: count, time_ms
    comm_type_info = comm_info[stage]["comm_type_info"]
    # key: comm_type, msg_size, value: count, time_ms
    detailed_comm_type_info = comm_info[stage]["detailed_comm_type_info"]
    for log in comm_log:
        if log.comm_type != CommType.computation:
            __update_info(
                comm_type_info,
                log.__dict__,
                ["comm_type", "comm_group"],
                ["count", "msg_size"],
                ["_elapsed_time"],
                ["busbw"],
            )
            __update_info(
                detailed_comm_type_info,
                log.__dict__,
                ["comm_type", "comm_group", "msg_size"],
                ["count"],
                ["_elapsed_time"],
                ["busbw"],
            )