in bench_cluster/report.py [0:0]
def create_global_summary(inp_dir, cluster="hf"):
    """Aggregate per-run benchmark CSVs under ``inp_dir`` into one global summary CSV.

    Scans ``inp_dir`` for run folders named ``dp{D}_tp{T}_pp{P}..._{mbs}_{acc}``-style
    (any folder whose name starts with ``dp``), reads each run's
    ``log_metrics_*.csv`` and ``profiler.csv``, and writes
    ``{num_gpus}_global_summary.csv`` into ``inp_dir``.

    Args:
        inp_dir: Root directory containing one sub-folder per run.
        cluster: Either ``"hf"`` (8 GPUs/node) or ``"swiss-ai"`` (4 GPUs/node);
            used to derive the node count from the world size.

    Raises:
        ValueError: If ``cluster`` is unknown, no ``.csv`` file is found, or no
            run folders exist under ``inp_dir``.
    """
    if cluster == "hf":
        max_gpus_per_node = 8
    elif cluster == "swiss-ai":
        max_gpus_per_node = 4
    else:
        # Bug fix: previously an unknown cluster left max_gpus_per_node unbound,
        # causing a NameError further down.
        raise ValueError(f"Unknown cluster '{cluster}', expected 'hf' or 'swiss-ai'")

    folders_path = glob.glob(os.path.join(inp_dir, '*/'))
    if not folders_path:
        # Bug fix: folders_path[0] at the end would raise IndexError otherwise.
        raise ValueError(f"No run folders found in {inp_dir}")
    file_paths = glob.glob(os.path.join(inp_dir, "**", "*.csv"), recursive=True)
    if not file_paths:
        raise ValueError(f"No .csv file found in {inp_dir}")

    log_metrics_csv = [file for file in file_paths if re.search(r"log_metrics_\d+\.csv", file)]
    profiler_csv = [file for file in file_paths if "profiler.csv" in file]

    # Bug fix: the rows appended below carry a "memory" key, which was missing
    # from the declared columns — the schema now matches the row dict.
    summary_results_pd = pd.DataFrame(columns=[
        "model", "run_name", "status", "nnodes", "dp", "tp", "pp",
        "batch_accumulation_per_replica", "micro_batch_size",
        "tok/s/gpu", "mfu", "memory", "forward", "backward",
    ])
    for col in ("status", "forward", "backward"):
        summary_results_pd[col] = summary_results_pd[col].astype(str)

    # One summary row per run folder; run config is parsed from the folder name.
    for folder in folders_path:
        components = os.path.normpath(folder).split(os.sep)
        model = next((c for c in components if 'llama' in c.lower()), None)
        run_name = next((c for c in components if c.startswith('dp')), None)
        if run_name is None:
            # Bug fix: a folder without a dp... component previously crashed
            # re.findall(None); skip it instead.
            print(f"Skipping {folder} as it does not match the expected run naming scheme")
            continue
        # The run name embeds exactly five integers, in this order.
        dp, tp, pp, micro_batch_size, batch_accumulation_per_replica = (
            int(v) for v in re.findall(r'\d+', run_name)
        )
        world_size = dp * tp * pp
        summary_results_pd.loc[len(summary_results_pd)] = {
            "model": model,
            "run_name": run_name,
            "status": str(""),
            "nnodes": max(1, world_size // max_gpus_per_node),
            "dp": dp,
            "tp": tp,
            "pp": pp,
            # Consistency fix: stored as ints like dp/tp/pp (were left as strings).
            "batch_accumulation_per_replica": batch_accumulation_per_replica,
            "micro_batch_size": micro_batch_size,
            "tok/s/gpu": -1,
            "mfu": -1,
            "memory": -1,
            "forward": str(""),
            "backward": str(""),
        }

    # Index per-run metric DataFrames by run name (parent folder of the csv).
    log_metrics_dfs = {}
    for file in log_metrics_csv:
        run_name = file.split("/")[-2]
        log_metrics_dfs[run_name] = pd.read_csv(file)
    profiler_dfs = {}
    for file in profiler_csv:
        run_name = file.split("/")[-2]
        profiler_dfs[run_name] = pd.read_csv(file)

    for run_name in summary_results_pd["run_name"]:
        # Get the associated row in the summary_results csv
        index = summary_results_pd[summary_results_pd["run_name"] == run_name].index[0]

        # Status: read from the run's status.txt if it exists.
        status_file = os.path.join(inp_dir, run_name, "status.txt")
        if os.path.exists(status_file):
            with open(status_file, "r") as f:
                summary_results_pd.loc[index, "status"] = f.read().strip()

        # Runs that did not complete have no meaningful metrics.
        if summary_results_pd.loc[index, "status"] in ["timeout", "oom", "fail", "pending", "running"]:
            continue

        if run_name not in log_metrics_dfs:
            print(f"Skipping {run_name} as it does not have log metrics csv file")
            continue

        # When profiling ran, the first 7 iterations are warmup/profiling noise.
        skip_profiling_steps = 0 if run_name not in profiler_dfs else 7

        # Tokens per sec per gpu (excluding profiling warmup iterations).
        summary_results_pd.loc[index, "tok/s/gpu"] = (
            log_metrics_dfs[run_name]["tokens_per_sec_per_gpu"][skip_profiling_steps:]
            .astype(float).mean()
        )
        # MFU (bf16), as a percentage of the hardware's promised peak flops.
        summary_results_pd.loc[index, "mfu"] = (
            log_metrics_dfs[run_name]["model_tflops_per_gpu"][skip_profiling_steps:]
            .astype(int).mean()
            / get_promised_flop_per_sec(dtype=torch.bfloat16)
        ) * 100

        if run_name not in profiler_dfs:
            print(f"Skipping profiler part for {run_name} as it does not have profiler.csv")
            continue
        # Forward / backward timings from the profiler summary (single-row csv).
        summary_results_pd.loc[index, "forward"] = profiler_dfs[run_name]["forward"].values[0]
        summary_results_pd.loc[index, "backward"] = profiler_dfs[run_name]["backward"].values[0]

    # NOTE(review): assumes inp_dir layout .../{num_gpus}/{run_name}/ so that the
    # third-from-last path component is the gpu count — confirm against callers.
    num_gpus = folders_path[0].split("/")[-3]
    path = os.path.join(inp_dir, num_gpus + "_global_summary.csv")
    summary_results_pd.to_csv(path, index=False)
    print(f"Create {path} with new metrics")