def get_openlineage_facets_on_complete()

in providers/google/src/airflow/providers/google/cloud/operators/bigquery_dts.py


    def get_openlineage_facets_on_complete(self, _):
        """Implement _on_complete as we need a run config to extract information."""
        from urllib.parse import urlsplit

        from airflow.providers.common.compat.openlineage.facet import Dataset, ErrorMessageRunFacet
        from airflow.providers.google.cloud.hooks.gcs import _parse_gcs_url
        from airflow.providers.google.cloud.openlineage.utils import (
            BIGQUERY_NAMESPACE,
            extract_ds_name_from_gcs_path,
        )
        from airflow.providers.openlineage.extractors import OperatorLineage
        from airflow.providers.openlineage.sqlparser import DatabaseInfo, SQLParser

        if not self._transfer_run:
            self.log.debug("No BigQuery Data Transfer configuration was found by OpenLineage.")
            return OperatorLineage()

        data_source_id = self._transfer_run["data_source_id"]
        dest_dataset_id = self._transfer_run["destination_dataset_id"]
        params = self._transfer_run["params"]

        input_datasets, output_datasets = [], []
        run_facets, job_facets = {}, {}
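        # Lineage extraction branches on the transfer's data source:
        # object storage, relational databases, or BigQuery scheduled queries.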
        if data_source_id in ("google_cloud_storage", "amazon_s3", "azure_blob_storage"):
            if data_source_id == "google_cloud_storage":
                bucket, path = _parse_gcs_url(params["data_path_template"])  # gs://bucket...
                namespace = f"gs://{bucket}"
                name = extract_ds_name_from_gcs_path(path)
            elif data_source_id == "amazon_s3":
                parsed_url = urlsplit(params["data_path"])  # s3://bucket...
                namespace = f"s3://{parsed_url.netloc}"
                name = extract_ds_name_from_gcs_path(parsed_url.path)
            else:  # azure_blob_storage
                storage_account = params["storage_account"]
                container = params["container"]
                namespace = f"abfss://{container}@{storage_account}.dfs.core.windows.net"
                name = extract_ds_name_from_gcs_path(params["data_path"])

            input_datasets.append(Dataset(namespace=namespace, name=name))
            dest_table_name = params["destination_table_name_template"]
            output_datasets.append(
                Dataset(
                    namespace=BIGQUERY_NAMESPACE,
                    name=f"{self.project_id}.{dest_dataset_id}.{dest_table_name}",
                )
            )
        elif data_source_id in ("postgresql", "oracle", "mysql"):
            scheme = data_source_id if data_source_id != "postgresql" else "postgres"
            host = params["connector.endpoint.host"]
            port = params["connector.endpoint.port"]

            for asset in params["assets"]:
                # MySQL: db/table; Other: db/schema/table;
                table_name = asset.split("/")[-1]

                input_datasets.append(
                    Dataset(namespace=f"{scheme}://{host}:{int(port)}", name=asset.replace("/", "."))
                )
                output_datasets.append(
                    Dataset(
                        namespace=BIGQUERY_NAMESPACE, name=f"{self.project_id}.{dest_dataset_id}.{table_name}"
                    )
                )
        elif data_source_id == "scheduled_query":
            bq_db_info = DatabaseInfo(
                scheme="bigquery",
                authority=None,
                database=self.project_id,
            )
            parser_result = SQLParser("bigquery").generate_openlineage_metadata_from_sql(
                sql=params["query"],
                database_info=bq_db_info,
                database=self.project_id,
                use_connection=False,
                hook=None,  # Hook is not used when use_connection=False
                sqlalchemy_engine=None,
            )
            if parser_result.inputs:
                input_datasets.extend(parser_result.inputs)
            if parser_result.outputs:
                output_datasets.extend(parser_result.outputs)
            if parser_result.job_facets:
                job_facets = {**job_facets, **parser_result.job_facets}
            if parser_result.run_facets:
                run_facets = {**run_facets, **parser_result.run_facets}
            dest_table_name = params.get("destination_table_name_template")
            if dest_table_name:
                output_datasets.append(
                    Dataset(
                        namespace=BIGQUERY_NAMESPACE,
                        name=f"{self.project_id}.{dest_dataset_id}.{dest_table_name}",
                    )
                )
        else:
            self.log.debug(
                "BigQuery Data Transfer data_source_id `%s` is not supported by OpenLineage.", data_source_id
            )
            return OperatorLineage()

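        # Surface a failed transfer run as an OpenLineage error facet.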
        error_status = self._transfer_run.get("error_status")
        if error_status and str(error_status["code"]) != "0":
            run_facets["errorMessage"] = ErrorMessageRunFacet(
                message=error_status["message"],
                programmingLanguage="python",
                stackTrace=str(error_status["details"]),
            )

        return OperatorLineage(
            inputs=input_datasets, outputs=output_datasets, job_facets=job_facets, run_facets=run_facets
        )
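
Example (hypothetical values): a minimal sketch of the lineage this method would emit for a completed google_cloud_storage transfer run, assuming the operator's project_id is "my-project"; the exact input dataset name depends on how extract_ds_name_from_gcs_path normalizes the path.

    # Hypothetical transfer run stored by the operator in self._transfer_run:
    self._transfer_run = {
        "data_source_id": "google_cloud_storage",
        "destination_dataset_id": "analytics",
        "params": {
            "data_path_template": "gs://my-bucket/exports/*.csv",
            "destination_table_name_template": "events",
        },
    }
    # Resulting OperatorLineage, roughly:
    #   inputs  = [Dataset(namespace="gs://my-bucket", name="exports")]
    #   outputs = [Dataset(namespace="bigquery", name="my-project.analytics.events")]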