taskcluster/translations_taskgraph/transforms/cached_tasks.py

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#
# This transform is largely a copy of the upstream `cached_task` transform in
# Taskgraph. It exists because there are two features that we need that are
# missing upstream:
# - The ability to influence the cache digest from parameters.
#   (https://github.com/taskcluster/taskgraph/issues/391)
# - The ability to key cache `resources` by `provider`.

import itertools

import taskgraph
from taskgraph.transforms.base import TransformSequence
from taskgraph.transforms.cached_tasks import order_tasks, format_task_digest
from taskgraph.util.cached_tasks import add_optimization
from taskgraph.util.hash import hash_path
from taskgraph.util.schema import Schema, optionally_keyed_by, resolve_keyed_by
from voluptuous import ALLOW_EXTRA, Any, Required, Optional

from translations_taskgraph.util.dict_helpers import deep_get

SCHEMA = Schema(
    {
        Required("attributes"): {
            Required("cache"): {
                Required("type"): str,
                Optional("resources"): optionally_keyed_by("provider", [str]),
                Optional("from-parameters"): {
                    str: Any([str], str),
                },
            },
        },
    },
    extra=ALLOW_EXTRA,
)

transforms = TransformSequence()
transforms.add_validate(SCHEMA)


@transforms.add
def resolved_keyed_by_fields(config, jobs):
    """Resolve any `by-provider` keying of `cache.resources` before the
    resources are hashed into the digest."""
    for job in jobs:
        provider = job["attributes"].get("provider", None)
        resolve_keyed_by(
            job["attributes"]["cache"],
            "resources",
            item_name=job["description"],
            **{"provider": provider},
        )

        yield job


@transforms.add
def add_cache(config, jobs):
    """Build the cache digest for each job from its command, the hashes of
    any listed resource files, and the values of any requested parameters."""
    for job in jobs:
        cache = job["attributes"]["cache"]
        cache_type = cache["type"]
        # `resources` is optional in the schema, so don't assume it exists.
        cache_resources = cache.get("resources", [])
        cache_parameters = cache.get("from-parameters", {})
        digest_data = []
        digest_data.extend(list(itertools.chain.from_iterable(job["worker"]["command"])))

        if cache_resources:
            for r in cache_resources:
                digest_data.append(hash_path(r))

        if cache_parameters:
            for param, path in cache_parameters.items():
                if isinstance(path, str):
                    value = deep_get(config.params, path)
                    digest_data.append(f"{param}:{value}")
                else:
                    # A list of paths is treated as a set of fallbacks: the
                    # first one that resolves to a non-None value is used.
                    for choice in path:
                        value = deep_get(config.params, choice)
                        if value is not None:
                            digest_data.append(f"{param}:{value}")
                            break

        job["cache"] = {
            "type": cache_type,
            # Upstream cached tasks use "/" as a separator for different parts
            # of the digest. If we don't remove them, caches are busted for
            # anything with a "/" in its label.
            "name": job["label"].replace("/", "_"),
            "digest-data": digest_data,
        }

        yield job


@transforms.add
def cache_task(config, tasks):
    """Fold the digests of each task's (cached) dependencies into its own
    digest and attach the cache optimization, mirroring the upstream
    `cached_task` transform."""
    if taskgraph.fast:
        for task in tasks:
            yield task
        return

    digests = {}
    for task in config.kind_dependencies_tasks.values():
        if "cached_task" in task.attributes:
            digests[task.label] = format_task_digest(task.attributes["cached_task"])

    for task in order_tasks(config, tasks):
        cache = task.pop("cache", None)
        if cache is None:
            yield task
            continue

        dependency_digests = []
        for p in task.get("dependencies", {}).values():
            if p in digests:
                dependency_digests.append(digests[p])
            else:
                raise Exception(
                    "Cached task {} has uncached parent task: {}".format(task["label"], p)
                )
        digest_data = cache["digest-data"] + sorted(dependency_digests)
        add_optimization(
            config,
            task,
            cache_type=cache["type"],
            cache_name=cache["name"],
            digest_data=digest_data,
        )
        digests[task["label"]] = format_task_digest(task["attributes"]["cached_task"])

        yield task
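

# For illustration only: a minimal sketch (kept in comment form, since the
# names and paths below are assumptions rather than anything defined in this
# file) of the kind.yml shape that SCHEMA above validates. `from-parameters`
# values may be a single parameter path or a list of fallback paths, and
# deep_get is assumed to resolve dotted paths against the parameters.
#
#   attributes:
#       cache:
#           type: example-type                # hypothetical cache type
#           resources:
#               by-provider:
#                   default:
#                       - taskcluster/scripts/example.sh   # hypothetical file, hashed into the digest
#           from-parameters:
#               locale:                       # hypothetical digest key
#                   - training_config.experiment.src       # hypothetical paths, tried in order;
#                   - training_config.src                  # first non-None value wins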