sync/datahub/glean_source.py (102 lines of code) (raw):
from typing import Iterable, Optional, List
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import MetadataWorkUnitProcessor
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.emitter.mcp import MetadataChangeProposalWrapper, ChangeTypeClass
import datahub.emitter.mce_builder as builder
from datahub.metadata.schema_classes import (
BrowsePathsClass,
InstitutionalMemoryClass,
InstitutionalMemoryMetadataClass,
DatasetPropertiesClass,
SubTypesClass,
UpstreamLineageClass,
UpstreamClass,
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StatefulStaleMetadataRemovalConfig,
StaleEntityRemovalSourceReport,
StaleEntityRemovalHandler,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
StatefulIngestionSourceBase,
)
from sync.datahub.utils import get_current_timestamp
from sync.glean import get_glean_pings
class GleanSourceConfig(StatefulIngestionConfigBase):
    """Configuration model for the Glean ingestion source."""

    # Environment name used when building dataset URNs and browse paths.
    env: str = "PROD"

    # Optional stale-metadata-removal settings; left as None when not supplied.
    stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None
class GleanSource(StatefulIngestionSourceBase):
    """Ingestion source that publishes Glean pings to DataHub.

    Every ping returned by ``get_glean_pings()`` is emitted as a dataset on
    the ``Glean`` platform, carrying a documentation link, dataset
    properties, a ``Ping`` sub-type and a browse path.  Each BigQuery table
    named by the ping additionally receives an ``upstreamLineage`` aspect
    that points back at the ping dataset.
    """

    def __init__(self, config: GleanSourceConfig, ctx: PipelineContext):
        super().__init__(config, ctx)
        self.config = config
        # Platform name embedded in every Glean dataset URN built below.
        self.platform = "Glean"

    def get_platform_instance_id(self) -> str:
        """Return the platform name, which doubles as the instance id."""
        return f"{self.platform}"

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext):
        """Parse ``config_dict`` into a ``GleanSourceConfig`` and build a source."""
        return cls(GleanSourceConfig.parse_obj(config_dict), ctx)

    def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
        """Return the base-class processors followed by the stale-entity one."""
        processors = [*super().get_workunit_processors()]
        stale_handler = StaleEntityRemovalHandler.create(self, self.config, self.ctx)
        processors.append(stale_handler.workunit_processor)
        return processors

    def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
        """Yield one work unit per metadata change proposal, ping by ping.

        For each ping, the ping's own aspect proposals are yielded first,
        followed by the lineage proposals for its BigQuery tables.  All
        proposals for a ping are built before the first one is yielded.
        """
        for ping in get_glean_pings():
            ping_urn = builder.make_dataset_urn(
                platform=self.platform,
                name=ping.qualified_name,
                env=self.config.env,
            )
            ping_mcps = self._build_ping_mcps(ping, ping_urn)
            lineage_mcps = self._build_lineage_mcps(ping, ping_urn)

            for proposal in ping_mcps + lineage_mcps:
                yield proposal.as_workunit()

    def _build_ping_mcps(self, ping, ping_urn: str) -> List[MetadataChangeProposalWrapper]:
        """Build the proposals describing the Glean ping dataset itself.

        ``ping`` is one item produced by ``get_glean_pings()``; ``ping_urn``
        is the dataset URN already built for it.
        """
        docs_link = InstitutionalMemoryMetadataClass(
            url=ping.glean_dictionary_url,
            description="Glean Dictionary Ping Documentation",
            # Stamped at build time, so each ping gets its own timestamp.
            createStamp=get_current_timestamp(),
        )
        institutional_memory = InstitutionalMemoryClass(elements=[docs_link])
        properties = DatasetPropertiesClass(
            name=ping.name,
            description=ping.description,
        )
        sub_types = SubTypesClass(typeNames=["Ping"])
        # Browse path layout: /<lower-cased env>/glean/<app name>
        browse_path = f"/{self.config.env.lower()}/glean/{ping.app_name}"
        browse_paths = BrowsePathsClass(paths=[browse_path])

        return MetadataChangeProposalWrapper.construct_many(
            entityUrn=ping_urn,
            aspects=[institutional_memory, properties, sub_types, browse_paths],
        )

    def _build_lineage_mcps(self, ping, ping_urn: str) -> List[MetadataChangeProposalWrapper]:
        """Build one ``upstreamLineage`` proposal per BigQuery table of ``ping``.

        Every proposal targets a BigQuery dataset URN and declares the Glean
        ping dataset (``ping_urn``) as its single ``TRANSFORMED`` upstream.
        """
        # A single lineage aspect instance is shared by all proposals below.
        lineage = UpstreamLineageClass(
            upstreams=[UpstreamClass(dataset=ping_urn, type="TRANSFORMED")]
        )

        proposals = []
        for table_name in ping.bigquery_fully_qualified_names:
            table_urn = builder.make_dataset_urn(
                platform="bigquery",
                name=table_name,
                env=self.config.env,
            )
            # NOTE(review): this UPSERT presumably replaces any upstreamLineage
            # already stored for the table -- confirm that is intended.
            proposals.append(
                MetadataChangeProposalWrapper(
                    entityType="dataset",
                    changeType=ChangeTypeClass.UPSERT,
                    entityUrn=table_urn,
                    aspectName="upstreamLineage",
                    aspect=lineage,
                )
            )
        return proposals

    def get_report(self) -> StaleEntityRemovalSourceReport:
        """Return the report object stored on ``self.report``.

        NOTE(review): ``self.report`` is not assigned in this class; it is
        presumably set up by ``StatefulIngestionSourceBase`` -- confirm that
        its type matches the annotated return type.
        """
        return self.report