in src/package/dataplexutils/metadata/wizard.py [0:0]
def _get_job_sources(self, use_enabled, table_fqn):
"""Add stringdocs
Args:
Add stringdocs
Raises:
Add stringdocs
"""
try:
if use_enabled:
bq_process_sql = []
lineage_client = self._cloud_clients[
constants["CLIENTS"]["DATA_CATALOG_LINEAGE"]
]
target = datacatalog_lineage_v1.EntityReference()
target.fully_qualified_name = f"bigquery:{table_fqn}"
dataset_location = self._get_dataset_location(table_fqn)
logger.info(f"Searching for lineage links for table {table_fqn}.")
request = datacatalog_lineage_v1.SearchLinksRequest(
parent=f"projects/{self._project_id}/locations/{dataset_location}",
target=target,
)
try:
link_results = lineage_client.search_links(request=request)
except Exception as e:
logger.error(f"Cannot find lineage links for table {table_fqn}:exception:{e}.")
return []
raise e
if len(link_results.links) > 0:
links = [link.name for link in link_results]
lineage_processes_ids = [
process.process
for process in lineage_client.batch_search_link_processes(
request=datacatalog_lineage_v1.BatchSearchLinkProcessesRequest(
parent=f"projects/{self._project_id}/locations/{dataset_location}",
links=links,
)
)
]
for process_id in lineage_processes_ids:
process_details = lineage_client.get_process(
request=datacatalog_lineage_v1.GetProcessRequest(
name=process_id,
)
)
if "bigquery_job_id" in process_details.attributes:
bq_process_sql.append(
self._bq_job_info(
process_details.attributes["bigquery_job_id"],
dataset_location,
)
)
if not bq_process_sql:
self._client_options._use_lineage_processes = False
return bq_process_sql
else:
self._client_options._use_lineage_processes = False
return []
else:
return []
except Exception as e:
logger.error(f"Exception: {e}.")
return []
raise e