sync/legacy.py (39 lines of code) (raw):

"""Discover legacy telemetry pings from the generated pipeline-schemas tarball."""

from collections import defaultdict
from dataclasses import dataclass
from io import BytesIO
import tarfile
from typing import Dict, Iterable, List, Sequence

import requests

SCHEMA_URL = "https://github.com/mozilla-services/mozilla-pipeline-schemas/archive/generated-schemas.tar.gz"  # noqa: E501

# Archive path prefix and file suffix that identify a telemetry ping schema.
_TELEMETRY_SCHEMA_PREFIX = (
    "mozilla-pipeline-schemas-generated-schemas/schemas/telemetry/"
)
_SCHEMA_SUFFIX = ".schema.json"

# Seconds `requests` waits for the connection and for each chunk of the
# response; without it a stalled connection would block forever.
_REQUEST_TIMEOUT_SECONDS = 60


@dataclass
class LegacyPing:
    """A legacy telemetry ping and the schema versions available for it."""

    name: str
    versions: Sequence[int]

    @property
    def bigquery_fully_qualified_names(self) -> Sequence[str]:
        """Return one fully qualified table name per entry in ``versions``.

        Dashes in the ping name are replaced with underscores and the version
        is appended as a ``_v<version>`` suffix. The result follows the order
        of ``versions``.
        """
        table_prefix = self.name.replace("-", "_")
        return [
            f"moz-fx-data-shared-prod.telemetry_live.{table_prefix}_v{version}"
            for version in self.versions
        ]


def _group_schema_names(member_names: Iterable[str]) -> Dict[str, Sequence[str]]:
    """Group telemetry schema file names by their parent directory name.

    Only archive paths that start with the telemetry schema prefix and end
    with ``.schema.json`` are considered; everything else is ignored.
    """
    grouped: Dict[str, List[str]] = defaultdict(list)
    for member_name in member_names:
        if not member_name.startswith(_TELEMETRY_SCHEMA_PREFIX):
            continue
        if not member_name.endswith(_SCHEMA_SUFFIX):
            continue
        # The last two path components are "<ping name>/<schema file name>".
        *_, ping_name, schema_name = member_name.split("/")
        grouped[ping_name].append(schema_name)
    # Return a plain dict so a lookup miss by a caller cannot insert a key.
    return dict(grouped)


def _schema_version(schema_name: str) -> int:
    """Return the version from a ``<name>.<version>.schema.json`` file name.

    The name is split from the right so that a dot inside ``<name>`` cannot
    shift the position of the version component.

    Raises:
        ValueError: if the version component is not an integer.
    """
    return int(schema_name.rsplit(".", 3)[-3])


def _get_ping_schemas() -> Dict[str, Sequence[str]]:
    """
    Fetches the latest version of the schema tarball from GitHub
    and returns a dict of ping names and their schema file names.

    Example:
    {
        'account-ecosystem': ['account-ecosystem.4.schema.json'],
        'android-anr-report': [
            'android-anr-report.1.schema.json',
            'android-anr-report.2.schema.json',
        ],
        'anonymous': ['anonymous.4.schema.json'],
    }

    Raises:
        requests.RequestException: if the download fails, times out, or the
            server answers with an HTTP error status.
    """
    print("Fetching schemas from GitHub...")
    response = requests.get(SCHEMA_URL, timeout=_REQUEST_TIMEOUT_SECONDS)
    # Fail here on an HTTP error instead of handing an error page to tarfile.
    response.raise_for_status()
    archive = BytesIO(response.content)
    # Stream mode ("r|gz") suffices: member names are read once, in order.
    with tarfile.open(fileobj=archive, mode="r|gz") as tar:
        return _group_schema_names(member.name for member in tar)


def get_legacy_pings() -> Sequence[LegacyPing]:
    """Return a ``LegacyPing`` for every telemetry ping in the schema tarball.

    Each ping's ``versions`` holds the integer versions parsed from its
    schema file names, in the order they appear in the archive.
    """
    return [
        LegacyPing(
            name=ping_name,
            versions=[_schema_version(name) for name in schema_names],
        )
        for ping_name, schema_names in _get_ping_schemas().items()
    ]