def parse_ds_comm_log()

in log_analyzer/ds_comm_log_analyzer.py [0:0]


def parse_ds_comm_log(filename):
    """Build a ``Log`` from the text file at ``filename``.

    The file is read in full and split on ``"\\n"``. Each resulting line is
    handled as follows:

    * If it contains ``"After initializing ZeRO optimizer"`` or
      ``"microstep"``, a ``LogItem`` with ``CommType.epoch_end`` is appended.
    * Otherwise it is passed to ``parse_ds_log_item``. A result of ``None``,
      or one without a ``"comm_type"`` key, is ignored. Any other result is
      converted into a ``LogItem`` and appended.

    Args:
        filename: Path of the log file to read.

    Returns:
        The populated ``Log`` instance.

    Raises:
        KeyError: If a parsed entry has ``"comm_type"`` but no ``"msg_size"``.
    """
    # Substrings that mark a line as an epoch_end entry; checked in this order.
    boundary_markers = ("After initializing ZeRO optimizer", "microstep")

    result = Log()
    with open(filename, "r") as log_file:
        raw_lines = log_file.read().split("\n")

    for raw_line in raw_lines:
        if any(marker in raw_line for marker in boundary_markers):
            result.add_comm_log(LogItem(comm_type=CommType.epoch_end))
            continue

        parsed = parse_ds_log_item(raw_line)
        if parsed is None or "comm_type" not in parsed:
            continue

        entry = LogItem(
            comm_type=parsed["comm_type"],
            comm_group=parsed.get("group", CommGroup.dp_group),
            msg_size=parsed["msg_size"],
        )
        # Optional measurements fall back to -1 when the parser did not
        # provide them.
        entry._elapsed_time = parsed.get("elapsed_time", -1)
        entry.algbw = parsed.get("algbw", -1)
        entry.busbw = parsed.get("busbw", -1)
        result.add_comm_log(entry)

    return result