jobs/fxci-taskcluster-export/fxci_etl/config.py (87 lines of code) (raw):
import os
import tomllib
from dataclasses import dataclass
from pathlib import Path
from typing import Any, ClassVar, Optional
import dacite
@dataclass(frozen=True)
class PulseExchangeConfig:
    """Immutable pairing of an exchange name with a routing key."""

    # Name of the exchange.
    exchange: str
    # Routing key used together with the exchange.
    routing_key: str
@dataclass(frozen=True)
class PulseConfig:
    """Immutable Pulse settings.

    ``user`` and ``password`` are required; ``host``, ``port`` and ``durable``
    have defaults. ``queues`` is a class-level table (``ClassVar``), so it is
    not a constructor argument and is shared by every instance.
    """

    user: str
    password: str
    host: str = "pulse.mozilla.org"
    port: int = 5671
    # NOTE(review): presumably controls whether queues are declared durable;
    # confirm against the code that consumes this setting.
    durable: bool = True

    # Queue name -> exchange/routing-key pair, one entry per task event.
    queues: ClassVar[dict[str, PulseExchangeConfig]] = {
        "task-completed": PulseExchangeConfig(
            exchange="exchange/taskcluster-queue/v1/task-completed",
            routing_key="#",
        ),
        "task-failed": PulseExchangeConfig(
            exchange="exchange/taskcluster-queue/v1/task-failed",
            routing_key="#",
        ),
        "task-exception": PulseExchangeConfig(
            exchange="exchange/taskcluster-queue/v1/task-exception",
            routing_key="#",
        ),
    }
@dataclass(frozen=True)
class BigQueryTableConfig:
    """Immutable set of table names; every field has a default."""

    # Table name for worker metrics.
    metrics: str = "worker_metrics_v1"
    # Table name for tasks.
    tasks: str = "tasks_v2"
    # Table name for task runs.
    runs: str = "task_runs_v1"
@dataclass(frozen=True)
class BigQueryConfig:
    """Immutable BigQuery settings: required project and dataset, optional extras."""

    project: str
    dataset: str
    # The default is a single BigQueryTableConfig instance created when this
    # class is defined; sharing it is safe because that dataclass is frozen.
    tables: BigQueryTableConfig = BigQueryTableConfig()
    # Optional credentials string; None when not supplied.
    credentials: Optional[str] = None
@dataclass(frozen=True)
class MonitoringConfig:
    """Immutable monitoring settings.

    ``credentials`` is the only constructor argument. ``projects`` is declared
    as a ``ClassVar``, so it is a class-level list shared by all instances.
    """

    # Optional credentials string; None when not supplied.
    credentials: Optional[str] = None

    # Project identifiers, fixed at class level.
    projects: ClassVar[list[str]] = [
        "fxci-production-level1-workers",
        "fxci-production-level3-workers",
    ]
@dataclass(frozen=True)
class StorageConfig:
    """Immutable storage settings: required project and bucket, optional credentials."""

    project: str
    bucket: str
    # Optional credentials string; None when not supplied.
    credentials: Optional[str] = None
def _env_to_int(value: Any) -> Any:
    """Turn a numeric string into an ``int``; return anything else unchanged."""
    if isinstance(value, str):
        try:
            return int(value)
        except ValueError:
            # Keep the original string so dacite's own type check can
            # reject it, as it did before coercion was added.
            return value
    return value


def _env_to_bool(value: Any) -> Any:
    """Turn a boolean-looking string into a ``bool``; return anything else unchanged."""
    if isinstance(value, str):
        lowered = value.strip().lower()
        if lowered in ("1", "true", "yes", "on"):
            return True
        if lowered in ("0", "false", "no", "off"):
            return False
    return value


@dataclass(frozen=True)
class Config:
    """Top-level configuration built from a dict, a TOML file or the environment."""

    bigquery: BigQueryConfig
    storage: StorageConfig

    # Depending on the commands being run, the pulse or monitoring
    # configs may not be necessary, so both are optional to construct.
    pulse: Optional[PulseConfig] = None
    monitoring: MonitoringConfig = MonitoringConfig()

    @classmethod
    def from_dict(
        cls,
        data: dict[str, Any],
        *,
        config: "Optional[dacite.Config]" = None,
    ) -> "Config":
        """Build a ``Config`` from a nested dict using dacite.

        Args:
            data: Mapping whose keys mirror the dataclass field names.
            config: Optional dacite configuration forwarded unchanged;
                ``None`` keeps dacite's defaults.
        """
        return dacite.from_dict(data_class=cls, data=data, config=config)

    @classmethod
    def from_file(cls, path: str | Path) -> "Config":
        """Build a ``Config`` from the TOML file at ``path``."""
        # tomllib requires a binary file handle.
        with Path(path).open("rb") as fh:
            return cls.from_dict(tomllib.load(fh))

    @classmethod
    def from_env(cls) -> "Config":
        """Build a ``Config`` from ``FXCI_ETL_*`` environment variables.

        The prefix is stripped, the remainder is lower-cased and every
        underscore starts a new nesting level, e.g.
        ``FXCI_ETL_BIGQUERY_PROJECT=x`` becomes ``{"bigquery": {"project": "x"}}``.
        Because of that split, a field whose own name contains an underscore
        cannot be addressed from the environment.

        Environment values are always strings, so values destined for ``int``
        and ``bool`` fields (such as ``PulseConfig.port`` and ``durable``) are
        converted before dacite type-checks them. Strings that do not parse
        are passed through unchanged.
        """
        prefix = "FXCI_ETL_"
        config_dict: dict[str, Any] = {}
        for name, value in os.environ.items():
            if not name.startswith(prefix):
                continue
            # The last component is the field name; the ones before it are
            # the nested sections leading to that field.
            *sections, leaf = name.removeprefix(prefix).lower().split("_")
            node = config_dict
            for section in sections:
                node.setdefault(section, {})
                node = node[section]
            node[leaf] = value

        coercions = dacite.Config(
            type_hooks={int: _env_to_int, bool: _env_to_bool},
        )
        return cls.from_dict(config_dict, config=coercions)