taskcluster/translations_taskgraph/transforms/from_datasets.py (126 lines of code) (raw):

# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. # # The transform sequences in this file are responsible for "fanning out" a job # that operates on individual datasets into N jobs based on the parameters # given. By default, it will fan out into one job for each dataset in the # training config for from _all_ categories. This can be restricted by one or # more of: # - `category` to limit to the datasets in a particular category (eg: `train`) # - `provider` to limit to datasets from particular provider (eg: `flores`) # - `exclude-locales` to avoid generating jobs for given language pairs, eg: # {"src": "en", "trg": "ru"}. (This is primarily useful for tasks like # `corpus-clean-parallel-bicleaner-ai` which only work if a bicleaner pack # is available for a locale pair. # # These transform sequences will also perform string formatting in the given # `substitution-fields`. (Normally this would be done with `task-context`, but # this transform is much more aware of things like `provider` and `dataset`, # so it's simply easier to do it here for fields that need these things.) Both # transform sequences make the following variables available: # - `provider` is the dataset provider. Eg: the `opus` part of `opus_Books/v1`. # - `dataset` is the dataset name. Eg: the `Books/v1` part of `opus_Books/v1`. # - `dataset_sanitized` is the dataset name with `/` and `.` characters replaced # with an `_` to make them more suitable in filenames and URLs. # Eg: `Books_v1` from `Books/V1`. # - `src_locale` is the `src` from the training config. # - `trg_locale` is the `trg` from the tarining config. # # Note that there are two available transform sequences here: `per_dataset` # and `mono`. `mono` does everything that `per_dataset` does, but also: # - Requires a `category` of either `mono-src` or `mono-trg`. (It doesn't make # sense to use this sequence without a category, or with other ones.) # - Makes `locale` available as a substitution parameter - which will either # be set to the `src` or `trg` locale, depending on which category was used. import copy from taskgraph.transforms.base import TransformSequence from taskgraph.util.schema import Schema from voluptuous import ALLOW_EXTRA, Optional from translations_taskgraph.util.substitution import substitute from translations_taskgraph.util.dataset_helpers import sanitize_dataset_name SCHEMA = Schema( { Optional("dataset-config"): { # Fields in each `job` that need to be substituted with data # provided by this transform. Optional("substitution-fields"): [str], Optional("category"): str, Optional("provider"): str, Optional("exclude-locales"): [ { "src": str, "trg": str, }, ], }, }, extra=ALLOW_EXTRA, ) per_dataset = TransformSequence() per_dataset.add_validate(SCHEMA) mono = TransformSequence() mono.add_validate(SCHEMA) @per_dataset.add def jobs_from_datasets(config, jobs): for job in jobs: dataset_config = job.pop("dataset-config", {}) category = dataset_config.get("category", "") provider = dataset_config.get("provider", "") substitution_fields = dataset_config.get("substitution-fields", []) exclude_locales = dataset_config.get("exclude-locales", []) datasets = config.params["training_config"]["datasets"] src = config.params["training_config"]["experiment"]["src"] trg = config.params["training_config"]["experiment"]["trg"] included_datasets = set() if category: included_datasets.update(datasets[category]) else: for sets in datasets.values(): included_datasets.update(sets) if {"src": src, "trg": trg} in exclude_locales: continue for full_dataset in included_datasets: dataset_provider, dataset = full_dataset.split("_", 1) if provider and provider != dataset_provider: continue subjob = copy.deepcopy(job) subs = { "provider": dataset_provider, "dataset": full_dataset, "dataset_sanitized": sanitize_dataset_name(dataset), "src_locale": src, "trg_locale": trg, } for field in substitution_fields: container, subfield = subjob, field while "." in subfield: f, subfield = subfield.split(".", 1) container = container[f] container[subfield] = substitute(container[subfield], **subs) subjob.setdefault("attributes", {}) subjob["attributes"]["provider"] = dataset_provider subjob["attributes"]["dataset"] = dataset subjob["attributes"]["src_locale"] = src subjob["attributes"]["trg_locale"] = trg yield subjob @mono.add def jobs_for_mono_datasets(config, jobs): for job in jobs: dataset_config = job.pop("dataset-config", {}) category = dataset_config.get("category") provider = dataset_config.get("provider", "") substitution_fields = dataset_config.get("substitution-fields", []) exclude_locales = dataset_config.get("exclude-locales", []) datasets = config.params["training_config"]["datasets"] src = config.params["training_config"]["experiment"]["src"] trg = config.params["training_config"]["experiment"]["trg"] if {"src": src, "trg": trg} in exclude_locales: continue if category not in ("mono-src", "mono-trg"): raise Exception( "from_datasets:mono can only be used with mono-src and mono-trg categories" ) included_datasets = set() if category: included_datasets.update(datasets[category]) else: for sets in datasets.values(): included_datasets.update(sets) for full_dataset in included_datasets: dataset_provider, dataset = full_dataset.split("_", 1) if provider and provider != dataset_provider: continue subjob = copy.deepcopy(job) if category == "mono-src": locale = src elif category == "mono-trg": locale = trg else: raise Exception( "from_datasets:mono can only be used with mono-src and mono-trg categories" ) subs = { "provider": dataset_provider, "dataset": full_dataset, "dataset_sanitized": sanitize_dataset_name(dataset), "locale": locale, "src_locale": src, "trg_locale": trg, } for field in substitution_fields: container, subfield = subjob, field while "." in subfield: f, subfield = subfield.split(".", 1) container = container[f] container[subfield] = substitute(container[subfield], **subs) subjob.setdefault("attributes", {}) subjob["attributes"]["provider"] = dataset_provider subjob["attributes"]["dataset"] = dataset subjob["attributes"]["locale"] = locale subjob["attributes"]["src_locale"] = src subjob["attributes"]["trg_locale"] = trg yield subjob