taskcluster/scripts/pipeline/train_taskcluster.py (107 lines of code) (raw):
#!/usr/bin/env python3
import logging
import os
import os.path
import requests
import subprocess
import sys
# The shell script that actually launches training; this wrapper only prepares
# resume state and argument handling before delegating to it.
TRAINING_SCRIPT = os.path.join(os.path.dirname(__file__), "train-taskcluster.sh")
# The full set of artifacts a previous run must have produced for training to be
# safely continued from its checkpoint (marian model/optimizer state, opustrainer
# state, and logs).
CONTINUATION_ARTIFACTS = {
    "config.opustrainer.yml",
    "config.opustrainer.yml.state",
    "devset.out",
    "model.npz",
    "model.npz.best-bleu-detok.npz",
    "model.npz.best-bleu-detok.npz.decoder.yml",
    "model.npz.best-ce-mean-words.npz",
    "model.npz.best-ce-mean-words.npz.decoder.yml",
    "model.npz.best-chrf.npz",
    "model.npz.best-chrf.npz.decoder.yml",
    "model.npz.decoder.yml",
    "model.npz.optimizer.npz",
    "model.npz.progress.yml",
    "model.npz.yml",
    "opustrainer.log",
    "train.log",
    "valid.log",
    # + vocab*.spm artifacts which are determined dynamically
}
# Taskcluster Queue API endpoints: list a run's artifacts, and fetch one artifact.
ARTIFACTS_URL = "{root_url}/api/queue/v1/task/{task_id}/runs/{run_id}/artifacts"
ARTIFACT_URL = "{root_url}/api/queue/v1/task/{task_id}/runs/{run_id}/artifacts/{artifact_name}"
# TODO: consolidate everything in train.py or at least do not rely on the argument names and the number of them in the Taskcluster part
# TODO: https://github.com/mozilla/translations/issues/607
# The argument number where pretrained model mode is expected.
# This is 1-indexed, not 0-indexed, so it should line up with the argument
# number this is fetched in in train-taskcluster.sh
PRETRAINED_MODEL_MODE_ARG_NUMBER = 13
# Nothing special about 17...just a number plucked out of thin air that
# should be distinct enough to retry on.
# Taskcluster can be configured to retry the task when it exits with this code,
# distinguishing transient download failures from genuine training failures.
DOWNLOAD_ERROR_EXIT_CODE = 17
def _list_run_artifacts(root_url, task_id, run_id):
    """Return the artifact list (list of dicts) for the given task run.

    Exits the process with DOWNLOAD_ERROR_EXIT_CODE on any request failure so
    Taskcluster can retry the task rather than treating it as a training error.
    """
    try:
        resp = requests.get(
            ARTIFACTS_URL.format(root_url=root_url, task_id=task_id, run_id=run_id)
        )
        resp.raise_for_status()
    except Exception:
        logging.exception("Caught exception, exiting with distinct code...")
        sys.exit(DOWNLOAD_ERROR_EXIT_CODE)
    return resp.json()["artifacts"]


def _download_artifacts(root_url, task_id, run_id, artifacts, model_dir):
    """Download all non-log artifacts of a previous run into `model_dir`.

    Returns True if every artifact was fetched, False if a 4xx response made the
    run non-resumable. Exits with DOWNLOAD_ERROR_EXIT_CODE on server (5xx)
    errors, which are considered transient and retryable.
    """
    for artifact in artifacts:
        # Skip Taskcluster logs - we only care about artifacts that the training tools create.
        if artifact["name"].startswith("public/log"):
            continue
        out_name = os.path.basename(artifact["name"])
        logging.info(f"Fetching {artifact['name']}...")
        r = requests.get(
            ARTIFACT_URL.format(
                root_url=root_url,
                task_id=task_id,
                run_id=run_id,
                artifact_name=artifact["name"],
            ),
            stream=True,
        )
        # Fix: the original `400 <= status <= 500` treated 500 (a transient
        # server error) as a permanent client error; 500 now falls through to
        # the retryable branch below. Also use logging.error: logging.exception
        # outside an `except` block logs a spurious "NoneType: None" traceback.
        if 400 <= r.status_code < 500:
            logging.error(
                f"Got 4xx error for {artifact['name']}, run {run_id} is not resumable..."
            )
            return False
        elif r.status_code >= 500:
            logging.error("Got 5xx error, exiting with distinct code...")
            sys.exit(DOWNLOAD_ERROR_EXIT_CODE)
        # Stream to disk in chunks to avoid holding large model files in memory.
        with open(os.path.join(model_dir, out_name), "wb+") as fd:
            for chunk in r.iter_content(chunk_size=8192):
                fd.write(chunk)
    return True


