taskcluster/translations_taskgraph/transforms/training_continuation.py
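"""Transforms that mount pretrained model artifacts into training tasks so a
run can continue training from, initialize from, or directly reuse an
existing model instead of training from scratch."""
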
import os
from urllib.parse import urljoin

import requests
from taskgraph.transforms.base import TransformSequence

CONTINUE_TRAINING_ARTIFACTS = [
"devset.out",
"model.npz",
"model.npz.best-bleu-detok.npz",
"model.npz.best-bleu-detok.npz.decoder.yml",
"model.npz.best-ce-mean-words.npz",
"model.npz.best-ce-mean-words.npz.decoder.yml",
"final.model.npz.best-chrf.npz",
"model.npz.best-chrf.npz",
"final.model.npz.best-chrf.npz.decoder.yml",
"model.npz.best-chrf.npz.decoder.yml",
"model.npz.decoder.yml",
"model.npz.optimizer.npz",
"model.npz.progress.yml",
"model.npz.yml",
"train.log",
"valid.log",
# + vocab*.spm artifacts which are determined dynamically
]
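
# "init" mode only needs the best checkpoints below; the full training state
# above (optimizer, progress, logs) is needed to continue training.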
INITIALIZE_MODEL_ARTIFACTS = (
"model.npz.best-bleu-detok.npz",
"model.npz.best-ce-mean-words.npz",
"final.model.npz.best-chrf.npz",
"model.npz.best-chrf.npz",
)


def get_artifact_url(url, artifact_name):
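    # Ensure a trailing slash so urljoin appends the artifact name instead of
    # replacing the URL's last path segment, e.g. (hypothetical URL):
    #   get_artifact_url("https://example.com/run/model", "model.npz")
    #   -> "https://example.com/run/model/model.npz"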
normalized_url = f"{url}/" if not url.endswith("/") else url
    return urljoin(normalized_url, artifact_name)


def get_artifact_mount(url, directory, artifact_name):
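    # The returned dict follows generic-worker's file-mount payload shape
    # ({"content": {"url": ...}, "file": ...}); this assumes the training
    # tasks run on generic-worker.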
artifact_url = get_artifact_url(url, artifact_name)
return {
"content": {
"url": artifact_url,
},
"file": os.path.join(directory, artifact_name),
    }


def get_artifact_mounts(urls, directory, artifact_names):
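    # A generator: yields one list of mounts per URL so that the transform
    # below can pull the mounts for a single task at a time with next().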
for url in urls:
artifact_mounts = []
for artifact_name in artifact_names:
artifact_mounts.append(get_artifact_mount(url, directory, artifact_name))
        yield artifact_mounts


def get_models_mounts(pretrained_models, src, trg):
mounts = {}
for pretrained_model in pretrained_models:
model_urls = pretrained_models[pretrained_model]["urls"]
if pretrained_models[pretrained_model]["mode"] == "init":
model_artifacts = INITIALIZE_MODEL_ARTIFACTS
else:
joint_vocab_url = get_artifact_url(model_urls[0], "vocab.spm")
src_vocab_url = get_artifact_url(model_urls[0], f"vocab.{src}.spm")
if requests.get(joint_vocab_url).status_code == 200:
print("Using a joint vocab mount")
vocab_artifacts = ["vocab.spm"]
elif requests.get(src_vocab_url).status_code == 200:
print("Using separate vocabs mounts")
vocab_artifacts = [f"vocab.{src}.spm", f"vocab.{trg}.spm"]
else:
                raise ValueError("Neither the joint nor the separate vocab URLs returned code 200")
model_artifacts = CONTINUE_TRAINING_ARTIFACTS + vocab_artifacts
mount = get_artifact_mounts(model_urls, "./artifacts", model_artifacts)
mounts[pretrained_model] = mount
return mounts
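
# `pretrained-models` entries in the training config are keyed by task kind
# and shaped roughly like this sketch (URL and values hypothetical):
#
#   pretrained-models:
#     train-backwards:
#       urls: ["https://storage.example.com/models/ru-en/backward"]
#       mode: "continue"  # one of "continue", "init", "use"
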
transforms = TransformSequence()


@transforms.add
def add_pretrained_model_mounts(config, jobs):
pretrained_models = config.params["training_config"]["experiment"].get("pretrained-models", {})
src = config.params["training_config"]["experiment"]["src"]
trg = config.params["training_config"]["experiment"]["trg"]
pretrained_models_training_artifact_mounts = get_models_mounts(pretrained_models, src, trg)
for job in jobs:
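        # Each entry in the dict is a generator yielding one mounts list per
        # configured URL; kinds without a pretrained model fall back to a
        # one-shot iterator of None, leaving the job untouched.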
pretrained_model_training_artifact_mounts = next(
pretrained_models_training_artifact_mounts.get(config.kind, iter((None,)))
)
if pretrained_model_training_artifact_mounts:
mounts = job["worker"].get("mounts", [])
mounts.extend(pretrained_model_training_artifact_mounts)
job["worker"]["mounts"] = mounts
job["dependencies"].pop("build-vocab")
job["fetches"].pop("build-vocab")
if pretrained_models[config.kind]["mode"] == "use":
# In use mode, no upstream dependencies of the training job are needed - the
# task simply republishes the pretrained artifacts.
job["dependencies"] = {}
job["fetches"] = {}
            # We also need to adjust the caching parameters. The only things that should
            # influence the cache digest are the pretrained model parameters.
job["attributes"]["cache"]["resources"] = []
job["attributes"]["cache"]["from-parameters"] = {
p: v
for p, v in job["attributes"]["cache"]["from-parameters"].items()
if p.startswith("pretrained")
}
        yield job


evaluate_stage = TransformSequence()


@evaluate_stage.add
def skip_for_pretrained_models(config, jobs):
# Find the types of pretrained models that are being used. This makes
# it easier to filter them out in the loop below.
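    # e.g. the "train-backwards" config key maps to "backward", matching the
    # evaluate stage names.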
pretrained_models = [
pretrained.split("-")[-1].replace("backwards", "backward")
for pretrained in config.params["training_config"]["experiment"]
.get("pretrained-models", {})
.keys()
]
for job in jobs:
        if any(pretrained in job["attributes"]["stage"] for pretrained in pretrained_models):
continue
yield job
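
# These sequences are wired into kinds via taskgraph's usual kind.yml
# `transforms` list, e.g. (a sketch; exact kind files may differ):
#
#   transforms:
#     - translations_taskgraph.transforms.training_continuation:transforms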