tracking/translations_parser/cli/taskcluster.py (139 lines of code) (raw):
#!/usr/bin/env python3
"""
Extract information from Marian execution on Taskcluster.
Example with a local file:
parse_tc_logs --input-file ./tests/data/taskcluster.log
Example reading logs from a process:
./tests/data/simulate_process.py | parse_tc_logs --from-stream --verbose
Example publishing data to Weights & Biases:
parse_tc_logs --input-file ./tests/data/taskcluster.log --wandb-project <project> --wandb-group <group> --wandb-run-name <run>
"""
import argparse
import logging
import os
import sys
from collections.abc import Iterator
from io import TextIOWrapper
from pathlib import Path
import taskcluster
from translations_parser.parser import TrainingParser, logger
from translations_parser.publishers import CSVExport, Publisher
from translations_parser.utils import (
publish_group_logs_from_tasks,
suffix_from_group,
taskcluster_log_filter,
)
from translations_parser.wandb import add_wandb_arguments, get_wandb_publisher
# Shared Queue client against the production Firefox CI cluster; used by `boot`
# to resolve the task group when publishing group logs. (`is_running_in_ci`
# builds its own client through the Taskcluster proxy instead.)
queue = taskcluster.Queue({"rootUrl": "https://firefox-ci-tc.services.mozilla.com"})
def get_args() -> argparse.Namespace:
    """Build the CLI argument parser and return the parsed arguments."""
    arg_parser = argparse.ArgumentParser(
        description="Extract information from Marian execution on Task Cluster"
    )
    # The two input sources are exclusive; `boot` enforces that one is given.
    source_group = arg_parser.add_mutually_exclusive_group()
    source_group.add_argument(
        "--input-file",
        "-i",
        type=Path,
        default=None,
        help="Path to the Task Cluster log file.",
    )
    source_group.add_argument(
        "--from-stream",
        "-s",
        action="store_true",
        help="Read lines from stdin stream.",
    )
    arg_parser.add_argument(
        "--output-dir",
        "-o",
        type=Path,
        default=Path(__file__).parent.parent / "output",
        help="Output directory to export training and validation data as CSV.",
    )
    arg_parser.add_argument(
        "--verbose",
        "-v",
        action="store_const",
        dest="loglevel",
        const=logging.DEBUG,
        help="Print debug messages.",
    )
    arg_parser.add_argument(
        "--publish-group-logs",
        action="store_true",
        default=False,
        help=(
            "Enable publishing a group_logs fake run with the experiment configuration."
            "This option requires W&B publication to be enabled, otherwise it will be ignored."
        ),
    )
    # Extend the parser with the Weights & Biases publication options.
    add_wandb_arguments(arg_parser)
    return arg_parser.parse_args()
def is_running_in_ci():
    """
    Determine if this run is being done in CI.
    """
    task_id = os.environ.get("TASK_ID")
    # Outside of Taskcluster there is no TASK_ID, so this cannot be CI.
    if not task_id:
        return False
    logger.info(f'Fetching the experiment for task "{task_id}" to check if this is running in CI.')
    # Query through the in-cluster Taskcluster proxy rather than the public
    # root URL used elsewhere in this module.
    proxy_queue = taskcluster.Queue({"rootUrl": os.environ["TASKCLUSTER_PROXY_URL"]})
    group_id = proxy_queue.task(task_id)["taskGroupId"]
    group_task = proxy_queue.task(group_id)
    # e.g. "github-pull-request", "action", "github-push"; anything other
    # than an "action" (a real training trigger) is treated as CI.
    return group_task.get("extra", {}).get("tasks_for") != "action"
def boot() -> None:
    """
    Parse Marian training logs and publish the extracted data.

    Lines are read either from `--input-file` or from the stdin stream
    (`--from-stream`); exactly one of the two must be provided. Parsed data
    is always exported as CSV, and optionally published to Weights & Biases.

    Raises:
        Exception: if no input source is selected, or if `--publish-group-logs`
            is requested outside of Taskcluster (no TASK_ID in the environment).
    """
    args = get_args()
    if args.loglevel:
        logger.setLevel(args.loglevel)
    args.output_dir.mkdir(parents=True, exist_ok=True)
    lines: TextIOWrapper | Iterator[str]
    if args.input_file is None and args.from_stream is False:
        raise Exception("One of `--input-file` or `--from-stream` must be set.")
    if args.from_stream:
        lines = sys.stdin
    else:
        # `readlines` materializes the whole file before the handle is closed,
        # so the generator remains valid after the `with` block exits.
        with args.input_file.open("r") as f:
            lines = (line.strip() for line in f.readlines())
    # Build publisher output, CSV is always enabled, Weight & Biases upon operator choice
    publishers: list[Publisher] = [CSVExport(output_dir=args.output_dir)]
    wandb_publisher = get_wandb_publisher(
        project_name=args.wandb_project,
        group_name=args.wandb_group,
        run_name=args.wandb_run_name,
        taskcluster_secret=args.taskcluster_secret,
        logs_file=args.input_file,
        artifacts=args.wandb_artifacts,
        publication=args.wandb_publication,
    )
    if wandb_publisher:
        publishers.append(wandb_publisher)
    elif args.publish_group_logs:
        # `--publish-group-logs` is meaningless without a W&B publisher.
        logger.warning(
            "Ignoring --publish-group-logs option as Weight & Biases publication is disabled."
        )
    # Publish experiment configuration before parsing the training logs
    if wandb_publisher and args.publish_group_logs:
        logger.info("Publishing experiment config to a 'group_logs' fake run.")
        # Retrieve experiment configuration from the task group
        task_id = os.environ.get("TASK_ID")
        if not task_id:
            raise Exception("Group logs publication can only run in taskcluster")
        task = queue.task(task_id)
        group_id = task["taskGroupId"]
        # Ensure task group is readable
        queue.getTaskGroup(group_id)
        task_group = queue.task(group_id)
        # Config is read from the action input on the task group's own task;
        # presumably may be None when the group was not created by an action —
        # TODO confirm downstream handling.
        config = task_group.get("extra", {}).get("action", {}).get("context", {}).get("input")
        publish_group_logs_from_tasks(
            project=wandb_publisher.project,
            group=wandb_publisher.group,
            config=config,
            suffix=suffix_from_group(group_id),
        )
    # Use log filtering when using non-stream (for uploading past experiments)
    log_filter = taskcluster_log_filter if not args.from_stream else None
    parser = TrainingParser(
        lines,
        publishers=publishers,
        log_filter=log_filter,
    )
    parser.run()
def main() -> None:
    """
    Entry point for the `parse_tc_logs` script.

    Catch every exception when running in Taskcluster to avoid crashing real
    training: failures are only re-raised when running locally (no
    MOZ_AUTOMATION) or in CI, where breaking the task is the desired signal.
    """
    try:
        boot()
    except Exception:
        if os.environ.get("MOZ_AUTOMATION") is None:
            # Local runs should fail loudly so developers notice.
            logger.exception("Publication failed when running locally.")
            # Bare `raise` re-raises the active exception without adding a
            # spurious re-raise frame to the traceback.
            raise
        elif is_running_in_ci():
            logger.exception("Publication failed when running in CI.")
            raise
        else:
            # Real training task: log and swallow so a publication issue
            # never marks the training job itself as failed.
            logger.exception(
                "Publication failed! The error is ignored to not break training, but it should be fixed."
            )
            sys.exit(0)