jobs/fxci-taskcluster-export/fxci_etl/pulse/handler.py (132 lines of code) (raw):

import base64
import json
import re
import traceback
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pprint import pprint
from typing import Any, Optional

import dacite
from google.cloud import storage
from google.cloud.exceptions import NotFound
from kombu import Message
from loguru import logger

from fxci_etl.config import Config
from fxci_etl.loaders.bigquery import BigQueryLoader
from fxci_etl.schemas import Record, Runs, Tasks, Tags


@dataclass
class Event:
    data: dict[str, Any]
    message: Optional[Message]

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "Event":
        return dacite.from_dict(data_class=cls, data=data)

    def to_dict(self):
        return {"data": self.data}


class PulseHandler(ABC):
    name = ""

    def __init__(self, config: Config):
        self.config = config

        if config.storage.credentials:
            storage_client = storage.Client.from_service_account_info(
                json.loads(base64.b64decode(config.storage.credentials).decode("utf8"))
            )
        else:
            storage_client = storage.Client()

        bucket = storage_client.bucket(config.storage.bucket)
        self._event_backup = bucket.blob(f"failed-pulse-events-{self.name}.json")
        self._buffer: list[Event] = []
        self._count = 0

    def __call__(self, data: dict[str, Any], message: Message) -> None:
        self._count += 1
        message.ack()
        event = Event(data, message)
        self._buffer.append(event)

    def process_buffer(self):
        try:
            # Load previously failed events from storage, maybe the issue is fixed.
            for obj in json.loads(self._event_backup.download_as_string()):
                self._buffer.append(Event.from_dict(obj))
        except NotFound:
            pass

        failed = []
        for event in self._buffer:
            try:
                self.process_event(event)
            except Exception:
                logger.error(f"Error processing event in {self.name} handler:")
                pprint(event, indent=2)
                traceback.print_exc()
                failed.append(event.to_dict())

        # Save any failed events back to storage.
        self._event_backup.upload_from_string(json.dumps(failed))

        self._buffer = []
        self.on_processing_complete()

    @abstractmethod
    def process_event(self, event: Event) -> None: ...

    def on_processing_complete(self) -> None:
        pass


class BigQueryHandler(PulseHandler):
    name = "bigquery"

    def __init__(self, config: Config, **kwargs: Any):
        super().__init__(config, **kwargs)
        self.task_records: list[Record] = []
        self.run_records: list[Record] = []
        self._convert_camel_case_re = re.compile(r"(?<!^)(?=[A-Z])")
        self._known_tags = set(Tags.__annotations__.keys())

    def _normalize_tag(self, tag: str) -> str | None:
        """Tags are not well standardized and can be in camel case, snake
        case, separated by dashes or even spaces. Ensure they all get
        normalized to snake case.

        If the normalization results in a known tag, return it. Otherwise
        return None.
        """
        tag = tag.replace("-", "_").replace(" ", "_")
        tag = self._convert_camel_case_re.sub("_", tag).lower()
        if tag in self._known_tags:
            return tag
        return None

    def process_event(self, event):
        data = event.data
        if data.get("runId") is None:
            # This can happen if `deadline` was exceeded before a run could
            # start. Ignore this case.
            return

        status = data["status"]
        run = data["status"]["runs"][data["runId"]]

        run_record = {
            "task_id": status["taskId"],
            "reason_created": run["reasonCreated"],
            "reason_resolved": run["reasonResolved"],
            "resolved": run["resolved"],
            "run_id": data["runId"],
            "scheduled": run["scheduled"],
            "state": run["state"],
        }

        if "started" in run:
            run_record["started"] = run["started"]
        if "workerGroup" in run:
            run_record["worker_group"] = run["workerGroup"]
        if "workerId" in run:
            run_record["worker_id"] = run["workerId"]

        self.run_records.append(Runs.from_dict(run_record))

        if data["runId"] == 0:
            # Only insert the task record for run 0 to avoid duplicate records.
            try:
                task_record = {
                    "scheduler_id": status["schedulerId"],
                    "tags": {},
                    "task_group_id": status["taskGroupId"],
                    "task_id": status["taskId"],
                    "task_queue_id": status["taskQueueId"],
                }

                # Tags can be missing if the run is in the exception state.
                if tags := data.get("task", {}).get("tags"):
                    for key, value in tags.items():
                        if key := self._normalize_tag(key):
                            task_record["tags"][key] = value

                self.task_records.append(Tasks.from_dict(task_record))
            except Exception:
                # Don't insert the run without its corresponding task.
                self.run_records.pop()
                raise

    def on_processing_complete(self):
        logger.info(f"Processed {self._count} pulse events")

        if self.task_records:
            task_loader = BigQueryLoader(self.config, "tasks")
            task_loader.insert(self.task_records)
            self.task_records = []

        if self.run_records:
            run_loader = BigQueryLoader(self.config, "runs")
            run_loader.insert(self.run_records)
            self.run_records = []
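
For reference, the following is a minimal sketch (not part of handler.py) of the task-status payload shape that BigQueryHandler.process_event reads. The field names are taken from the code above; the concrete values, the `config` and `message` objects, and the consumer wiring in the trailing comment are invented for illustration.

# Illustration only: a hypothetical pulse payload with the keys process_event expects.
example_event_data = {
    "runId": 0,
    "status": {
        "taskId": "abc123",               # made-up task id
        "schedulerId": "example-scheduler",
        "taskGroupId": "group456",
        "taskQueueId": "example/queue",
        "runs": [
            {
                "reasonCreated": "scheduled",
                "reasonResolved": "completed",
                "resolved": "2024-01-01T00:10:00Z",
                "scheduled": "2024-01-01T00:00:00Z",
                "state": "completed",
                "started": "2024-01-01T00:01:00Z",   # optional field
                "workerGroup": "example-group",      # optional field
                "workerId": "worker-1",              # optional field
            }
        ],
    },
    # Tags may be absent; keys are normalized to snake case and kept only if
    # they match a field on the Tags schema, e.g. "workerType" -> "worker_type".
    "task": {"tags": {"exampleTag": "value"}},
}

# A pulse consumer would typically invoke the handler once per message and then
# flush the buffer, roughly:
#
#     handler = BigQueryHandler(config)        # config: fxci_etl.config.Config
#     handler(example_event_data, message)     # message: kombu.Message (acked here)
#     handler.process_buffer()                 # loads failed events, inserts into BigQuery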