sync/datahub/bigquery_etl_source.py (60 lines of code) (raw):
from typing import Iterable
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.emitter.mcp import MetadataChangeProposalWrapper, ChangeTypeClass
import datahub.emitter.mce_builder as builder
from datahub.metadata.schema_classes import (
InstitutionalMemoryClass,
InstitutionalMemoryMetadataClass,
)
from datahub.configuration.common import ConfigModel
from sync.bigquery_etl import get_bigquery_etl_table_references
from sync.datahub.utils import get_current_timestamp
class BigQueryEtlSourceConfig(ConfigModel):
env: str = "PROD"
class BigQueryEtlSource(Source):
def __init__(self, config: BigQueryEtlSourceConfig, ctx: PipelineContext):
super().__init__(ctx)
self.config = config
self.report = SourceReport()
self.platform = "bigquery"
@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> "BigQueryEtlSource":
config = BigQueryEtlSourceConfig.parse_obj(config_dict)
return cls(config, ctx)
def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
for qualified_table_name, urls in get_bigquery_etl_table_references().items():
bigquery_qualified_urn = builder.make_dataset_urn(
platform=self.platform,
name=qualified_table_name,
env=self.config.env,
)
link_elements = []
if "wtmo_url" in urls:
link_elements.append(
InstitutionalMemoryMetadataClass(
url=urls["wtmo_url"],
description="Airflow DAG",
createStamp=get_current_timestamp(),
)
)
if "bigquery_etl_url" in urls:
link_elements.append(
InstitutionalMemoryMetadataClass(
url=urls["bigquery_etl_url"],
description="BigQuery-ETL Source Code",
createStamp=get_current_timestamp(),
)
)
if not link_elements:
continue
mcp = MetadataChangeProposalWrapper(
changeType=ChangeTypeClass.UPSERT, # Should be UPDATE but it isn't supported
entityUrn=bigquery_qualified_urn,
aspect=InstitutionalMemoryClass(elements=link_elements),
)
wu = mcp.as_workunit()
yield wu
def get_report(self) -> SourceReport:
return self.report