taskcluster/local_taskgraph/date_tasks.py (146 lines of code) (raw):
from copy import deepcopy
import datetime
from urllib.request import Request, urlopen
from taskgraph.actions.registry import register_callback_action
from taskgraph.decision import taskgraph_decision
from taskgraph.parameters import extend_parameters_schema, Parameters
from taskgraph.transforms.base import TransformSequence
from taskgraph.util.schema import Schema
from voluptuous import Required, ALLOW_EXTRA
# Base URL of the Firefox CI Taskcluster index service (v1 API); used by
# `index_task_exists` to look up tasks by index path.
CI_INDEX_API = "https://firefox-ci-tc.services.mozilla.com/api/index/v1"
# Validation schema for task definitions of this kind. Extra keys are allowed
# (ALLOW_EXTRA); the two keys below are popped off each task by the
# transforms in this module before the task continues down the pipeline.
TASK_SCHEMA = Schema({
    # If specified, the task will be duplicated to create date-specific tasks.
    "date-tasks": {
        # The number of days preceding the cron run date for which to generate tasks.
        # Only applies when getting cron tasks.
        "cron-days": int,
        # Whether to create a task for the manual processing action.
        "action-manual": bool,
        # The index path to use, both to index the results and to avoid running
        # a task when one has already run. The `{date}` string will be
        # interpolated with the date, where `-`s will be replaced with `.`
        # (e.g., 2025-01-02 will become `2025.01.02`).
        "index": str,
        # An environment variable name which (if specified) will be set, in the
        # task's worker environment, to the date (`YYYY-MM-DD`).
        "env": str,
    },
    # If specified, dependencies corresponding to `days` tasks created by
    # `date-tasks.cron-days` will be added to the task.
    "cron-date-dependencies": [{
        # The number of days preceding the cron run date for which to create
        # dependencies. This should not exceed the number of days specified in
        # the corresponding `date-tasks.cron-days`.
        Required("days"): int,
        # The task name.
        Required("task"): str,
        # Artifacts to fetch (not extracted). They will end up in
        # `fetches/cron-date-dependencies/{task}-N`.
        "artifacts": [str],
    }],
}, extra=ALLOW_EXTRA)
# Name of the decision parameter that carries the input of the
# `process-pings-manual` action into the decision run.
PROCESS_PINGS_MANUAL_PARAM = "process_pings_manual"
# Additional decision-parameter schema registered via
# `extend_parameters_schema`, so that parameters containing the manual
# processing input pass validation.
PARAMETERS_SCHEMA = {
    PROCESS_PINGS_MANUAL_PARAM: {
        # The dates (e.g. `2025-01-02`) to process; one task copy is made per date.
        Required("dates"): [str],
        # Whether to add index routes for the results; treated as True when absent.
        "index": bool,
        # Maximum number of manual tasks to run at once; 0 or absent means no limit.
        "max_tasks": int,
    }
}
# Transforms logic
# The transform sequence for this kind. Schema validation against TASK_SCHEMA
# is registered first; the functions decorated with `@transforms.add` in this
# module are appended after it, in registration order.
transforms = TransformSequence()
transforms.add_validate(TASK_SCHEMA)
# Currently unused, however we'll keep it around in case we no longer want to
# use the index-search optimization.
def index_task_exists(index):
    """Return whether a task is indexed at `index` in the CI index service.

    This is a best-effort probe: any network, HTTP, or URL error (including
    an HTTP 404 for an unknown index path) is reported as ``False`` rather
    than raised.

    Args:
        index: The dotted index path to look up.

    Returns:
        True if a HEAD request for the index path answered with status 200,
        False otherwise.
    """
    from http.client import HTTPException

    url = f"{CI_INDEX_API}/task/{index}"
    request = Request(
        url,
        headers={"User-Agent": "crash-ping-ingest/1.0 daily_batches"},
        method="HEAD",
    )
    try:
        # Use the response as a context manager so the connection is closed
        # instead of being leaked.
        with urlopen(request) as response:
            return response.status == 200
    # URLError/HTTPError (e.g. 404) and socket errors are OSError subclasses;
    # HTTPException covers malformed responses; ValueError covers invalid
    # URLs. Unlike a bare `except:`, this does not swallow KeyboardInterrupt
    # or SystemExit.
    except (OSError, HTTPException, ValueError):
        return False
