in providers/google/src/airflow/providers/google/cloud/operators/bigquery_dts.py
def get_openlineage_facets_on_complete(self, _):
"""Implement _on_complete as we need a run config to extract information."""
from urllib.parse import urlsplit
from airflow.providers.common.compat.openlineage.facet import Dataset, ErrorMessageRunFacet
from airflow.providers.google.cloud.hooks.gcs import _parse_gcs_url
from airflow.providers.google.cloud.openlineage.utils import (
BIGQUERY_NAMESPACE,
extract_ds_name_from_gcs_path,
)
from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.providers.openlineage.sqlparser import DatabaseInfo, SQLParser
if not self._transfer_run:
self.log.debug("No BigQuery Data Transfer configuration was found by OpenLineage.")
return OperatorLineage()
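# The completed transfer run carries everything needed for lineage: the data
# source type, the destination BigQuery dataset and the source-specific params.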
data_source_id = self._transfer_run["data_source_id"]
dest_dataset_id = self._transfer_run["destination_dataset_id"]
params = self._transfer_run["params"]
input_datasets, output_datasets = [], []
run_facets, job_facets = {}, {}
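# Object-storage sources (GCS, S3, Azure Blob): the input is the configured
# storage path, the output is the destination BigQuery table from the params.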
if data_source_id in ("google_cloud_storage", "amazon_s3", "azure_blob_storage"):
if data_source_id == "google_cloud_storage":
bucket, path = _parse_gcs_url(params["data_path_template"]) # gs://bucket...
namespace = f"gs://{bucket}"
name = extract_ds_name_from_gcs_path(path)
elif data_source_id == "amazon_s3":
parsed_url = urlsplit(params["data_path"]) # s3://bucket...
namespace = f"s3://{parsed_url.netloc}"
name = extract_ds_name_from_gcs_path(parsed_url.path)
else: # azure_blob_storage
storage_account = params["storage_account"]
container = params["container"]
namespace = f"abfss://{container}@{storage_account}.dfs.core.windows.net"
name = extract_ds_name_from_gcs_path(params["data_path"])
input_datasets.append(Dataset(namespace=namespace, name=name))
dest_table_name = params["destination_table_name_template"]
output_datasets.append(
Dataset(
namespace=BIGQUERY_NAMESPACE,
name=f"{self.project_id}.{dest_dataset_id}.{dest_table_name}",
)
)
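# Relational sources (PostgreSQL, Oracle, MySQL): the namespace is built from
# the connector endpoint as scheme://host:port, with PostgreSQL mapped to the
# OpenLineage `postgres` scheme.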
elif data_source_id in ("postgresql", "oracle", "mysql"):
scheme = data_source_id if data_source_id != "postgresql" else "postgres"
host = params["connector.endpoint.host"]
port = params["connector.endpoint.port"]
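# Each transferred asset becomes one input table in the source database and
# one output table in the destination BigQuery dataset.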
for asset in params["assets"]:
# MySQL assets look like database/table; other sources use database/schema/table,
# so the last path segment is always the table name.
table_name = asset.split("/")[-1]
input_datasets.append(
Dataset(namespace=f"{scheme}://{host}:{int(port)}", name=asset.replace("/", "."))
)
output_datasets.append(
Dataset(
namespace=BIGQUERY_NAMESPACE, name=f"{self.project_id}.{dest_dataset_id}.{table_name}"
)
)
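# Scheduled queries: let the OpenLineage SQL parser derive input/output tables
# and SQL job/run facets from the query text, using the BigQuery dialect and
# this project as the default database.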
elif data_source_id == "scheduled_query":
bq_db_info = DatabaseInfo(
scheme="bigquery",
authority=None,
database=self.project_id,
)
parser_result = SQLParser("bigquery").generate_openlineage_metadata_from_sql(
sql=params["query"],
database_info=bq_db_info,
database=self.project_id,
use_connection=False,
hook=None, # Hook is not used when use_connection=False
sqlalchemy_engine=None,
)
if parser_result.inputs:
input_datasets.extend(parser_result.inputs)
if parser_result.outputs:
output_datasets.extend(parser_result.outputs)
if parser_result.job_facets:
job_facets = {**job_facets, **parser_result.job_facets}
if parser_result.run_facets:
run_facets = {**run_facets, **parser_result.run_facets}
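# A scheduled query may also be configured with an explicit destination table;
# if so, report it as an additional output dataset.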
dest_table_name = params.get("destination_table_name_template")
if dest_table_name:
output_datasets.append(
Dataset(
namespace=BIGQUERY_NAMESPACE,
name=f"{self.project_id}.{dest_dataset_id}.{dest_table_name}",
)
)
else:
self.log.debug(
"BigQuery Data Transfer data_source_id `%s` is not supported by OpenLineage.", data_source_id
)
return OperatorLineage()
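# If the transfer run finished with a non-zero error code, attach the error
# details as an OpenLineage error message run facet.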
error_status = self._transfer_run.get("error_status")
if error_status and str(error_status["code"]) != "0":
run_facets["errorMessage"] = ErrorMessageRunFacet(
message=error_status["message"],
programmingLanguage="python",
stackTrace=str(error_status["details"]),
)
return OperatorLineage(
inputs=input_datasets, outputs=output_datasets, job_facets=job_facets, run_facets=run_facets
)