in sync/bigquery_etl.py [0:0]
def get_bigquery_etl_table_references() -> Dict:
    """Build a mapping of fully-qualified BigQuery table names to reference URLs.

    Downloads the bigquery-etl repository archive from GitHub and scans its
    members for query files and ``metadata.yaml`` files, collecting a link to
    the query definition and/or the WTMO (Airflow) DAG grid view per table.

    Returns:
        Dict shaped ``{qualified_name: {"bigquery_etl_url": ..., "wtmo_url": ...}}``;
        either key may be absent for a given table.
    """
    # dictionary shape: {qualified_name: {bigquery_etl_url: ..., wtmo_url: ...}}
    table_references: Dict = collections.defaultdict(dict)
    logger.info("Fetching metadata from GitHub...")
    repo_content = BytesIO(requests.get(REPO_ARCHIVE).content)
    with tarfile.open(fileobj=repo_content, mode="r|gz") as tar:
        for member in tar:
            match = VALID_TABLE_RE.match(member.name)
            if match is None:
                continue
            project, dataset, table, filename = match.groups()
            qualified_name = f"{project}.{dataset}.{table}"
            if filename == METADATA_FILE:
                # Read the member directly from the in-memory archive instead of
                # tar.extract()-ing to the working directory: avoids leaving
                # files on disk and the path-traversal risk of extracting
                # unvetted archive member names. Safe in "r|gz" stream mode
                # because we consume the member before advancing the iterator.
                fileobj = tar.extractfile(member)
                if fileobj is None:
                    # Not a regular file (e.g. a directory or link entry).
                    continue
                # An empty metadata.yaml yields None from safe_load; coerce to
                # {} so the membership checks below don't raise TypeError.
                metadata = yaml.safe_load(fileobj) or {}
                scheduling = metadata.get("scheduling") or {}
                if "dag_name" in scheduling:
                    wtmo_url = f"{WTMO_URL}/{scheduling['dag_name']}/grid"
                    table_references[qualified_name]["wtmo_url"] = wtmo_url
            elif filename in QUERY_FILES:
                # NOTE(review): "(unknown)" looks like a placeholder — the URL
                # probably should end with the matched query filename; confirm
                # against REPO_URL's expected path layout.
                bigquery_etl_url = f"{REPO_URL}/{project}/{dataset}/{table}/(unknown)"
                table_references[qualified_name]["bigquery_etl_url"] = bigquery_etl_url
    return table_references