taskcluster/translations_taskgraph/transforms/worker_selection.py:

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#
# This transform sequence injects worker-specific environment variables
# (such as those that depend on the number and type of GPUs a worker has)
# into task definitions. This avoids the need to discover this information at
# runtime, or to adjust kinds when changing worker types.

from taskgraph.transforms.base import TransformSequence
from taskgraph.util.schema import evaluate_keyed_by

transforms = TransformSequence()


@transforms.add
def set_worker_type(config, jobs):
    """Determines the general type of worker each task wants, which sometimes
    depends on `tasks-for`. Tasks will typically end up specifying one of the
    worker `aliases` from config.yml after this is evaluated, e.g.: b-cpu,
    b-largegpu-largedisk."""
    training_config = config.params.get("training_config")
    worker_classes = training_config["taskcluster"]["worker-classes"]
    worker_class = worker_classes.get(config.kind, worker_classes["default"])

    for job in jobs:
        # First, evaluate the `keyed-by` in the initial task specification from
        # the kind, if present. This should give us one of the keys from
        # `worker-configuration` in config.yml.
        task_worker_type = evaluate_keyed_by(
            job["worker-type"],
            job["description"],
            {"tasks-for": config.params["tasks_for"]},
        )
        # Now that we have one of the aliases, we need to resolve it to a
        # specific worker type, as some of those aliases have their own
        # `keyed-by` blocks, which may give different worker types depending
        # on what's in the training config.
        worker_alias_block = config.graph_config["local-worker-aliases"][task_worker_type].copy()
        job["worker-type"] = evaluate_keyed_by(
            worker_alias_block,
            task_worker_type,
            {"worker-class": worker_class},
        )
        yield job


@transforms.add
def inject_worker_env(config, jobs):
    for job in jobs:
        # This is called `worker-type` in jobs, but in reality it's an alias
        # that gets resolved in the graph config...
        worker_type = job["worker-type"]
        worker_config = config.graph_config["worker-configuration"].get(worker_type, {})
        worker_env = worker_config.get("env", {})
        if "GPUS" not in worker_env or "WORKSPACE" not in worker_env:
            # GPU tasks will not function correctly without these set; make
            # this an error before they even run.
            if "gpu" in worker_type:
                raise Exception(
                    "GPUS and/or WORKSPACE values missing from worker env; this is probably a misconfiguration."
                )
            else:
                yield job
                continue

        job["worker"]["env"].update(worker_env)
        yield job
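

# --- Illustrative sketch (not part of the transform sequence above) ---
# A minimal example of how `evaluate_keyed_by` resolves a `by-worker-class`
# block like the ones `set_worker_type` reads from `local-worker-aliases` in
# the graph config. The alias name, worker-class values, and worker-type
# strings below are hypothetical and do not come from config.yml; only the
# keyed-by resolution mechanism is being demonstrated.
if __name__ == "__main__":
    hypothetical_alias_block = {
        "by-worker-class": {
            "gcp-spot": "translations-1/b-largegpu-largedisk-spot",
            "default": "translations-1/b-largegpu-largedisk",
        }
    }
    # With worker-class "gcp-spot", the matching branch is chosen; any value
    # without its own branch falls through to "default".
    resolved = evaluate_keyed_by(
        hypothetical_alias_block,
        "b-largegpu-largedisk",  # item name, used only in error messages
        {"worker-class": "gcp-spot"},
    )
    print(resolved)  # -> translations-1/b-largegpu-largedisk-spot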