# taskcluster/translations_taskgraph/transforms/find_upstreams.py
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#
# This transform sequence sets `dependencies` and `fetches` based on
# the information provided in the `upstreams-config` data in each job
# and the given parameters.
# It will look through all tasks generated from `kind-dependencies` and
# add any tasks that match the following conditions as dependencies:
# - src and trg locale given match the {src,trg}_locale attributes on the upstream task
# - `upstream-task-attributes` given match their equivalents on the upstream task
# - `dataset` attribute on the upstream task is one of the datasets provided in `parameters`
# for the `dataset-category` given in the job.
#
# Additionally, fetches will be added for those tasks for each entry in `upstream-artifacts`.
#
# (It is not ideal that this transform hardcodes dataset handling, but because kinds are
# completely unaware of parameters, there's no other real way to do this.)
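#
# As a rough illustration (the values below are made up, not taken from a
# real kind), a job might carry:
#
#   upstreams-config:
#       upstream-task-attributes:
#           cleaning-type:
#               by-cleaning-type:
#                   corpus-clean-parallel-bicleaner-ai: corpus-clean-parallel-bicleaner-ai
#                   corpus-clean-parallel: corpus-clean-parallel
#       upstream-artifacts:
#           - "{dataset_sanitized}.{src_locale}.zst"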
import copy

from taskgraph.transforms.base import TransformSequence
from taskgraph.util.schema import Schema, optionally_keyed_by, resolve_keyed_by
from voluptuous import ALLOW_EXTRA, Optional, Required

from translations_taskgraph.util.substitution import substitute
from translations_taskgraph.util.dataset_helpers import sanitize_dataset_name

SCHEMA = Schema(
{
Required("upstreams-config"): {
Required("upstream-task-attributes"): {
str: optionally_keyed_by("cleaning-type", str),
},
Required("upstream-artifacts"): [str],
},
},
extra=ALLOW_EXTRA,
)

by_locales = TransformSequence()
by_locales.add_validate(SCHEMA)

MONO = Schema(
{
Required("upstreams-config"): {
Required("upstream-task-attributes"): {
str: optionally_keyed_by("cleaning-type", str),
},
Required("upstream-artifacts"): [str],
Optional("substitution-fields"): [str],
},
},
extra=ALLOW_EXTRA,
)

mono = TransformSequence()
mono.add_validate(MONO)
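

# Kinds opt into one of these sequences through their `transforms` list, e.g.
# (illustrative):
#   transforms:
#       - translations_taskgraph.transforms.find_upstreams:by_locales
# `by_locales` is for kinds consuming parallel (src/trg) corpora; `mono` is
# for kinds consuming monolingual data, and additionally supports
# `substitution-fields`.
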
def get_cleaning_type(upstreams):
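    """Infer the cleaning type to resolve `by-cleaning-type` values with.

    Collects the `cleaning-type` attribute from the cleaning upstreams and
    prefers bicleaner-ai over basic parallel cleaning when both are present.
    """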
candidates = set()
for upstream in upstreams:
if upstream.kind not in ("corpus-clean-parallel-bicleaner-ai", "corpus-clean-parallel"):
continue
candidates.add(upstream.attributes["cleaning-type"])
for type_ in ("corpus-clean-parallel-bicleaner-ai", "corpus-clean-parallel"):
if type_ in candidates:
return type_
raise Exception("Unable to find cleaning type!")


@by_locales.add
def resolve_keyed_by_fields(config, jobs):
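    """Resolve `by-cleaning-type` keyed-by values in `upstream-task-attributes`,
    using the cleaning type inferred from the available upstream tasks."""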
for job in jobs:
upstreams_config = job["upstreams-config"]
if upstreams_config.get("upstream-task-attributes", {}).get("cleaning-type"):
cleaning_type = get_cleaning_type(config.kind_dependencies_tasks.values())
resolve_keyed_by(
upstreams_config,
"upstream-task-attributes.cleaning-type",
item_name=job["description"],
**{"cleaning-type": cleaning_type},
)
yield job


@by_locales.add
def upstreams_for_locales(config, jobs):
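    """Add matching upstream tasks as dependencies and their artifacts as fetches.

    An upstream task matches when its attributes agree with
    `upstream-task-attributes` and its `{provider}_{dataset}` name appears in
    the configured datasets for this job's `dataset-category`.
    """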
datasets = config.params.get("training_config", {}).get("datasets", {})
for job in jobs:
dataset_category = job["attributes"]["dataset-category"]
target_datasets = datasets[dataset_category]
upstreams_config = job.pop("upstreams-config")
artifacts = upstreams_config["upstream-artifacts"]
upstream_task_attributes = upstreams_config["upstream-task-attributes"]
subjob = copy.deepcopy(job)
subjob.setdefault("dependencies", {})
subjob.setdefault("fetches", {})
        # Now that we've resolved which type of upstream task we want, we need to
        # find all instances of that task for our locale pair, add them to our
        # dependencies, and add the necessary artifacts to our fetches.
for task in sorted(config.kind_dependencies_tasks.values(), key=lambda t: t.label):
# Filter out any tasks that don't match the desired attributes.
if any(task.attributes.get(k) != v for k, v in upstream_task_attributes.items()):
continue
provider = task.attributes["provider"]
dataset = task.attributes["dataset"]
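            # Datasets in the training config are keyed as `<provider>_<dataset>`;
            # e.g. (hypothetical values) provider "opus" and dataset "Books/v1"
            # would be matched by the entry "opus_Books/v1".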
task_dataset = f"{provider}_{dataset}"
# Filter out any tasks that don't match a desired dataset
if task_dataset not in target_datasets:
continue
subs = {
"src_locale": task.attributes["src_locale"],
"trg_locale": task.attributes["trg_locale"],
"dataset_sanitized": sanitize_dataset_name(dataset),
}
subjob["dependencies"][task.label] = task.label
subjob["fetches"].setdefault(task.label, [])
for artifact in sorted(artifacts):
subjob["fetches"][task.label].append(
{
"artifact": artifact.format(**subs),
"extract": False,
}
)
yield subjob


@mono.add
def upstreams_for_mono(config, jobs):
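    """Set up dependencies and fetches for monolingual upstreams.

    Like `upstreams_for_locales`, but the locale is chosen from the
    `mono-src`/`mono-trg` dataset category, and dataset/locale values are
    substituted into any job fields named in `substitution-fields`.
    """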
training_config = config.params.get("training_config", {})
datasets = training_config.get("datasets", {})
src = training_config["experiment"]["src"]
trg = training_config["experiment"]["trg"]
for job in jobs:
dataset_category = job["attributes"]["dataset-category"]
target_datasets = datasets[dataset_category]
job.setdefault("dependencies", {})
job.setdefault("fetches", {})
upstreams_config = job.pop("upstreams-config")
upstream_task_attributes = upstreams_config["upstream-task-attributes"]
artifacts = upstreams_config["upstream-artifacts"]
substitution_fields = upstreams_config.get("substitution-fields", [])
for task in sorted(config.kind_dependencies_tasks.values(), key=lambda t: t.label):
# Filter out any tasks that don't match the desired attributes.
if any(task.attributes.get(k) != v for k, v in upstream_task_attributes.items()):
continue
provider = task.attributes["provider"]
dataset = task.attributes["dataset"]
task_dataset = f"{provider}_{dataset}"
# Filter out any tasks that don't match a desired dataset
if task_dataset not in target_datasets:
continue
if dataset_category == "mono-src":
locale = src
elif dataset_category == "mono-trg":
locale = trg
else:
raise Exception(
"Don't use `find_upstreams:mono` without the `mono-src` or `mono-trg` category!"
)
job["dependencies"][task.label] = task.label
job["fetches"].setdefault(task.label, [])
subs = {
"provider": provider,
"dataset": dataset,
"dataset_sanitized": sanitize_dataset_name(dataset),
"locale": locale,
"src_locale": src,
"trg_locale": trg,
}
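            # Each `substitution-fields` entry is a dot-separated path into the
            # job; e.g. a (hypothetical) entry "worker.env.DATASET" descends to
            # job["worker"]["env"] and substitutes into its "DATASET" value.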
for field in substitution_fields:
container, subfield = job, field
while "." in subfield:
f, subfield = subfield.split(".", 1)
container = container[f]
container[subfield] = substitute(container[subfield], **subs)
for artifact in sorted(artifacts):
job["fetches"][task.label].append(
{
"artifact": artifact.format(**subs),
"extract": False,
}
)
job["attributes"]["src_locale"] = src
job["attributes"]["trg_locale"] = trg
yield job