def publish_task_group()

in tracking/translations_parser/cli/taskcluster_group.py


def publish_task_group(group_id: str, override: bool = False) -> None:
    logger.info(f"Retrieving task group {group_id}")

    # Ensure task group is readable
    queue.getTaskGroup(group_id)

    # Read project and experiment name from task group configuration
    task_group = queue.task(group_id)
    config = task_group.get("extra", {}).get("action", {}).get("context", {}).get("input")
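    # Assumed shape of the action input, inferred from the keys read below:
    #   {"experiment": {"name": ..., "src": ..., "trg": ..., ...}, ...}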

    # If the task group does not have a training configuration, we can skip its publication
    if config is None:
        logger.warning(
            f"Task group {group_id} cannot be published to WandB: "
            "configuration missing @ extra/action/context/input"
        )
        return

    experiment = config["experiment"]
    project_name = f'{experiment["src"]}-{experiment["trg"]}'
    group_name = f'{experiment["name"]}_{group_id}'
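    # e.g. project "en-fr" and group "my-experiment_<group_id>" (illustrative values)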
    suffix = suffix_from_group(group_id)

    grouped_tasks = list_completed_tasks(group_id)
    training_tasks = list_training_tasks(group_id, grouped_tasks)
    metrics_tasks = list_metrics_tasks(group_id, grouped_tasks)

    if not training_tasks:
        logger.warning(f"Skipping task group {group_id} as it is empty")
        return

    logger.info(f"Processing group {group_name}")

    if override:
        existing_runs = list(wandb.Api().runs(project_name, filters={"group": group_name}))
        for run in existing_runs:
            logger.warning(f"Deleting existing run {run.display_name}.")
            run.delete()

    # Publish training tasks as runs
    for training_task in training_tasks:
        # Associate metrics with each run (evaluation tasks that depend on the training task)
        dependent_tasks = []
        for eval_id, eval_task in metrics_tasks.items():
            eval_label = eval_task["task"]["tags"].get("label", "")

            try:
                model_name = parse_task_label(eval_label).model
            except ValueError:
                continue

            # An evaluation task must depend on the training task and its model name must match the run
            if (
                training_task["status"]["taskId"] in eval_task["task"]["dependencies"]
                and model_name == training_task["name"]
            ):
                dependent_tasks.append(eval_id)

        # Flatten the metrics collected from all dependent evaluation tasks into a single list
        metrics = sum(
            [
                get_metrics_from_task(metrics_tasks.pop(dependent_task_id))
                for dependent_task_id in dependent_tasks
            ],
            start=[],
        )

        publish_task(
            project=project_name,
            group=group_name,
            suffix=suffix,
            name=training_task["name"],
            task=training_task,
            metrics=metrics,
        )

    # Publish the remaining metrics tasks via the group logs publication
    publish_group_logs_from_tasks(
        project=project_name,
        group=group_name,
        suffix=suffix,
        metrics_tasks=metrics_tasks,
        config=config,
    )
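
For illustration, a minimal sketch of how this function could be wired to a command-line entry point. The argparse setup, flag names, and the `main` wrapper are assumptions made for this example, not part of the module shown above:

import argparse


def main() -> None:
    # Hypothetical CLI wrapper around publish_task_group; argument names are illustrative.
    parser = argparse.ArgumentParser(
        description="Publish a Taskcluster task group to Weights & Biases"
    )
    parser.add_argument("group_id", help="ID of the Taskcluster task group to publish")
    parser.add_argument(
        "--override",
        action="store_true",
        help="Delete any existing W&B runs for this group before publishing",
    )
    args = parser.parse_args()
    publish_task_group(args.group_id, override=args.override)


if __name__ == "__main__":
    main()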