in tracking/translations_parser/cli/taskcluster_group.py [0:0]
def publish_task_group(group_id: str, override: bool = False) -> None:
    """Publish all completed training tasks of a Taskcluster group to W&B.

    Reads the training configuration from the group's task (at
    ``extra/action/context/input``), derives the W&B project/group names from
    the experiment's source/target languages and name, then publishes one run
    per training task, attaching metrics from the evaluation tasks that depend
    on it. Leftover metrics tasks are published via the group-logs path.

    Args:
        group_id: Taskcluster task group identifier.
        override: When True, delete pre-existing W&B runs of the same group
            before publishing.
    """
    logger.info(f"Retrieving task group {group_id}")
    # Fail early if the task group cannot be read at all.
    queue.getTaskGroup(group_id)
    # The group's own task carries the training configuration.
    task_group = queue.task(group_id)
    config = task_group.get("extra", {}).get("action", {}).get("context", {}).get("input")
    if config is None:
        # Without a training configuration there is nothing to publish.
        logger.warning(
            f"Task group {group_id} cannot be published to WandB: "
            "configuration missing @ extra/action/context/input"
        )
        return

    experiment = config["experiment"]
    project_name = f'{experiment["src"]}-{experiment["trg"]}'
    group_name = f'{experiment["name"]}_{group_id}'
    suffix = suffix_from_group(group_id)

    completed = list_completed_tasks(group_id)
    training_tasks = list_training_tasks(group_id, completed)
    metrics_tasks = list_metrics_tasks(group_id, completed)
    if not training_tasks:
        logger.warning(f"Skipping task group {group_id} as it is empty")
        return
    logger.info(f"Processing group {group_name}")

    if override:
        # Wipe any previously published runs for this group first.
        for run in list(wandb.Api().runs(project_name, filters={"group": group_name})):
            logger.warning(f"Deleting existing run {run.display_name}.")
            run.delete()

    # One W&B run per training task, with its dependent evaluation metrics.
    for training_task in training_tasks:
        train_id = training_task["status"]["taskId"]
        train_name = training_task["name"]

        # An evaluation task belongs to this run when it both depends on the
        # training task and its parsed label names the same model.
        dependent_ids = []
        for eval_id, eval_task in metrics_tasks.items():
            label = eval_task["task"]["tags"].get("label", "")
            try:
                model_name = parse_task_label(label).model
            except ValueError:
                # Unparseable label: not an evaluation we can attribute.
                continue
            if train_id in eval_task["task"]["dependencies"] and model_name == train_name:
                dependent_ids.append(eval_id)

        # Consume the matched metrics tasks so they are not re-published below.
        metrics = []
        for dep_id in dependent_ids:
            metrics.extend(get_metrics_from_task(metrics_tasks.pop(dep_id)))

        publish_task(
            project=project_name,
            group=group_name,
            suffix=suffix,
            name=train_name,
            task=training_task,
            metrics=metrics,
        )

    # Any metrics tasks left unmatched go through the logs publication path.
    publish_group_logs_from_tasks(
        project=project_name,
        group=group_name,
        suffix=suffix,
        metrics_tasks=metrics_tasks,
        config=config,
    )