def _get_job_sources()

in src/package/dataplexutils/metadata/wizard.py [0:0]


    def _get_job_sources(self, use_enabled, table_fqn):
        """Add stringdocs

        Args:
            Add stringdocs

        Raises:
            Add stringdocs
        """
        try:
            if use_enabled:
                bq_process_sql = []
                lineage_client = self._cloud_clients[
                    constants["CLIENTS"]["DATA_CATALOG_LINEAGE"]
                ]
                target = datacatalog_lineage_v1.EntityReference()
                target.fully_qualified_name = f"bigquery:{table_fqn}"
                dataset_location = self._get_dataset_location(table_fqn)
                logger.info(f"Searching for lineage links for table {table_fqn}.")
                request = datacatalog_lineage_v1.SearchLinksRequest(
                    parent=f"projects/{self._project_id}/locations/{dataset_location}",
                    target=target,
                )
                try:
                    link_results = lineage_client.search_links(request=request)
                except Exception as e:
                    logger.error(f"Cannot find lineage links for table {table_fqn}:exception:{e}.")
                    return []
                    raise e
                
                if len(link_results.links) > 0:
                    links = [link.name for link in link_results]
                    lineage_processes_ids = [
                        process.process
                        for process in lineage_client.batch_search_link_processes(
                            request=datacatalog_lineage_v1.BatchSearchLinkProcessesRequest(
                                parent=f"projects/{self._project_id}/locations/{dataset_location}",
                                links=links,
                            )
                        )
                    ]
                    for process_id in lineage_processes_ids:
                        process_details = lineage_client.get_process(
                            request=datacatalog_lineage_v1.GetProcessRequest(
                                name=process_id,
                            )
                        )
                        if "bigquery_job_id" in process_details.attributes:
                            bq_process_sql.append(
                                self._bq_job_info(
                                    process_details.attributes["bigquery_job_id"],
                                    dataset_location,
                                )
                            )
                    if not bq_process_sql:
                        self._client_options._use_lineage_processes = False
                    return bq_process_sql
                else:
                    self._client_options._use_lineage_processes = False
                    return []
            else:
                return []
        except Exception as e:
            logger.error(f"Exception: {e}.")
            return []
            raise e