def upstreams_for_mono()

in taskcluster/translations_taskgraph/transforms/find_upstreams.py [0:0]


def upstreams_for_mono(config, jobs):
    """Wire each job up to the upstream tasks that provide its monolingual datasets.

    For every job, the tasks in ``config.kind_dependencies_tasks`` are narrowed
    down to those whose attributes match the job's
    ``upstreams-config["upstream-task-attributes"]`` and whose
    ``<provider>_<dataset>`` name is listed under the job's dataset category in
    the training config. Each surviving task is added to the job's
    ``dependencies`` and ``fetches``, and the job's ``substitution-fields`` are
    passed through ``substitute`` with that task's details.

    The ``upstreams-config`` key is consumed (popped) from the job.

    Args:
        config: Transform config; ``params["training_config"]`` and
            ``kind_dependencies_tasks`` are read from it.
        jobs: Iterable of job dicts. Each needs
            ``attributes["dataset-category"]`` set to ``mono-src`` or
            ``mono-trg`` and an ``upstreams-config`` entry.

    Yields:
        Each job, mutated in place.

    Raises:
        Exception: If a job's dataset category is neither ``mono-src`` nor
            ``mono-trg``. This is checked up front for every job, whether or
            not any upstream task ends up matching.
        KeyError: If the training config lacks ``experiment.src`` /
            ``experiment.trg``, or has no dataset list for the job's category.
    """
    training_config = config.params.get("training_config", {})
    datasets = training_config.get("datasets", {})
    src = training_config["experiment"]["src"]
    trg = training_config["experiment"]["trg"]

    # The only supported categories, mapped to the locale each one refers to.
    locale_by_category = {"mono-src": src, "mono-trg": trg}

    # The upstream tasks are the same for every job, so order them once.
    # Sorting by label keeps the generated dependencies/fetches deterministic.
    upstream_tasks = sorted(config.kind_dependencies_tasks.values(), key=lambda t: t.label)

    for job in jobs:
        dataset_category = job["attributes"]["dataset-category"]
        # Validate before doing any work: a misconfigured job must fail loudly
        # rather than slip through just because no upstream task matched.
        if dataset_category not in locale_by_category:
            raise Exception(
                "Don't use `find_upstreams:mono` without the `mono-src` or `mono-trg` category!"
            )
        locale = locale_by_category[dataset_category]

        target_datasets = datasets[dataset_category]
        job.setdefault("dependencies", {})
        job.setdefault("fetches", {})
        upstreams_config = job.pop("upstreams-config")
        upstream_task_attributes = upstreams_config["upstream-task-attributes"]
        # Artifact templates are per job, so sort them once rather than per task.
        artifacts = sorted(upstreams_config["upstream-artifacts"])
        substitution_fields = upstreams_config.get("substitution-fields", [])

        for task in upstream_tasks:
            # Filter out any tasks that don't match the desired attributes.
            if any(task.attributes.get(k) != v for k, v in upstream_task_attributes.items()):
                continue

            provider = task.attributes["provider"]
            dataset = task.attributes["dataset"]
            task_dataset = f"{provider}_{dataset}"

            # Filter out any tasks that don't match a desired dataset
            if task_dataset not in target_datasets:
                continue

            job["dependencies"][task.label] = task.label
            job["fetches"].setdefault(task.label, [])

            subs = {
                "provider": provider,
                "dataset": dataset,
                "dataset_sanitized": sanitize_dataset_name(dataset),
                "locale": locale,
                "src_locale": src,
                "trg_locale": trg,
            }

            # NOTE(review): this rewrites the same job fields once per matching
            # task; presumably `substitute` leaves already-filled values alone,
            # so the first matching task wins -- confirm against `substitute`.
            for field in substitution_fields:
                # Walk a dotted path ("a.b.c") down to the dict holding the leaf key.
                container, subfield = job, field
                while "." in subfield:
                    f, subfield = subfield.split(".", 1)
                    container = container[f]

                container[subfield] = substitute(container[subfield], **subs)

            for artifact in artifacts:
                job["fetches"][task.label].append(
                    {
                        "artifact": artifact.format(**subs),
                        "extract": False,
                    }
                )

            # Only recorded when at least one upstream task matched the job.
            job["attributes"]["src_locale"] = src
            job["attributes"]["trg_locale"] = trg

        yield job