# get_bigquery_etl_table_references()
#
# in sync/bigquery_etl.py [0:0]


def get_bigquery_etl_table_references() -> Dict[str, Dict[str, str]]:
    """Map fully-qualified BigQuery table names to their reference URLs.

    Downloads the bigquery-etl repository archive from GitHub and scans its
    members, collecting for each table directory:

    - ``wtmo_url``: link to the Airflow (WTMO) DAG grid view, when the
      table's metadata file declares ``scheduling.dag_name``;
    - ``bigquery_etl_url``: link to the table's query file in the repository.

    Returns:
        A defaultdict mapping ``"project.dataset.table"`` to a dict holding
        any of the keys ``"wtmo_url"`` and ``"bigquery_etl_url"``.

    Raises:
        requests.HTTPError: if the archive download returns an error status.
    """
    # dictionary shape: {qualified_name: {bigquery_etl_url: ..., wtmo_url: ...}}
    table_references: Dict[str, Dict[str, str]] = collections.defaultdict(dict)

    logger.info("Fetching metadata from GitHub...")

    # Fail loudly on a bad download instead of trying to untar an error page;
    # a timeout keeps a hung connection from blocking the sync forever.
    response = requests.get(REPO_ARCHIVE, timeout=60)
    response.raise_for_status()

    repo_content = BytesIO(response.content)
    with tarfile.open(fileobj=repo_content, mode="r|gz") as tar:
        for member in tar:
            match = VALID_TABLE_RE.match(member.name)
            if match is None:
                continue

            project, dataset, table, filename = match.groups()
            qualified_name = f"{project}.{dataset}.{table}"

            if filename == METADATA_FILE:
                # Read the member directly from the stream. tar.extract()
                # would write files into the CWD (leftover artifacts and a
                # path-traversal exposure) only for us to read them back.
                fileobj = tar.extractfile(member)
                if fileobj is None:
                    # Non-regular member (e.g. directory); nothing to parse.
                    continue
                metadata = yaml.safe_load(fileobj)
                # safe_load returns None for an empty file; guard before `in`.
                if metadata and "scheduling" in metadata and "dag_name" in metadata["scheduling"]:
                    wtmo_url = f"{WTMO_URL}/{metadata['scheduling']['dag_name']}/grid"
                    table_references[qualified_name]["wtmo_url"] = wtmo_url

            elif filename in QUERY_FILES:
                # Link to the actual query file, not a placeholder path.
                bigquery_etl_url = f"{REPO_URL}/{project}/{dataset}/{table}/{filename}"
                table_references[qualified_name]["bigquery_etl_url"] = bigquery_etl_url

    return table_references