in utils/trigger_training.py [0:0]
def main() -> None:
    """Trigger a training run from the command line.

    Steps, in order:
      1. Parse the CLI arguments (--config, --branch, --force, --no_interactive).
      2. Call validate_taskcluster_credentials().
      3. Resolve the branch (from --branch, or the current git branch) and
         require it to be "main" or to start with "dev" or "release".
      4. Require check_if_pushed(branch) to pass, unless --force is given.
      5. Poll get_decision_task_push(branch) until the decision task has
         completed successfully.
      6. Load the YAML config, call trigger_training(), and, when an action
         task ID is returned, record it with write_to_log().

    Exits with status 1 when the branch name is not allowed, when the branch
    is not fully pushed and --force was not given, or when the decision task
    ends in any state other than a successful completion.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        # Preserves whitespace in the help text.
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument("--config", type=Path, required=True, help="Path the config")
    parser.add_argument(
        "--branch",
        type=str,
        required=False,
        help="The name of the branch, defaults to the current branch",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Skip the checks for the branch being up to date",
    )
    # NOTE(review): args.no_interactive is parsed but never read in this
    # function - confirm where the confirmation is expected to be skipped.
    parser.add_argument(
        "--no_interactive",
        action="store_true",
        help="Skip the confirmation",
    )
    args = parser.parse_args()
    branch = args.branch

    validate_taskcluster_credentials()

    if branch:
        print(f"Using --branch: {branch}")
    else:
        branch = run(["git", "branch", "--show-current"])
        print(f"Using current branch: {branch}")

    # This is the only branch-name gate: anything that is not allowed exits
    # here, so no later re-check of the branch name is needed.
    if branch != "main" and not branch.startswith(("dev", "release")):
        print(f'The git branch "{branch}" must be "main", or start with "dev" or "release"')
        sys.exit(1)

    if check_if_pushed(branch):
        print(f"Branch '{branch}' is up to date with origin.")
    elif args.force:
        print(
            f"Branch '{branch}' is not fully pushed to origin, bypassing this check because of --force."
        )
    else:
        print(
            f"Error: Branch '{branch}' is not fully pushed to origin. Use --force or push your changes."
        )
        sys.exit(1)

    # Seconds to wait between polls. The loop itself has no upper bound: it
    # only ends on a successful decision task (break) or a failed one (exit).
    poll_interval = 20
    while True:
        decision_task = get_decision_task_push(branch)

        if decision_task:
            if decision_task.status == "completed" and decision_task.conclusion == "success":
                # The decision task is completed.
                break
            elif decision_task.status == "queued":
                print(f"Decision task is queued, trying again in {poll_interval} seconds")
            elif decision_task.status == "in_progress":
                print(f"Decision task is in progress, trying again in {poll_interval} seconds")
            else:
                # The task failed.
                print(
                    f'Decision task is "{decision_task.status}" with the conclusion "{decision_task.conclusion}"'
                )
                sys.exit(1)
        else:
            print(f"Decision task is not available, trying again in {poll_interval} seconds")

        sleep(poll_interval)

    # The loop only breaks with a truthy, successfully completed decision task.
    decision_task_id = get_task_id_from_url(decision_task.details_url)

    with args.config.open() as file:
        config: dict = yaml.safe_load(file)

    log_config_info(args.config, config)
    action_task_id = trigger_training(decision_task_id, config)
    # A falsy action task ID means nothing was triggered, so nothing is logged.
    if action_task_id:
        write_to_log(args.config, config, action_task_id, branch)