jobs/fxci-taskcluster-export/fxci_etl/schemas.py (104 lines of code) (raw):

from abc import ABC, abstractmethod
from dataclasses import dataclass, fields, is_dataclass
from datetime import datetime, timezone
from types import UnionType
from typing import Any, Optional, Type, TypeAlias, Union, get_args, get_origin

import dacite
from aenum import Enum, NoAlias
from dacite.config import Config as DaciteConfig
from google.cloud.bigquery import SchemaField


class BigQueryTypes(Enum, settings=NoAlias):  # type: ignore
    """BigQuery column types, used as dataclass field annotations.

    ``NoAlias`` keeps members with equal values distinct (``DATE`` and
    ``STRING`` are both ``str``; ``INTEGER`` and ``TIMESTAMP`` are both
    ``int``), so each member's ``name`` is the BigQuery type name that
    ``generate_schema`` emits.
    """

    DATE: TypeAlias = str
    FLOAT: TypeAlias = float
    INTEGER: TypeAlias = int
    STRING: TypeAlias = str
    TIMESTAMP: TypeAlias = int


def generate_schema(cls) -> list[SchemaField]:
    """Build a BigQuery schema from a dataclass definition.

    Each field annotation is expected to be a ``BigQueryTypes`` member or a
    nested dataclass, optionally wrapped in ``Optional[...]`` / ``X | None``
    (mode ``NULLABLE``) and/or ``list[...]`` (mode ``REPEATED``). All other
    fields are ``REQUIRED``. A nested dataclass becomes a ``RECORD`` field
    with its own sub-schema.

    Args:
        cls: A dataclass (type or instance) describing one table row.

    Returns:
        list[SchemaField]: One schema field per dataclass field, in field order.

    Raises:
        TypeError: If ``cls`` is not a dataclass.
    """
    if not is_dataclass(cls):
        # Explicit check rather than ``assert`` so it also runs under ``python -O``.
        raise TypeError(f"generate_schema() expects a dataclass, got {cls!r}")

    schema = []
    for field in fields(cls):
        _type = field.type
        origin = get_origin(_type)
        args = get_args(_type)

        # Unwrap ``Optional[X]`` (typing.Union) or ``X | None`` (types.UnionType).
        if origin in (Union, UnionType) and type(None) in args:
            mode = "NULLABLE"
            _type = [arg for arg in args if arg is not type(None)][0]
            origin = get_origin(_type)
            args = get_args(_type)
        else:
            mode = "REQUIRED"

        # A list overrides the mode; its element type decides the column type.
        if origin is list:
            mode = "REPEATED"
            _type = args[0]

        if is_dataclass(_type):
            nested_schema = generate_schema(_type)
            schema.append(
                SchemaField(field.name, "RECORD", mode=mode, fields=nested_schema)
            )
        else:
            # The BigQueryTypes member name is the BigQuery type, e.g. "STRING".
            schema.append(SchemaField(field.name, _type.name, mode=mode))
    return schema


@dataclass
class Record(ABC):
    """Base class for one exported BigQuery row.

    Subclasses declare their columns as dataclass fields and implement
    ``__str__`` to give a short identifier for the record.
    """

    submission_date: BigQueryTypes.DATE

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "Record":
        """Build a record from a plain dict, stamping the current UTC date.

        ``submission_date`` is always set to today's UTC date (``YYYY-MM-DD``),
        overriding any value already in ``data``. The caller's dict is not
        modified.

        Args:
            data: Field values keyed by field name; nested records as dicts.

        Returns:
            Record: An instance of ``cls``.
        """
        current_date = datetime.now(timezone.utc).date()
        # Work on a shallow copy so the caller's dict is left untouched.
        payload = {**data, "submission_date": current_date.strftime("%Y-%m-%d")}
        # check_types=False: the field annotations are BigQueryTypes members,
        # not runtime types, so values are not validated against them.
        return dacite.from_dict(
            data_class=cls, data=payload, config=DaciteConfig(check_types=False)
        )

    @abstractmethod
    def __str__(self) -> str: ...
@dataclass
class Runs(Record):
    """One row of the ``runs`` table: a single run of a task."""

    reason_created: BigQueryTypes.STRING
    reason_resolved: BigQueryTypes.STRING
    resolved: BigQueryTypes.TIMESTAMP
    run_id: BigQueryTypes.INTEGER
    scheduled: BigQueryTypes.TIMESTAMP
    started: Optional[BigQueryTypes.TIMESTAMP]
    state: BigQueryTypes.STRING
    task_id: BigQueryTypes.STRING
    worker_group: Optional[BigQueryTypes.STRING]
    worker_id: Optional[BigQueryTypes.STRING]

    def __str__(self) -> str:
        return f"{self.task_id} run {self.run_id}"


@dataclass
class Tags:
    """Task tags, stored in the ``tags`` field of ``Tasks``.

    Being a dataclass, ``generate_schema`` maps it to a nested RECORD column;
    every tag is Optional and therefore a NULLABLE sub-column.
    """

    created_for_user: Optional[BigQueryTypes.STRING]
    kind: Optional[BigQueryTypes.STRING]
    label: Optional[BigQueryTypes.STRING]
    os: Optional[BigQueryTypes.STRING]
    owned_by: Optional[BigQueryTypes.STRING]
    project: Optional[BigQueryTypes.STRING]
    trust_domain: Optional[BigQueryTypes.STRING]
    worker_implementation: Optional[BigQueryTypes.STRING]
    test_suite: Optional[BigQueryTypes.STRING]
    test_platform: Optional[BigQueryTypes.STRING]
    test_variant: Optional[BigQueryTypes.STRING]


@dataclass
class Tasks(Record):
    """One row of the ``tasks`` table, including the task's tags."""

    scheduler_id: BigQueryTypes.STRING
    task_group_id: BigQueryTypes.STRING
    task_id: BigQueryTypes.STRING
    task_queue_id: BigQueryTypes.STRING
    tags: Tags

    def __str__(self) -> str:
        return self.task_id


@dataclass
class Metrics(Record):
    """One row of the ``metrics`` table: uptime of one worker instance over an interval."""

    instance_id: BigQueryTypes.STRING
    project: BigQueryTypes.STRING
    zone: BigQueryTypes.STRING
    uptime: BigQueryTypes.FLOAT
    interval_start_time: BigQueryTypes.TIMESTAMP
    interval_end_time: BigQueryTypes.TIMESTAMP

    def __str__(self) -> str:
        return f"worker {self.instance_id}"


def get_record_cls(table_type: str) -> Type[Record]:
    """Return the record class corresponding to the given table type.

    Args:
        table_type (str): Table for which to return a record class; one of
            ``"tasks"``, ``"runs"`` or ``"metrics"``.

    Returns:
        Type[Record]: The record class for the corresponding table.

    Raises:
        ValueError: If ``table_type`` is not a known table.
    """
    # Explicit mapping instead of an ``assert`` plus a globals() scan: the
    # validation survives ``python -O`` and only concrete Record subclasses
    # can ever be returned.
    record_classes: dict[str, Type[Record]] = {
        "tasks": Tasks,
        "runs": Runs,
        "metrics": Metrics,
    }
    try:
        return record_classes[table_type]
    except KeyError:
        raise ValueError(f"Record not found for {table_type}!") from None