def get_workunits_internal()

in sync/datahub/bigquery_etl_source.py [0:0]


    def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
        for qualified_table_name, urls in get_bigquery_etl_table_references().items():

            bigquery_qualified_urn = builder.make_dataset_urn(
                platform=self.platform,
                name=qualified_table_name,
                env=self.config.env,
            )

            link_elements = []
            if "wtmo_url" in urls:
                link_elements.append(
                    InstitutionalMemoryMetadataClass(
                        url=urls["wtmo_url"],
                        description="Airflow DAG",
                        createStamp=get_current_timestamp(),
                    )
                )

            if "bigquery_etl_url" in urls:
                link_elements.append(
                    InstitutionalMemoryMetadataClass(
                        url=urls["bigquery_etl_url"],
                        description="BigQuery-ETL Source Code",
                        createStamp=get_current_timestamp(),
                    )
                )

            if not link_elements:
                continue

            mcp = MetadataChangeProposalWrapper(
                changeType=ChangeTypeClass.UPSERT,  # Should be UPDATE but it isn't supported
                entityUrn=bigquery_qualified_urn,
                aspect=InstitutionalMemoryClass(elements=link_elements),
            )

            wu = mcp.as_workunit()
            yield wu