tracking/translations_parser/cli/experiments.py (138 lines of code) (raw):
#!/usr/bin/env python3
"""
Publish multiple experiments to Weight & Biases.
Example:
parse_experiment_dir -d ./tests/data/experiments
"""
import argparse
import logging
import os
from enum import Enum
from itertools import groupby
from pathlib import Path
from translations_parser.data import Metric
from translations_parser.parser import TrainingParser
from translations_parser.publishers import WandB
from translations_parser.utils import parse_task_label, parse_gcp_metric
logger = logging.getLogger(__name__)
class ExperimentMode(Enum):
SNAKEMAKE = "snakemake"
TASKCLUSTER = "taskcluster"
def get_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Publish multiple experiments to Weight & Biases")
parser.add_argument(
"--mode",
"-m",
help="Mode to publish experiments.",
type=ExperimentMode,
choices=ExperimentMode,
metavar=[e.value for e in ExperimentMode],
required=True,
)
parser.add_argument(
"--directory",
"-d",
help="Path to the experiments directory.",
type=Path,
default=Path(Path(os.getcwd())),
)
return parser.parse_args()
def parse_experiment(
*,
project: str,
group: str,
name: str,
suffix: str,
logs_file: Path,
metrics_dir: Path | None = None,
mode=ExperimentMode,
) -> None:
"""
Parse logs from a Taskcluster dump and publish data to W&B.
If a metrics directory is set, initially read and publish each `.metrics` values.
"""
metrics = []
if metrics_dir:
for metrics_file in metrics_dir.glob("*.metrics"):
try:
metric_attrs = parse_gcp_metric(metrics_file.stem)
except ValueError:
logger.error(f"Error parsing metric from GCP: {metrics_file.stem}. Skipping.")
else:
metrics.append(
Metric.from_file(
metrics_file,
importer=metric_attrs.importer,
dataset=metric_attrs.dataset,
augmentation=metric_attrs.augmentation,
)
)
with logs_file.open("r") as f:
lines = (line.strip() for line in f.readlines())
parser = TrainingParser(
lines,
metrics=metrics,
publishers=[
WandB(
project=project,
group=group,
name=name,
suffix=suffix,
)
],
)
parser.run()
def main() -> None:
args = get_args()
directory = args.directory
mode = args.mode
# Ignore files with a different name than "train.log"
train_files = sorted(directory.glob("**/train.log"))
logger.info(f"Reading {len(train_files)} train.log data")
prefix = os.path.commonprefix([path.parts for path in train_files])
# Move on top of the main models (Snakemake) or logs (Taskcluster) folder
if "models" in prefix:
prefix = prefix[: prefix.index("models")]
if "logs" in prefix:
prefix = prefix[: prefix.index("logs")]
# First parent folder correspond to the run name, second one is the group
groups = groupby(train_files, lambda path: path.parent.parent)
for path, files in groups:
logger.info(f"Parsing folder {path.resolve()}")
*_, project, group = path.parts
if mode == ExperimentMode.TASKCLUSTER:
if len(group) < 22:
logger.error(
f"Skip folder {group} as it cannot contain a task group ID (too few caracters)."
)
continue
suffix = f"_{group[-22:-17]}"
else:
# Use the full experiment name as a suffix for old Snakemake experiments
suffix = f"_{group}"
# Publish a run for each file inside that group
published_runs = []
for file in files:
try:
tag = f"train-{file.parent.name}"
name = parse_task_label(tag).model
except ValueError:
logger.error(f"Invalid tag extracted from file @{path}: {tag}")
continue
logger.info(f"Handling training task {name}")
# Also publish metric files when available
metrics_path = Path(
"/".join([*prefix, "models", project, group, "evaluation", file.parent.name])
)
metrics_dir = metrics_path if metrics_path.is_dir() else None
if metrics_dir is None:
logger.warning(f"Evaluation metrics files not found for {name}.")
try:
parse_experiment(
project=project,
group=group,
name=name,
suffix=suffix,
logs_file=file,
metrics_dir=metrics_dir,
mode=mode,
)
except Exception as e:
logger.error(f"An exception occured parsing training file {file}: {e}")
else:
published_runs.append(name)
# Try to publish related log files to the group on a last run named "group_logs"
logger.info(
f"Publishing '{project}/{group}' evaluation metrics and files (fake run 'group_logs')"
)
WandB.publish_group_logs(
logs_parent_folder=[*prefix, "logs"],
project=project,
group=group,
suffix=suffix,
existing_runs=published_runs,
snakemake=(mode == ExperimentMode.SNAKEMAKE.value),
)