def process_event()

in jobs/fxci-taskcluster-export/fxci_etl/pulse/handler.py [0:0]


    def process_event(self, event):
        """Turn one event into a run record and, for run 0, a task record.

        Reads ``event.data``, builds a ``Runs`` object for the run identified
        by ``runId`` and appends it to ``self.run_records``. When ``runId`` is
        0, a ``Tasks`` object is also built and appended to
        ``self.task_records``. Events without a ``runId`` are ignored.

        Raises:
            Exception: anything raised while building or appending the task
                record is re-raised, after the run record appended by this
                call has been removed again.
        """
        payload = event.data

        run_id = payload.get("runId")
        if run_id is None:
            # No run ever started (e.g. `deadline` was exceeded first), so
            # there is nothing to record.
            return

        task_status = payload["status"]
        run_info = task_status["runs"][run_id]

        run_record = {
            "task_id": task_status["taskId"],
            "reason_created": run_info["reasonCreated"],
            "reason_resolved": run_info["reasonResolved"],
            "resolved": run_info["resolved"],
            "run_id": run_id,
            "scheduled": run_info["scheduled"],
            "state": run_info["state"],
        }

        # Fields copied only when the run carries them:
        # (key in the run, key in the record).
        optional_fields = (
            ("started", "started"),
            ("workerGroup", "worker_group"),
            ("workerId", "worker_id"),
        )
        for source_key, record_key in optional_fields:
            if source_key in run_info:
                run_record[record_key] = run_info[source_key]

        self.run_records.append(Runs.from_dict(run_record))

        if run_id != 0:
            # The task record is only inserted alongside run 0, so later runs
            # of the same task do not produce duplicate task records.
            return

        try:
            task_record = {
                "scheduler_id": task_status["schedulerId"],
                "tags": {},
                "task_group_id": task_status["taskGroupId"],
                "task_id": task_status["taskId"],
                "task_queue_id": task_status["taskQueueId"],
            }

            # Tags can be missing if the run is in the exception state.
            raw_tags = payload.get("task", {}).get("tags")
            if raw_tags:
                normalized_pairs = (
                    (self._normalize_tag(raw_key), tag_value)
                    for raw_key, tag_value in raw_tags.items()
                )
                # Tags whose normalized key is falsy are dropped.
                task_record["tags"].update(
                    (tag_key, tag_value)
                    for tag_key, tag_value in normalized_pairs
                    if tag_key
                )

            self.task_records.append(Tasks.from_dict(task_record))
        except Exception:
            # A run must not be kept without its corresponding task: undo the
            # append made above before propagating the error.
            self.run_records.pop()
            raise