def main()

in utils/trigger_training.py [0:0]


def main() -> None:
    """Command-line entry point: trigger training for a config on a pushed branch.

    Steps, in order:
      1. Parse ``--config``, ``--branch``, ``--force`` and ``--no_interactive``.
      2. Call ``validate_taskcluster_credentials()``.
      3. Resolve the branch (``--branch`` or the output of
         ``git branch --show-current``) and require it to be ``main`` or to
         start with ``dev`` or ``release``.
      4. Require ``check_if_pushed(branch)`` to be truthy unless ``--force``
         is given.
      5. Poll ``get_decision_task_push(branch)`` until the decision task is
         completed with a successful conclusion.
      6. Load the YAML config, call ``trigger_training`` and, when it returns
         a truthy action task id, record it with ``write_to_log``.

    Exits with status 1 when the branch name is not allowed, when the branch
    is not pushed and ``--force`` was not given, or when the decision task
    ends in any state other than queued / in progress / completed-success.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        # Preserves whitespace in the help text.
        formatter_class=argparse.RawTextHelpFormatter,
    )

    parser.add_argument("--config", type=Path, required=True, help="Path the config")
    parser.add_argument(
        "--branch",
        type=str,
        required=False,
        help="The name of the branch, defaults to the current branch",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Skip the checks for the branch being up to date",
    )
    # NOTE(review): --no_interactive is parsed but never read in this function.
    parser.add_argument(
        "--no_interactive",
        action="store_true",
        help="Skip the confirmation",
    )

    args = parser.parse_args()
    branch = args.branch

    validate_taskcluster_credentials()

    if branch:
        print(f"Using --branch: {branch}")
    else:
        branch = run(["git", "branch", "--show-current"])
        print(f"Using current branch: {branch}")

    # This is the single gate on the branch name. Everything below may assume
    # the branch is "main" or starts with "dev" / "release", so no later
    # re-check is needed.
    if branch != "main" and not branch.startswith(("dev", "release")):
        print(f'The git branch "{branch}" must be "main", or start with "dev" or "release"')
        sys.exit(1)

    if check_if_pushed(branch):
        print(f"Branch '{branch}' is up to date with origin.")
    elif args.force:
        print(
            f"Branch '{branch}' is not fully pushed to origin, bypassing this check because of --force."
        )
    else:
        print(
            f"Error: Branch '{branch}' is not fully pushed to origin. Use --force or push your changes."
        )
        sys.exit(1)

    # Seconds to wait between polls. This is an interval, not a deadline: the
    # loop keeps polling for as long as the task is missing, queued, or running.
    poll_interval_seconds = 20
    while True:
        decision_task = get_decision_task_push(branch)

        if not decision_task:
            print(
                f"Decision task is not available, trying again in {poll_interval_seconds} seconds"
            )
        elif decision_task.status == "completed" and decision_task.conclusion == "success":
            # The decision task is completed.
            break
        elif decision_task.status == "queued":
            print(f"Decision task is queued, trying again in {poll_interval_seconds} seconds")
        elif decision_task.status == "in_progress":
            print(
                f"Decision task is in progress, trying again in {poll_interval_seconds} seconds"
            )
        else:
            # Any other status/conclusion pair (including completed without
            # success) is treated as a failure.
            print(
                f'Decision task is "{decision_task.status}" with the conclusion "{decision_task.conclusion}"'
            )
            sys.exit(1)

        sleep(poll_interval_seconds)

    decision_task_id = get_task_id_from_url(decision_task.details_url)

    with args.config.open() as file:
        config: dict = yaml.safe_load(file)

    log_config_info(args.config, config)
    action_task_id = trigger_training(decision_task_id, config)
    # Only log when trigger_training returned a truthy task id.
    if action_task_id:
        write_to_log(args.config, config, action_task_id, branch)