in infra/spawn_pipeline.py [0:0]
def main():
    """Render the data pipeline definition and create its tasks on Taskcluster.

    The path of the pipeline definition is read from the single positional
    command-line argument ``data_pipeline_json``. The file is parsed with
    ``yaml.safe_load`` and rendered with JSON-e, using ``{"version": TAG}`` as
    the context; ``TAG`` is read from the environment and defaults to
    ``"latest"``.

    Every rendered task carries an internal ``ID`` that other tasks in the
    same file name in their ``dependencies``. Each internal ID is replaced by
    a freshly generated slug ID, so tasks may be declared in any order. Each
    task also gets the ``taskGroupId`` and a ``TAG`` entry in its payload
    ``env`` before being passed to ``Queue.createTask``.

    Raises:
        ValueError: If two tasks share the same internal ``ID``, or if a task
            depends on an ``ID`` that no task in the file defines.
        taskcluster.exceptions.TaskclusterAuthFailure: Logged and re-raised
            when raised while the tasks are being created.
    """
    parser = argparse.ArgumentParser(description="Spawn tasks for bugbug data pipeline")
    parser.add_argument("data_pipeline_json")
    args = parser.parse_args()

    # NOTE(review): TASK_ID is presumably the ID of the Taskcluster task that
    # runs this script (the "decision task") -- confirm against the CI config.
    decision_task_id = os.environ.get("TASK_ID")
    options = get_taskcluster_options()

    # Reuse TASK_ID as the task group when it is set; otherwise start a new
    # group with a freshly generated ID.
    task_group_id = decision_task_id or taskcluster.utils.slugId()

    with open(args.data_pipeline_json, encoding="utf-8") as pipeline_file:
        raw_tasks = yaml.safe_load(pipeline_file)

    version = os.getenv("TAG", "latest")
    rendered = jsone.render(raw_tasks, {"version": version})

    # First pass: map every internal ID to a new unique Taskcluster task ID.
    # The mapping is completed before any dependency is resolved so that a
    # task can depend on another one declared later in the file.
    id_mapping = {}
    for task in rendered["tasks"]:
        task_internal_id = task["ID"]
        if task_internal_id in id_mapping:
            raise ValueError(f"Conflicting IDs {task_internal_id}")
        id_mapping[task_internal_id] = taskcluster.utils.slugId()

    # Second pass: turn each rendered task into a definition ready to send.
    tasks = []
    for task in rendered["tasks"]:
        # "ID" is only used to link tasks within the file, so it is removed
        # from the definition that gets sent.
        task_internal_id = task.pop("ID")
        task["taskGroupId"] = task_group_id

        task_payload = task["payload"]
        if task_payload.get("env"):
            task_payload["env"]["TAG"] = version
        else:
            # Covers both a missing "env" and an empty/None one.
            task_payload["env"] = {"TAG": version}

        dependencies = task.get("dependencies", [])
        unknown = [dep for dep in dependencies if dep not in id_mapping]
        if unknown:
            # Fail with the offending task and IDs rather than a bare KeyError.
            raise ValueError(
                f"Task {task_internal_id} depends on unknown IDs {unknown}"
            )
        new_dependencies = [id_mapping[dep] for dep in dependencies]

        if decision_task_id:
            # Every spawned task also depends on the task identified by
            # TASK_ID.
            new_dependencies.append(decision_task_id)

        task["dependencies"] = new_dependencies
        tasks.append((id_mapping[task_internal_id], task))

    # Finally, create the tasks in the order they appear in the file.
    queue = taskcluster.Queue(options)
    try:
        for task_id, task_definition in tasks:
            queue.createTask(task_id, task_definition)

        logger.info(
            "https://community-tc.services.mozilla.com/tasks/groups/%s", task_group_id
        )
    except taskcluster.exceptions.TaskclusterAuthFailure:
        logger.exception("Failed to authenticate with Taskcluster")
        raise