in sync/datahub/bigquery_etl_source.py [0:0]
def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
    """Yield institutional-memory workunits for bigquery-etl tables.

    For every table reference returned by get_bigquery_etl_table_references(),
    attach up to two documentation links (Airflow DAG, source code) to the
    corresponding BigQuery dataset URN. Tables with no known links are skipped.
    """
    # Mapping of reference-dict key -> human-readable link description.
    # Order matters: Airflow DAG link is emitted before the source-code link.
    link_specs = (
        ("wtmo_url", "Airflow DAG"),
        ("bigquery_etl_url", "BigQuery-ETL Source Code"),
    )

    for table_name, reference_urls in get_bigquery_etl_table_references().items():
        dataset_urn = builder.make_dataset_urn(
            platform=self.platform,
            name=table_name,
            env=self.config.env,
        )

        links = [
            InstitutionalMemoryMetadataClass(
                url=reference_urls[key],
                description=description,
                createStamp=get_current_timestamp(),
            )
            for key, description in link_specs
            if key in reference_urls
        ]

        # Nothing to attach for this table — no workunit emitted.
        if not links:
            continue

        proposal = MetadataChangeProposalWrapper(
            changeType=ChangeTypeClass.UPSERT,  # Should be UPDATE but it isn't supported
            entityUrn=dataset_urn,
            aspect=InstitutionalMemoryClass(elements=links),
        )
        yield proposal.as_workunit()