sync/bigquery_etl.py (48 lines of code) (raw):
import collections
import re
from pathlib import Path
from typing import Dict
import logging
import requests
from io import BytesIO
import tarfile
import yaml
FILE_PREFIX = "bigquery-etl-generated-sql/sql"
QUERY_FILES = {
"query.sql",
"view.sql",
"query.py",
}
METADATA_FILE = "metadata.yaml"
SUPPORTED_PROJECTS = {
"moz-fx-data-shared-prod",
"moz-fx-data-marketing-prod",
}
VALID_TABLE_RE = re.compile(
rf"{FILE_PREFIX}\/({'|'.join(SUPPORTED_PROJECTS)})\/([a-z0-9_-]+)\/([a-z0-9_-]+)\/({'|'.join(QUERY_FILES)}|{METADATA_FILE})" # noqa
)
REPO_ARCHIVE = "https://github.com/mozilla/bigquery_etl/archive/generated-sql.tar.gz"
REPO_URL = "https://github.com/mozilla/bigquery-etl/blob/generated-sql/sql"
WTMO_URL = "https://workflow.telemetry.mozilla.org/dags"
logger = logging.getLogger(__name__)
def get_bigquery_etl_table_references() -> Dict:
table_references = collections.defaultdict(dict)
# dictionary shape: {qualified_name: {bigquery_etl_url: ..., wtmo_url: ...}}
logger.info("Fetching metadata from GitHub...")
repo_content = BytesIO(requests.get(REPO_ARCHIVE).content)
with tarfile.open(fileobj=repo_content, mode="r|gz") as tar:
for member in tar:
match = VALID_TABLE_RE.match(member.name)
if match is None:
continue
project, dataset, table, filename = match.groups()
qualified_name = f"{project}.{dataset}.{table}"
if filename == METADATA_FILE:
tar.extract(member)
metadata = yaml.safe_load(Path(member.name).read_text())
if "scheduling" in metadata and "dag_name" in metadata["scheduling"]:
wtmo_url = f"{WTMO_URL}/{metadata['scheduling']['dag_name']}/grid"
table_references[qualified_name]["wtmo_url"] = wtmo_url
elif filename in QUERY_FILES:
bigquery_etl_url = f"{REPO_URL}/{project}/{dataset}/{table}/{filename}"
table_references[qualified_name]["bigquery_etl_url"] = bigquery_etl_url
return table_references