in bench_cluster/report.py [0:0]
def _extract_metrics(file_path, iter_re, mem_re, offset):
    """Parse one ``log_*.out`` file into a list of per-iteration metric dicts.

    ``iter_re``/``mem_re`` are the pre-compiled cluster-specific patterns;
    ``offset`` shifts the capture-group indices (the swiss-ai prefix captures
    timestamp/DP/PP/TP/node-id in groups 1-5 before the metric groups).
    Memory lines carry no iteration number, so they are attached to the most
    recently seen iteration.
    """
    metrics = {}
    current_iteration = None
    with open(file_path, 'r') as file:
        for line in file:
            m = iter_re.search(line)
            if m:
                current_iteration = int(m.group(offset + 1))
                metrics[current_iteration] = {
                    'iteration': current_iteration,
                    'consumed_tokens': units_to_float(m.group(offset + 2)),
                    'elapsed_time_per_iteration_ms': units_to_float(m.group(offset + 3)),
                    'tokens_per_sec': units_to_float(m.group(offset + 4)),
                    'tokens_per_sec_per_gpu': units_to_float(m.group(offset + 5)),
                    'global_batch_size': units_to_float(m.group(offset + 6)),
                    'lm_loss': float(m.group(offset + 7)),
                    'lr': float(m.group(offset + 8)),
                    'model_tflops_per_gpu': float(m.group(offset + 9)),
                    'hardware_tflops_per_gpu': float(m.group(offset + 10)),
                    'grad_norm': float(m.group(offset + 11)),
                }
                continue
            m = mem_re.search(line)
            # `current_iteration in metrics` also rejects the pre-first-iteration
            # case (current_iteration is None, never a key).
            if m and current_iteration in metrics:
                metrics[current_iteration].update({
                    'memory_usage_MiB': float(m.group(offset + 1)),
                    'peak_allocated_MiB': float(m.group(offset + 2)),
                    'peak_reserved_MiB': float(m.group(offset + 3)),
                })
    return list(metrics.values())


# Metric fields shared by both clusters' iteration lines (groups are relative
# to the cluster-specific prefix).
_METRIC_FIELDS_RE = (
    r'iteration: (\d+) / \d+ \| '
    r'consumed_tokens: ([\d\.KM]+) \| '
    r'elapsed_time_per_iteration_ms: ([\d\.KM]+) \| '
    r'tokens_per_sec: ([\d\.KM]+) \| '
    r'tokens_per_sec_per_gpu: ([\d\.KM]+) \| '
    r'global_batch_size: ([\d\.KM]+) \| '
    r'lm_loss: ([\d\.]+) \| '
    r'lr: ([\de\.-]+) \| '
    r'model_tflops_per_gpu: ([\d\.]+) \| '
    r'hardware_tflops_per_gpu: ([\d\.]+) \| '
    r'grad_norm: ([\d\.]+).*'
)


def parse_logs(inp_dir, cluster: str):
    """Collect per-iteration training metrics from completed benchmark runs.

    Walks every sub-directory of ``inp_dir``; a run counts as completed when
    its ``status.txt`` contains exactly ``completed``.  The first
    ``log_<job_id>.out`` file of each completed run is parsed according to the
    log format of ``cluster`` ("hf" or "swiss-ai"; any other value parses
    nothing) and the metrics are written next to the log as
    ``log_metrics_<job_id>.csv``.  Returns None; prints a summary line.
    """
    folders = [os.path.abspath(folder) for folder in glob.glob(os.path.join(inp_dir, "**"), recursive=True) if os.path.isdir(folder)]

    # Keep only runs whose status.txt reads "completed"; take the first
    # matching log file of each (glob order — TODO confirm a run never has
    # more than one log_*.out).
    completed_logs_path = []
    for folder in folders:
        status_file = os.path.join(folder, "status.txt")
        if os.path.exists(status_file):
            with open(status_file, "r") as f:
                status = f.read().strip()
            if status == "completed":
                log_files = glob.glob(os.path.join(folder, "log_*.out"))
                if log_files:
                    completed_logs_path.append(log_files[0])

    # Compile the cluster-specific patterns once instead of per log line.
    if cluster == "hf":
        iter_re = re.compile(
            r'\[default\d+\]:\S+ \S+ \[INFO\|DP=\d+\|PP=\d+\|TP=\d+\|\S+\]: ' + _METRIC_FIELDS_RE
        )
        mem_re = re.compile(
            r'\[default\d\]:\S+ \S+ \[INFO\|DP=\d\|PP=\d\|TP=\d\|\S+\]: Memory usage: ([\d\.]+)MiB\. '
            r'Peak allocated ([\d\.]+)MiB\. Peak reserved: ([\d\.]+)MiB'
        )
        offset = 0
    elif cluster == "swiss-ai":
        # swiss-ai lines carry a timestamp + rank prefix captured in groups 1-5.
        prefix = r'(\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2}) \[INFO\|DP=(\d+)\|PP=(\d+)\|TP=(\d+)\|(nid\d+)\]:'
        iter_re = re.compile(prefix + ' ' + _METRIC_FIELDS_RE)
        mem_re = re.compile(
            prefix + r'\s+'
            r'Memory usage: ([\d\.]+)MiB\. '
            r'Peak allocated ([\d\.]+)MiB\. Peak reserved: ([\d\.]+)MiB'
        )
        offset = 5
    else:
        # Unknown cluster: matches the original behavior of parsing no lines.
        iter_re = mem_re = None
        offset = 0

    metrics_dict = {}
    for file_path in completed_logs_path:
        if iter_re is None:
            metrics_dict[file_path] = []
        else:
            metrics_dict[file_path] = _extract_metrics(file_path, iter_re, mem_re, offset)

    # Save metrics to csv files (one per log that produced at least one row).
    saved_count = 0
    for file_path, data in metrics_dict.items():
        if not data:
            continue
        base_folder = os.path.dirname(file_path)
        # Log files are named log_<job_id>.out
        job_id = os.path.basename(file_path).split("_")[1].split(".")[0]
        csv_path = os.path.join(base_folder, f"log_metrics_{job_id}.csv")
        # Union of keys across rows: early iterations may lack the memory
        # fields that later rows gain, which would make DictWriter raise if we
        # only used data[0].keys(). Missing cells default to '' via restval.
        fieldnames = list(dict.fromkeys(key for row in data for key in row))
        with open(csv_path, 'w', newline='') as output_file:
            dict_writer = csv.DictWriter(output_file, fieldnames=fieldnames)
            dict_writer.writeheader()
            dict_writer.writerows(data)
        saved_count += 1
    # Count only files actually written (empty runs are skipped above).
    print(f"Saved {saved_count} csv files over {len(completed_logs_path)} completed logs")