# extract_metrics.py
import re
import csv
import glob
import os
import argparse
import numpy as np

def parse_folder_name(folder_name):
    """Extract the parallelism and batch parameters encoded in a run-folder name."""
    dp = re.search(r'dp(\d+)', folder_name)
    tp = re.search(r'tp(\d+)', folder_name)
    pp = re.search(r'pp(\d+)', folder_name)
    mbs = re.search(r'mbs(\d+)', folder_name)
    ga = re.search(r'ga(\d+)', folder_name)
    sl = re.search(r'sl(\d+)', folder_name)
    return {
        'dp': int(dp.group(1)) if dp else None,
        'tp': int(tp.group(1)) if tp else None,
        'pp': int(pp.group(1)) if pp else None,
        'micro_batch_size': int(mbs.group(1)) if mbs else None,
        'grad_acc': int(ga.group(1)) if ga else None,
        'seq_len': int(sl.group(1)) if sl else None,
    }
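
# A quick sanity check of the parser; the folder name below is a hypothetical
# example of the "dp*_tp*_pp*_mbs*_ga*_sl*" naming convention this assumes:
#   parse_folder_name('dp2_tp4_pp1_mbs8_ga4_sl2048')
#   -> {'dp': 2, 'tp': 4, 'pp': 1, 'micro_batch_size': 8, 'grad_acc': 4, 'seq_len': 2048}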

def from_readable_format(formatted_str):
    if not isinstance(formatted_str, str):
        return formatted_str
    # Remove any whitespace and convert to upper case for consistency
    formatted_str = formatted_str.strip().upper()
    # If it's just a number without a suffix, return it as a float
    try:
        return float(formatted_str)
    except ValueError:
        pass
    # Define multipliers for the supported suffixes
    multipliers = {
        'T': 1e12,
        'B': 1e9,
        'M': 1e6,
        'K': 1e3,
    }
    # Extract number and suffix
    number = float(formatted_str[:-1])
    suffix = formatted_str[-1]
    if suffix in multipliers:
        return number * multipliers[suffix]
    raise ValueError(f"Unknown suffix: {suffix}")
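
# Example round-trips (illustrative values):
#   from_readable_format('1.5K') -> 1500.0
#   from_readable_format('2B')   -> 2000000000.0
#   from_readable_format('42')   -> 42.0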

def parse_log_line(line):
    """Pull the MFU percentage and the Tokens/s/GPU reading out of one log line."""
    tokens_s_gpu_match = re.search(r'Tokens/s/GPU:\s*([\d.]+[KMBT]?)', line)
    mfu_match = re.search(r'MFU:\s+(\d+\.\d+)%', line)
    mfu_value, tokens_value = None, None
    if mfu_match:
        mfu_value = float(mfu_match.group(1))
    if tokens_s_gpu_match:
        tokens_value = tokens_s_gpu_match.group(1)
    # from_readable_format passes a None value through unchanged
    return mfu_value, from_readable_format(tokens_value)
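
# A hypothetical log line in the shape this parser expects:
#   '[default0]:[rank 0]: ... | Tokens/s/GPU: 1.5K | MFU: 38.20% | ...'
# would yield (38.2, 1500.0).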

def process_file(filepath):
    """Average MFU and Tokens/s/GPU over one log file, skipping warmup steps."""
    tokens_s_gpu_values = []
    mfu_values = []
    with open(filepath, 'r') as f:
        for line in f:
            if re.search(r'\[default\d+\]:\[rank \d+\]', line):
                mfu_value, tokens_s_gpu_value = parse_log_line(line)
                if tokens_s_gpu_value is not None:
                    tokens_s_gpu_values.append(tokens_s_gpu_value)
                if mfu_value is not None:
                    mfu_values.append(mfu_value)
    # NOTE: skip the first 3 measurements (warmup), so averaging needs more
    # than 3 data points per metric
    if len(tokens_s_gpu_values) <= 3 and len(mfu_values) <= 3:
        print(f"Warning: Not enough data points for {filepath}")
        return None, None
    tokens_s_gpu = int(round(np.mean(tokens_s_gpu_values[3:]))) if len(tokens_s_gpu_values) > 3 else None
    mfu = int(round(np.mean(mfu_values[3:]))) if len(mfu_values) > 3 else None
    return mfu, tokens_s_gpu

def write_csv(data, output_filepath):
    if not data:
        return
    fieldnames = ['run_name', 'status', 'dp', 'tp', 'pp', 'micro_batch_size',
                  'grad_acc', 'seq_len', 'avg_tokens_s_gpu', 'avg_mfu']
    with open(output_filepath, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerow(data)

def read_status(status_file):
    """Return the stripped contents of status.txt, or None if it cannot be read."""
    try:
        with open(status_file, 'r') as f:
            return f.read().strip()
    except OSError:
        return None

def create_subdirectory_metrics(input_folder):
    """Create a metrics.csv file next to each .out log file."""
    pattern = os.path.join(input_folder, '**/*.out')
    out_files = glob.glob(pattern, recursive=True)
    print(f"Found {len(out_files)} .out files")
    processed_dirs = []
    for file_path in out_files:
        dir_path = os.path.dirname(file_path)
        dir_name = os.path.basename(dir_path)
        output_csv = os.path.join(dir_path, 'metrics.csv')
        params = parse_folder_name(dir_name)
        print(f"Processing {file_path}...")
        avg_mfu, avg_tokens_s_gpu = process_file(file_path)
        status = read_status(os.path.join(dir_path, 'status.txt'))
        params['run_name'] = dir_name
        if status is not None:
            params['status'] = status
        if avg_tokens_s_gpu is not None:
            params['avg_tokens_s_gpu'] = avg_tokens_s_gpu
        if avg_mfu is not None:
            params['avg_mfu'] = avg_mfu
        # Write the CSV once, after all available fields have been collected
        write_csv(params, output_csv)
        processed_dirs.append(dir_path)
        print(f"Processed {file_path} -> Created metrics.csv")
    return processed_dirs
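
# Assumed directory layout (names are hypothetical), e.g.:
#   results/llama-1B/dp2_tp4_pp1_mbs8_ga4_sl2048/log.out
#   results/llama-1B/dp2_tp4_pp1_mbs8_ga4_sl2048/status.txt
# after which metrics.csv is written alongside each log.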

def aggregate_metrics(input_folder):
    """Create a global_metrics.csv per top-level directory from its subdirectory metrics."""
    top_level_dirs = glob.glob(input_folder + '/*')
    for top_dir_path in top_level_dirs:
        subdirs = glob.glob(top_dir_path + '/*')
        aggregated_data = []
        for subdir_path in subdirs:
            metrics_file = os.path.join(subdir_path, 'metrics.csv')
            status_file = os.path.join(subdir_path, 'status.txt')
            folder_name = os.path.basename(subdir_path)
            data = {
                'run_name': folder_name,
                'status': read_status(status_file),
                **parse_folder_name(folder_name)  # Unpack the parsed parameters
            }
            # If metrics.csv exists, read the averaged metrics from it;
            # -1 marks a missing or unreadable value
            if os.path.exists(metrics_file):
                try:
                    with open(metrics_file, 'r') as f:
                        reader = csv.DictReader(f)
                        metrics_data = next(reader)
                    data['avg_tokens_s_gpu'] = int(metrics_data['avg_tokens_s_gpu'])
                    data['avg_mfu'] = int(metrics_data['avg_mfu'])
                except (KeyError, TypeError, ValueError, StopIteration):
                    data['avg_tokens_s_gpu'] = -1
                    data['avg_mfu'] = -1
            else:
                data['avg_tokens_s_gpu'] = -1
                data['avg_mfu'] = -1
            aggregated_data.append(data)
        # Write the global metrics file for this top-level directory
        output_file = os.path.join(top_dir_path, 'global_metrics.csv')
        fieldnames = ['run_name', 'status', 'dp', 'tp', 'pp', 'micro_batch_size',
                      'grad_acc', 'seq_len', 'avg_tokens_s_gpu', 'avg_mfu']
        with open(output_file, 'w', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(aggregated_data)
        print(f"Created global_metrics.csv with {len(aggregated_data)} entries")

def main():
    parser = argparse.ArgumentParser(description='Process log files and create metrics CSVs')
    parser.add_argument('input_folder', help='Path to the top-level folder containing experiment subfolders')
    args = parser.parse_args()
    # Step 1: Create metrics.csv in each subdirectory
    print("Creating individual metrics.csv files...")
    create_subdirectory_metrics(args.input_folder)
    # Step 2: Create global_metrics.csv
    print("\nAggregating metrics into global_metrics.csv...")
    aggregate_metrics(args.input_folder)

if __name__ == "__main__":
    main()
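
# Example invocation (path is hypothetical):
#   python extract_metrics.py results/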