def _set_task_date(task, datestr, index, env, *, add_index=True, index_search=False):
    """Stamp one date onto a date-specific copy of a task (mutates `task`).

    Args:
        task: The task dict to modify in place.
        datestr: The date as a `YYYY-MM-DD` string.
        index: The `date-tasks.index` template containing `{date}`, or None.
        env: The `date-tasks.env` environment variable name, or None.
        add_index: Whether to add an `index.` route for this date.
        index_search: Whether to also add an `index-search` optimization on
            the same index path, so the task is skipped if one already ran.
    """
    if index is not None and add_index:
        task_index = index.format(date=datestr.replace("-", "."))
        if index_search:
            task.setdefault("optimization", {}).setdefault("index-search", []).append(task_index)
        task.setdefault("routes", []).append(f"index.{task_index}")
    if env is not None:
        task["worker"].setdefault("env", {})[env] = datestr


@transforms.add
def create_date_tasks(config, tasks):
    """Duplicates a task based on the date-tasks configuration.

    The original task will never be yielded if `date-tasks` is specified.
    Instead:

    * in a `cron` run with `date-tasks.cron-days` set, one copy is yielded for
      each of the `cron-days` complete days preceding today (UTC), named
      `<name>-N` and carrying an `index-search` optimization;
    * in an `action` run where `date-tasks.action-manual` is true and the
      `process_pings_manual` parameter is present, one copy is yielded per
      requested date, named `<name>-manual-<date>`.

    Tasks without `date-tasks` are yielded unchanged.
    """
    today = datetime.datetime.now(datetime.UTC).date()
    for task in tasks:
        cfg = task.pop("date-tasks", None)
        if cfg is None:
            yield task
            continue

        days = cfg.get("cron-days")
        # `action-manual` is a bool: test truthiness so that an explicit
        # `false` does not enable manual tasks.
        manual = cfg.get("action-manual", False)
        index = cfg.get("index")
        env = cfg.get("env")
        tasks_for = config.params["tasks_for"]

        if tasks_for == "cron" and days is not None:
            # We only create tasks for complete days: `days` will immediately
            # *precede* `today`.
            for preceding in range(-days, 0):
                date = today + datetime.timedelta(days=preceding)
                new_task = deepcopy(task)
                # `preceding` is negative, so this appends e.g. "-1";
                # create_daily_dependencies builds labels the same way.
                new_task["name"] += str(preceding)
                _set_task_date(new_task, str(date), index, env, index_search=True)
                yield new_task

        if (
            tasks_for == "action"
            and manual
            and PROCESS_PINGS_MANUAL_PARAM in config.params
        ):
            manual_cfg = config.params[PROCESS_PINGS_MANUAL_PARAM]
            dates = manual_cfg["dates"]
            add_index = manual_cfg.get("index", True)
            max_tasks = manual_cfg.get("max_tasks", 0)
            # `lanes[i]` holds the label of the most recent task placed in
            # lane i; None means no limit on concurrency.
            lanes = [None] * max_tasks if max_tasks > 0 else None
            for ind, date in enumerate(dates):
                new_task = deepcopy(task)
                new_task["name"] += "-manual-" + date
                _set_task_date(new_task, date, index, env, add_index=add_index)
                if lanes is not None:
                    # Create arbitrary dependencies among the date tasks to
                    # avoid running too many at once (e.g., to be a good
                    # citizen to the symbol servers).
                    lane = ind % len(lanes)
                    previous = lanes[lane]
                    if previous is not None:
                        new_task.setdefault("dependencies", {})[previous] = previous
                    lanes[lane] = f"{config.kind}-{new_task['name']}"
                yield new_task
