def boot()

in tracking/translations_parser/cli/taskcluster.py [0:0]


def boot() -> None:
    """Command-line entry point: feed training log lines to a ``TrainingParser``.

    Log lines come either from standard input (``--from-stream``) or from the
    file given by ``--input-file``. Parsed data is sent to a ``CSVExport``
    publisher and, when ``get_wandb_publisher`` returns one, to a Weight &
    Biases publisher as well.

    Raises:
        Exception: If neither ``--input-file`` nor ``--from-stream`` is set, or
            if group logs publication is requested while the ``TASK_ID``
            environment variable is missing.
    """
    args = get_args()

    if args.loglevel:
        logger.setLevel(args.loglevel)

    args.output_dir.mkdir(parents=True, exist_ok=True)

    if args.input_file is None and args.from_stream is False:
        raise Exception("One of `--input-file` or `--from-stream` must be set.")

    log_lines: TextIOWrapper | Iterator[str]
    if not args.from_stream:
        # Read the whole file while it is open; stripping happens lazily later.
        with args.input_file.open("r") as log_file:
            raw_lines = log_file.readlines()
        log_lines = (raw_line.strip() for raw_line in raw_lines)
    else:
        log_lines = sys.stdin

    # The CSV export is always part of the publishers; Weight & Biases is only
    # added when a publisher could be built from the operator's options.
    csv_publisher = CSVExport(output_dir=args.output_dir)
    publishers: list[Publisher] = [csv_publisher]
    wandb_publisher = get_wandb_publisher(
        project_name=args.wandb_project,
        group_name=args.wandb_group,
        run_name=args.wandb_run_name,
        taskcluster_secret=args.taskcluster_secret,
        logs_file=args.input_file,
        artifacts=args.wandb_artifacts,
        publication=args.wandb_publication,
    )

    if wandb_publisher:
        publishers.append(wandb_publisher)
        if args.publish_group_logs:
            # The experiment configuration is published before any training
            # log gets parsed.
            logger.info("Publishing experiment config to a 'group_logs' fake run.")
            current_task_id = os.environ.get("TASK_ID")
            if not current_task_id:
                raise Exception("Group logs publication can only run in taskcluster")
            group_id = queue.task(current_task_id)["taskGroupId"]
            # Ensure task group is readable
            queue.getTaskGroup(group_id)
            # The group ID is looked up as a task ID here (presumably the task
            # that created the group -- confirm against Taskcluster usage).
            group_task = queue.task(group_id)
            extra = group_task.get("extra", {})
            action = extra.get("action", {})
            context = action.get("context", {})
            experiment_config = context.get("input")
            publish_group_logs_from_tasks(
                project=wandb_publisher.project,
                group=wandb_publisher.group,
                config=experiment_config,
                suffix=suffix_from_group(group_id),
            )
    elif args.publish_group_logs:
        logger.warning(
            "Ignoring --publish-group-logs option as Weight & Biases publication is disabled."
        )

    # Log filtering only applies to non-stream input (uploading past experiments).
    selected_filter = None if args.from_stream else taskcluster_log_filter
    TrainingParser(
        log_lines,
        publishers=publishers,
        log_filter=selected_filter,
    ).run()