bench_cluster/report.py
import glob
import os
import re
import csv
import json
import pandas as pd
import torch
from statistics import mean
import subprocess

def units_to_float(value):
    """Convert a string with an optional K/M/G suffix (e.g. "1.5K") to a float."""
    if 'K' in value:
        return float(value.replace('K', '')) * 1000
    elif 'M' in value:
        return float(value.replace('M', '')) * 1000000
    elif 'G' in value:
        return float(value.replace('G', '')) * 1000000000
    else:
        return float(value)
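
# Illustrative examples (not part of the original module): the suffix parser above maps
# log-style strings to plain floats, e.g.
#   units_to_float("1.54K")  -> 1540.0
#   units_to_float("4.19M")  -> 4190000.0
#   units_to_float("131072") -> 131072.0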

def parse_logs(inp_dir, cluster: str):
    """Extract per-iteration training metrics from completed runs and save them as CSV files."""
    folders = [os.path.abspath(folder) for folder in glob.glob(os.path.join(inp_dir, "**"), recursive=True) if os.path.isdir(folder)]

    # Collect one log file per run folder whose status.txt reports "completed"
    completed_logs_path = []
    for folder in folders:
        status_file = os.path.join(folder, "status.txt")
        if os.path.exists(status_file):
            with open(status_file, "r") as f:
                status = f.read().strip()
            if status == "completed":
                log_files = glob.glob(os.path.join(folder, "log_*.out"))
                if log_files:
                    completed_logs_path.append(log_files[0])

    metrics_dict = {}
    for file_path in completed_logs_path:
        metrics = {}
        current_iteration = None

        with open(file_path, 'r') as file:
            for line in file:
                if cluster == "hf":
                    match_iteration = re.search(
                        r'\[default\d+\]:\S+ \S+ \[INFO\|DP=\d+\|PP=\d+\|TP=\d+\|\S+\]: iteration: (\d+) / \d+ \| '
                        r'consumed_tokens: ([\d\.KM]+) \| elapsed_time_per_iteration_ms: ([\d\.KM]+) \| '
                        r'tokens_per_sec: ([\d\.KM]+) \| tokens_per_sec_per_gpu: ([\d\.KM]+) \| '
                        r'global_batch_size: ([\d\.KM]+) \| lm_loss: ([\d\.]+) \| lr: ([\de\.-]+) \| '
                        r'model_tflops_per_gpu: ([\d\.]+) \| hardware_tflops_per_gpu: ([\d\.]+) \| '
                        r'grad_norm: ([\d\.]+).*', line
                    )
                    if match_iteration:
                        current_iteration = int(match_iteration.group(1))
                        metrics[current_iteration] = {
                            'iteration': current_iteration,
                            'consumed_tokens': units_to_float(match_iteration.group(2)),
                            'elapsed_time_per_iteration_ms': units_to_float(match_iteration.group(3)),
                            'tokens_per_sec': units_to_float(match_iteration.group(4)),
                            'tokens_per_sec_per_gpu': units_to_float(match_iteration.group(5)),
                            'global_batch_size': units_to_float(match_iteration.group(6)),
                            'lm_loss': float(match_iteration.group(7)),
                            'lr': float(match_iteration.group(8)),
                            'model_tflops_per_gpu': float(match_iteration.group(9)),
                            'hardware_tflops_per_gpu': float(match_iteration.group(10)),
                            'grad_norm': float(match_iteration.group(11))
                        }

                    match_memory = re.search(
                        r'\[default\d\]:\S+ \S+ \[INFO\|DP=\d\|PP=\d\|TP=\d\|\S+\]: Memory usage: ([\d\.]+)MiB\. '
                        r'Peak allocated ([\d\.]+)MiB\. Peak reserved: ([\d\.]+)MiB', line)
                    if match_memory and current_iteration is not None:
                        if current_iteration in metrics:
                            metrics[current_iteration].update({
                                'memory_usage_MiB': float(match_memory.group(1)),
                                'peak_allocated_MiB': float(match_memory.group(2)),
                                'peak_reserved_MiB': float(match_memory.group(3))
                            })

                elif cluster == "swiss-ai":
                    match_iteration = re.search(
                        r'(\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2}) \[INFO\|DP=(\d+)\|PP=(\d+)\|TP=(\d+)\|(nid\d+)\]: '
                        r'iteration: (\d+) / \d+ \| '
                        r'consumed_tokens: ([\d\.KM]+) \| '
                        r'elapsed_time_per_iteration_ms: ([\d\.KM]+) \| '
                        r'tokens_per_sec: ([\d\.KM]+) \| '
                        r'tokens_per_sec_per_gpu: ([\d\.KM]+) \| '
                        r'global_batch_size: ([\d\.KM]+) \| '
                        r'lm_loss: ([\d\.]+) \| '
                        r'lr: ([\de\.-]+) \| '
                        r'model_tflops_per_gpu: ([\d\.]+) \| '
                        r'hardware_tflops_per_gpu: ([\d\.]+) \| '
                        r'grad_norm: ([\d\.]+).*', line
                    )
                    if match_iteration:
                        current_iteration = int(match_iteration.group(6))
                        metrics[current_iteration] = {
                            'iteration': current_iteration,
                            'consumed_tokens': units_to_float(match_iteration.group(7)),
                            'elapsed_time_per_iteration_ms': units_to_float(match_iteration.group(8)),
                            'tokens_per_sec': units_to_float(match_iteration.group(9)),
                            'tokens_per_sec_per_gpu': units_to_float(match_iteration.group(10)),
                            'global_batch_size': units_to_float(match_iteration.group(11)),
                            'lm_loss': float(match_iteration.group(12)),
                            'lr': float(match_iteration.group(13)),
                            'model_tflops_per_gpu': float(match_iteration.group(14)),
                            'hardware_tflops_per_gpu': float(match_iteration.group(15)),
                            'grad_norm': float(match_iteration.group(16)),
                        }

                    match_memory = re.search(
                        r'(\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2}) \[INFO\|DP=(\d+)\|PP=(\d+)\|TP=(\d+)\|(nid\d+)\]:\s+'
                        r'Memory usage: ([\d\.]+)MiB\. '
                        r'Peak allocated ([\d\.]+)MiB\. Peak reserved: ([\d\.]+)MiB',
                        line
                    )
                    if match_memory and current_iteration is not None:
                        if current_iteration in metrics:
                            metrics[current_iteration].update({
                                'memory_usage_MiB': float(match_memory.group(6)),
                                'peak_allocated_MiB': float(match_memory.group(7)),
                                'peak_reserved_MiB': float(match_memory.group(8))
                            })

        metrics_dict[file_path] = list(metrics.values())

    # Save metrics to csv files
    for file_path, data in metrics_dict.items():
        base_folder = os.path.dirname(file_path)
        if data:
            job_id = os.path.basename(file_path).split("_")[1].split(".")[0]
            csv_path = os.path.join(base_folder, f"log_metrics_{job_id}.csv")
            with open(csv_path, 'w', newline='') as output_file:
                fieldnames = data[0].keys()
                dict_writer = csv.DictWriter(output_file, fieldnames=fieldnames)
                dict_writer.writeheader()
                dict_writer.writerows(data)

    print(f"Saved {len(metrics_dict)} csv files over {len(completed_logs_path)} completed logs")

def parse_profiler(inp_dir):
    """Aggregate forward/backward durations from profiler traces and write their means to profiler.csv."""
    # Search for files ending in .json in the inp_dir and its subdirectories
    file_paths = glob.glob(os.path.join(inp_dir, "**", "*.json"), recursive=True)
    if not file_paths:
        raise ValueError(f"No .json file found in {inp_dir}")

    all_forward_durations = []
    all_backward_durations = []

    def _format_duration(duration):
        # Trace durations are in microseconds
        ms = duration // 1000
        us = duration % 1000
        return f"{ms}ms {us}μs"

    for file_path in file_paths:
        print(f"Processing file: {file_path}")
        with open(file_path, 'r') as f:
            trace_data = json.load(f)

        forward_durations = []
        backward_durations = []

        for event in trace_data['traceEvents']:
            if 'name' in event and 'dur' in event:
                if "forward" in event['name'].lower():
                    forward_durations.append(event['dur'])
                elif "backward" in event['name'].lower():
                    backward_durations.append(event['dur'])

        if forward_durations:
            all_forward_durations.extend(forward_durations)
        if backward_durations:
            all_backward_durations.extend(backward_durations)

    # Write the mean forward and backward durations to a csv file
    pattern = re.compile(r'dp-\d+_tp-\d+_pp-\d+_mbz-\d+')
    matching_index = next((i for i, part in enumerate(file_path.split("/")) if pattern.match(part)), None)
    if matching_index is None:
        raise ValueError(f"Could not find the specified pattern in {file_path}")
    assert matching_index < len(file_path.split("/")) - 1, "Matching index is out of bounds"

    output_file = "/".join(file_path.split("/")[:matching_index + 1]) + "/profiler.csv"

    if all_forward_durations or all_backward_durations:
        with open(output_file, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(["forward", "backward"])
            writer.writerow([
                _format_duration(int(mean(all_forward_durations))) if all_forward_durations else "N/A",
                _format_duration(int(mean(all_backward_durations))) if all_backward_durations else "N/A"
            ])
        print(f"Results written to {output_file}")
    else:
        print("No forward or backward durations found in any file.")

def parse_network(inp_dir):
    file_paths = glob.glob(os.path.join(inp_dir, "*.out"))
    if not file_paths:
        raise ValueError(f"No log file found in {inp_dir}")

    primitives = ['all_gather', 'all_reduce', 'all_to_all', 'broadcast', 'p2p']
    headers = ['Primitive', 'Size (Bytes)', 'Description', 'Duration', 'Throughput (Gbps)', 'BusBW (Gbps)']

    for file_path in file_paths:
        with open(file_path, 'r') as file:
            input_text = file.read()

        data = []
        for primitive in primitives:
            pattern = rf"---- Performance of {primitive}.*?Size \(Bytes\).*?(\d+\.?\d*\s+[GMK]?B)\s+(\S+)\s+(\d+\.?\d*\s+ms)\s+(\d+\.?\d*)\s+(\d+\.?\d*)"
            match = re.search(pattern, input_text, re.DOTALL)
            if match:
                size, description, duration, throughput, busbw = match.groups()
                data.append([primitive, size, description, duration, throughput, busbw])

        output_file = os.path.splitext(file_path)[0] + '.csv'
        with open(output_file, 'w', newline='') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(headers)
            writer.writerows(data)

        print(f"Data from {file_path} has been written to {output_file}")

# https://github.com/stanford-cs336/spring2024-lectures/blob/main/lecture_02.py#L919
def get_promised_flop_per_sec(dtype: torch.dtype) -> float:
    """Return the peak TFLOP/s promised by the local GPU when operating on `dtype`."""
    # Run nvidia-smi command and capture output
    try:
        result = subprocess.run(['nvidia-smi', '--query-gpu=name', '--format=csv,noheader'],
                                capture_output=True, text=True, check=True)
        gpu_name = result.stdout.strip()
    except subprocess.CalledProcessError:
        raise RuntimeError("Failed to run nvidia-smi. Make sure it's installed and accessible.")
    except FileNotFoundError:
        raise RuntimeError("nvidia-smi command not found. Make sure NVIDIA drivers are installed.")

    # Map the GPU model to its peak throughput (all values in TFLOP/s, i.e. 1e12 FLOP/s)
    if "A100" in gpu_name:
        if dtype == torch.float32:
            return 19.5  # 19.5 TFLOP/s
        if dtype in (torch.bfloat16, torch.float16):
            return 312  # 312 TFLOP/s
    elif "H100" in gpu_name or "GH200" in gpu_name:
        if dtype == torch.float32:
            return 67.5  # 67.5 TFLOP/s
        if dtype in (torch.bfloat16, torch.float16):
            return 1979 / 2  # 989.5 TFLOP/s (half of the 1979 sparsity figure, for dense operations)
    else:
        raise ValueError(f"Unsupported GPU model: {gpu_name}")

    raise ValueError(f"Unknown dtype: {dtype}")

def create_global_summary(inp_dir, cluster="hf"):
    """Combine per-run log metrics and profiler CSVs into a single global summary CSV."""
    if cluster == "hf":
        max_gpus_per_node = 8
    elif cluster == "swiss-ai":
        max_gpus_per_node = 4

    folders_path = glob.glob(os.path.join(inp_dir, '*/'))
    file_paths = glob.glob(os.path.join(inp_dir, "**", "*.csv"), recursive=True)
    if not file_paths:
        raise ValueError(f"No .csv file found in {inp_dir}")

    log_metrics_csv = [file for file in file_paths if re.search(r"log_metrics_\d+\.csv", file)]
    profiler_csv = [file for file in file_paths if "profiler.csv" in file]

    summary_results_pd = pd.DataFrame(columns=["model", "run_name", "status", "nnodes", "dp", "tp", "pp", "batch_accumulation_per_replica", "micro_batch_size", "tok/s/gpu", "mfu", "memory", "forward", "backward"])
    summary_results_pd["status"] = summary_results_pd["status"].astype(str)
    summary_results_pd["forward"] = summary_results_pd["forward"].astype(str)
    summary_results_pd["backward"] = summary_results_pd["backward"].astype(str)

    # Create one summary row per run folder, parsing the parallelism config from the folder name
    for folder in folders_path:
        components = os.path.normpath(folder).split("/")
        model = next((c for c in components if 'llama' in c.lower()), None)
        run_name = next((c for c in components if c.startswith('dp')), None)

        dp, tp, pp, micro_batch_size, batch_accumulation_per_replica = re.findall(r'\d+', run_name)
        dp, tp, pp = int(dp), int(tp), int(pp)
        world_size = dp * tp * pp

        summary_results_pd.loc[len(summary_results_pd)] = {
            "model": model,
            "run_name": run_name,
            "status": str(""),
            "nnodes": max(1, world_size // max_gpus_per_node),
            "dp": dp,
            "tp": tp,
            "pp": pp,
            "batch_accumulation_per_replica": batch_accumulation_per_replica,
            "micro_batch_size": micro_batch_size,
            "tok/s/gpu": -1,
            "mfu": -1,
            "memory": -1,
            "forward": str(""),
            "backward": str(""),
        }

    log_metrics_dfs = {}
    for file in log_metrics_csv:
        run_name = file.split("/")[-2]
        log_metrics_dfs[run_name] = pd.read_csv(file)

    profiler_dfs = {}
    for file in profiler_csv:
        run_name = file.split("/")[-2]
        profiler_dfs[run_name] = pd.read_csv(file)

    for run_name in summary_results_pd["run_name"]:
        # Get the associated row in the summary_results csv
        index = summary_results_pd[summary_results_pd["run_name"] == run_name].index[0]

        # Status
        status_file = os.path.join(inp_dir, run_name, "status.txt")
        if os.path.exists(status_file):
            with open(status_file, "r") as f:
                status = f.read().strip()
            summary_results_pd.loc[index, "status"] = status

        if summary_results_pd.loc[index, "status"] in ["timeout", "oom", "fail", "pending", "running"]:
            continue

        if run_name not in log_metrics_dfs:
            print(f"Skipping {run_name} as it does not have a log metrics csv file")
            continue

        # Skip the first iterations when a profiler trace exists, as they include profiler warmup steps
        skip_profiling_steps = 0 if run_name not in profiler_dfs else 7

        # Tokens per sec per gpu
        summary_results_pd.loc[index, "tok/s/gpu"] = log_metrics_dfs[run_name]["tokens_per_sec_per_gpu"][skip_profiling_steps:].astype(float).mean()
        # MFU (bf16)
        summary_results_pd.loc[index, "mfu"] = (log_metrics_dfs[run_name]["model_tflops_per_gpu"][skip_profiling_steps:].astype(int).mean() / get_promised_flop_per_sec(dtype=torch.bfloat16)) * 100

        if run_name not in profiler_dfs:
            print(f"Skipping profiler part for {run_name} as it does not have profiler.csv")
            continue

        # Forward
        summary_results_pd.loc[index, "forward"] = profiler_dfs[run_name]["forward"].values[0]
        # Backward
        summary_results_pd.loc[index, "backward"] = profiler_dfs[run_name]["backward"].values[0]

    num_gpus = folders_path[0].split("/")[-3]
    path = os.path.join(inp_dir, num_gpus + "_global_summary.csv")
    summary_results_pd.to_csv(path, index=False)
    print(f"Created {path} with new metrics")

def report(inp_dir, cluster, is_profiler=False, is_network=False, is_logs=False, global_summary=False):
    if is_logs:
        parse_logs(inp_dir, cluster)
    elif is_profiler:
        parse_profiler(inp_dir)
    elif is_network:
        parse_network(inp_dir)
    elif global_summary:
        create_global_summary(inp_dir, cluster)
    else:
        raise ValueError("Please specify the type of report to generate")