in taskcluster/local_taskgraph/date_tasks.py [0:0]
def create_date_tasks(config, tasks):
"""Duplicates a task based on the date-tasks configuration.
The original task will never be yielded if `date-tasks` is specified.
"""
today = datetime.datetime.now(datetime.UTC).date()
for task in tasks:
cfg = task.pop('date-tasks', None)
if cfg is None:
yield task
continue
days = cfg.get("cron-days")
manual = cfg.get("action-manual")
index = cfg.get("index")
env = cfg.get("env")
def set_task_date(task, datestr, add_index = True, index_search = False):
if index is not None and add_index:
task_index = index.format(date=datestr.replace("-", "."))
if index_search:
task.setdefault("optimization", {}).setdefault("index-search", []).append(task_index)
task.setdefault("routes", []).append(f"index.{task_index}")
if env is not None:
task["worker"].setdefault("env", {})[env] = datestr
if config.params["tasks_for"] == "cron" and days is not None:
# We only create tasks for complete days: `days` will immediately
# *precede* `today`.
for preceding in range(-days, 0):
date = today + datetime.timedelta(days = preceding)
new_task = deepcopy(task)
new_task["name"] += str(preceding)
set_task_date(new_task, str(date), index_search = True)
yield new_task
if (config.params["tasks_for"] == "action"
and manual is not None
and PROCESS_PINGS_MANUAL_PARAM in config.params):
manual_cfg = config.params[PROCESS_PINGS_MANUAL_PARAM]
dates = manual_cfg["dates"]
add_index = manual_cfg.get("index", True)
max_tasks = manual_cfg.get("max_tasks", 0)
deps = [None] * max_tasks if max_tasks > 0 else None
for ind, date in enumerate(dates):
new_task = deepcopy(task)
new_task["name"] += "-manual-" + date
set_task_date(new_task, date, add_index = add_index)
if deps is not None:
# Create arbitrary dependencies among the date tasks to
# avoid running too many at once (e.g., to be a good
# citizen to the symbol servers).
dep_ind = ind % len(deps);
if deps[dep_ind] is not None:
new_task.setdefault("dependencies", {})[deps[dep_ind]] = deps[dep_ind]
deps[dep_ind] = "{}-{}".format(config.kind, new_task["name"])
yield new_task