def main(args):
    """Prepare a (possibly resumed) training invocation and exec the shell script.

    `args` are the positional arguments destined for train-taskcluster.sh. If
    this is a re-run (RUN_ID > 0), walk backwards through earlier runs of the
    same task looking for one with a complete set of continuation artifacts;
    if found, download them into the model directory and force the pretrained
    model mode to "continue" so training resumes from the checkpoint.

    Raises subprocess.CalledProcessError if the training script fails, and
    KeyError if the required Taskcluster environment variables are missing.
    """
    logging.basicConfig(level=logging.INFO)
    script_args = list(args)
    # Source/target language codes; positions must line up with train-taskcluster.sh.
    src = args[2]
    trg = args[3]
    task_id = os.environ["TASK_ID"]
    run_id = int(os.environ["RUN_ID"])
    root_url = os.environ["TASKCLUSTER_ROOT_URL"]
    # Must line up with where model_dir is in `train-taskcluster.sh` while that script
    # still exists.
    model_dir = script_args[6]
    pretrained_model_mode = None
    if len(args) >= PRETRAINED_MODEL_MODE_ARG_NUMBER:
        pretrained_model_mode = script_args[PRETRAINED_MODEL_MODE_ARG_NUMBER - 1]

    os.makedirs(model_dir, exist_ok=True)

    if run_id > 0:
        logging.info("run_id > 0, attempting to resume training from an earlier run...")
        prev_run_id = run_id - 1
        while prev_run_id >= 0:
            artifacts = _list_run_artifacts(root_url, task_id, prev_run_id)
            run_artifacts = {os.path.basename(a["name"]) for a in artifacts}
            # A run is resumable if it has every continuation artifact plus a
            # vocab: either a shared vocab.spm or per-language vocab files.
            if run_artifacts.issuperset(
                CONTINUATION_ARTIFACTS.union({f"vocab.{src}.spm", f"vocab.{trg}.spm"})
            ) or run_artifacts.issuperset(CONTINUATION_ARTIFACTS.union({"vocab.spm"})):
                logging.info(
                    f"Run {prev_run_id} appears to have the artifacts we need! Downloading them..."
                )
                resumable = _download_artifacts(
                    root_url, task_id, prev_run_id, artifacts, model_dir
                )
            else:
                logging.info(f"Run {prev_run_id} is missing some necessary artifacts...")
                resumable = False

            if resumable:
                # We successfully downloaded all the artifacts from a previous run. Override
                # the pretrained model mode and we're done!
                pretrained_model_mode = "continue"
                break
            # We weren't able to get all of the necessary artifacts; try the next previous run
            prev_run_id -= 1

    if pretrained_model_mode:
        if len(script_args) < PRETRAINED_MODEL_MODE_ARG_NUMBER:
            script_args.append(pretrained_model_mode)
        else:
            script_args[PRETRAINED_MODEL_MODE_ARG_NUMBER - 1] = pretrained_model_mode
    subprocess.run([TRAINING_SCRIPT, *script_args], check=True)
# Entry point: forward all CLI arguments (minus the script name) to main().
if __name__ == "__main__":
    main(sys.argv[1:])