@transforms.add
def create_daily_dependencies(config, tasks):
    """Wire tasks up to the per-day tasks produced by `create_date_tasks`.

    For every entry in a task's `cron-date-dependencies`, and for each of the
    `days` offsets -days..-1, a dependency on `<task><offset>` is added, plus
    a fetch of the listed artifacts (unextracted) into
    `cron-date-dependencies/<task><offset>`.

    The `cron-date-dependencies` key is always removed. Tasks without it, and
    all tasks outside a `cron` run, are otherwise passed through untouched.
    """
    for task in tasks:
        wanted = task.pop('cron-date-dependencies', [])
        if not wanted or config.params["tasks_for"] != "cron":
            yield task
            continue

        dependencies = task.setdefault("dependencies", {})
        fetches = task.setdefault("fetches", {})
        for spec in wanted:
            span = spec["days"]
            name = spec["task"]
            artifact_names = spec.get("artifacts", [])
            for offset in range(-span, 0):
                # `offset` is negative, so the label is e.g. "foo-1", matching
                # the names given to the cron date tasks.
                label = f"{name}{offset}"
                key = f"create-daily-dependency-{label}"
                dependencies[key] = label
                fetches[key] = [
                    {
                        "artifact": artifact_name,
                        "extract": False,
                        "dest": f"cron-date-dependencies/{label}",
                    }
                    for artifact_name in artifact_names
                ]
        yield task
# Action logic
# Register PARAMETERS_SCHEMA with taskgraph's parameter schema. This is
# presumably what lets the `Parameters(...)` object built by the
# `process-pings-manual` action carry the `process_pings_manual` key and still
# pass validation.
extend_parameters_schema(PARAMETERS_SCHEMA)
@register_callback_action(
    name='process-pings-manual',
    title='Process Pings (Manual)',
    symbol='ppm',
    description='Manually process pings for the given days.',
    order=1,
    schema={
        'title': 'Manual Ping Processing Options',
        'description': 'Parameters to use for manual ping processing.',
        'properties': {
            'dates': {
                'title': 'Dates',
                'description': 'The dates for which to process crash pings.',
                'type': 'array',
                'items': {
                    'type': 'string',
                    'format': 'date',
                },
            },
            'index': {
                'title': 'Index',
                'description': 'Index the processed results.',
                'type': 'boolean',
                # Must be a real boolean: the string 'true' does not match
                # `"type": "boolean"` and would fail the `"index": bool`
                # check in PARAMETERS_SCHEMA.
                'default': True,
            },
            'max_tasks': {
                'title': 'Maximum Concurrent Tasks',
                'description': 'The maximum number of tasks to run at once. 0 means no maximum.',
                'type': 'integer',
                'minimum': 0,
                'default': 0,
            },
        },
        'required': ['dates'],
        'additionalProperties': False,
    }
)
def process_pings_manual_action(parameters, graph_config, input, task_group_id, task_id):
    """Run a new decision for the `process-pings-manual` action.

    The action `input` (dates, and optionally `index` / `max_tasks`) is stored
    under the `process_pings_manual` parameter, and `tasks_for` is set to
    `"action"`. That combination is what `create_date_tasks` checks for when
    it emits one manual task per requested date.

    Args:
        parameters: The parameters passed to the action callback; copied,
            not mutated.
        graph_config: Graph configuration; only `root_dir` is used.
        input: The action input matching the schema above.
        task_group_id: Unused.
        task_id: Unused.
    """
    parameters = dict(parameters)
    parameters["tasks_for"] = "action"
    parameters[PROCESS_PINGS_MANUAL_PARAM] = input
    taskgraph_decision({"root": graph_config.root_dir}, Parameters(**parameters))