in jobs/fxci-taskcluster-export/fxci_etl/pulse/handler.py [0:0]
def process_event(self, event):
    """Record run (and, for run 0, task) rows from a pulse task event.

    Appends a `Runs` record for the event's run. For run 0 only, also
    appends a `Tasks` record so each task is inserted exactly once; if
    building the task record fails, the just-appended run record is
    removed so the two record lists stay in sync, and the error is
    re-raised.
    """
    data = event.data
    run_id = data.get("runId")
    if run_id is None:
        # `deadline` can be exceeded before any run starts — nothing to record.
        return

    status = data["status"]
    run = status["runs"][run_id]

    record = {
        "task_id": status["taskId"],
        "reason_created": run["reasonCreated"],
        "reason_resolved": run["reasonResolved"],
        "resolved": run["resolved"],
        "run_id": run_id,
        "scheduled": run["scheduled"],
        "state": run["state"],
    }
    # Optional fields: present only once a worker has claimed the run.
    for src_key, dest_key in (
        ("started", "started"),
        ("workerGroup", "worker_group"),
        ("workerId", "worker_id"),
    ):
        if src_key in run:
            record[dest_key] = run[src_key]
    self.run_records.append(Runs.from_dict(record))

    if run_id != 0:
        # Only run 0 carries the task record, avoiding duplicate task rows.
        return

    try:
        task_record = {
            "scheduler_id": status["schedulerId"],
            "tags": {},
            "task_group_id": status["taskGroupId"],
            "task_id": status["taskId"],
            "task_queue_id": status["taskQueueId"],
        }
        # Tags can be missing if the run is in the exception state.
        tags = data.get("task", {}).get("tags")
        if tags:
            for raw_key, value in tags.items():
                normalized = self._normalize_tag(raw_key)
                if normalized:
                    task_record["tags"][normalized] = value
        self.task_records.append(Tasks.from_dict(task_record))
    except Exception:
        # Don't insert the run without its corresponding task.
        self.run_records.pop()
        raise