in sync/datahub/legacy_source.py [0:0]
def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
for legacy_ping in get_legacy_pings():
legacy_qualified_urn = builder.make_dataset_urn(
platform=self.platform,
name=legacy_ping.name,
env=self.config.env,
)
legacy_ping_aspects = [
SubTypesClass(typeNames=["Ping"]),
BrowsePathsClass(
paths=[f"/{self.config.env.lower()}/legacy/{legacy_ping.name}"]
),
]
legacy_ping_mcps = MetadataChangeProposalWrapper.construct_many(
entityUrn=legacy_qualified_urn, aspects=legacy_ping_aspects
)
upstream_lineage = UpstreamLineageClass(
upstreams=[
UpstreamClass(
dataset=legacy_qualified_urn,
type="TRANSFORMED",
)
]
)
upstream_lineage_mcps = [
MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=builder.make_dataset_urn(
platform="bigquery",
name=qualified_table_name,
env=self.config.env,
),
aspectName="upstreamLineage",
aspect=upstream_lineage,
)
for qualified_table_name in legacy_ping.bigquery_fully_qualified_names
]
for mcp in legacy_ping_mcps + upstream_lineage_mcps:
wu = mcp.as_workunit()
yield wu