utils/gcp.py (373 lines of code) (raw):

import re from airflow import models from airflow.providers.cncf.kubernetes.secret import Secret from airflow.providers.google.cloud.hooks.dataproc import DataProcJobBuilder from airflow.providers.google.cloud.operators.dataproc import ( ClusterGenerator, DataprocCreateClusterOperator, DataprocDeleteClusterOperator, DataprocSubmitJobOperator, ) from airflow.providers.google.cloud.operators.gcs import GCSDeleteObjectsOperator from airflow.providers.google.cloud.transfers.bigquery_to_gcs import ( BigQueryToGCSOperator, ) from operators.gcp_container_operator import GKEPodOperator from utils.dataproc import get_dataproc_parameters GCP_PROJECT_ID = "moz-fx-data-airflow-gke-prod" DATAPROC_PROJECT_ID = "airflow-dataproc" BIGQUERY_ETL_DOCKER_IMAGE = "gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest" def export_to_parquet( table, destination_table=None, static_partitions=None, arguments=None, use_storage_api=False, dag_name="export_to_parquet", parent_dag_name=None, default_args=None, gcp_conn_id="google_cloud_airflow_dataproc", dataproc_storage_bucket="airflow-dataproc-bq-parquet-exports", num_workers=2, num_preemptible_workers=0, gcs_output_bucket="airflow-dataproc-bq-parquet-exports", region="us-west1", ): """ Export a BigQuery table to Parquet. https://github.com/mozilla/bigquery-etl/blob/main/script/pyspark/export_to_parquet.py :param str table: [Required] BigQuery table name :param Optional[str] destination_table: Output table name, defaults to table, will have r'_v[0-9]+$' replaced with r'/v[0-9]+' :param List[str] arguments: Additional pyspark arguments :param bool use_storage_api: Whether to read from the BigQuery Storage API or an AVRO export :param str dag_name: Name of DAG :param Optional[str] parent_dag_name: Parent DAG name :param Optional[Dict[str, Any]] default_args: DAG configuration :param str gcp_conn_id: Airflow connection id for GCP access :param str dataproc_storage_bucket: Dataproc staging GCS bucket :param int num_preemptible_workers: Number of Dataproc preemptible workers :param str region: Region where the dataproc cluster will be located. Zone will be chosen automatically :return: airflow.models.DAG """ # remove the dataset prefix and partition suffix from table table_id = table.rsplit(".", 1)[-1] unqualified_table, _, partition_id = table_id.partition("$") # limit cluster name to 35 characters plus suffix of -export-YYYYMMDD (51 total) cluster_name = unqualified_table.replace("_", "-") if len(cluster_name) > 35: # preserve version when truncating cluster name to 42 characters prefix, version = re.match(r"(.*?)(-v[0-9]+)?$", cluster_name).groups("") cluster_name = prefix[: 35 - len(version)] + version cluster_name += "-export-{{ ds_nodash }}" dag_prefix = parent_dag_name + "." if parent_dag_name else "" project_id = DATAPROC_PROJECT_ID if destination_table is None: destination_table = unqualified_table # separate version using "/" instead of "_" export_prefix = re.sub(r"_(v[0-9]+)$", r"/\1", destination_table) + "/" if static_partitions: export_prefix += "/".join(static_partitions) + "/" avro_prefix = "avro/" + export_prefix if not static_partitions and partition_id: avro_prefix += "partition_id=" + partition_id + "/" avro_path = "gs://" + gcs_output_bucket + "/" + avro_prefix + "*.avro" params = get_dataproc_parameters("google_cloud_airflow_dataproc") if arguments is None: arguments = [] with models.DAG(dag_id=dag_prefix + dag_name, default_args=default_args) as dag: create_dataproc_cluster = DataprocCreateClusterOperator( task_id="create_dataproc_cluster", cluster_name=cluster_name, project_id=project_id, gcp_conn_id=gcp_conn_id, region=region, cluster_config=ClusterGenerator( project_id=project_id, service_account=params.client_email, num_workers=num_workers, storage_bucket=dataproc_storage_bucket, init_actions_uris=[ # This is probably a google hosted bucket for # https://github.com/GoogleCloudDataproc/initialization-actions/blob/master/python/pip-install.sh "gs://dataproc-initialization-actions/python/pip-install.sh", ], metadata={"PIP_PACKAGES": "google-cloud-bigquery==1.20.0"}, image_version="1.4-debian10", properties={}, master_machine_type="n1-standard-8", worker_machine_type="n1-standard-8", num_preemptible_workers=num_preemptible_workers, ).make(), ) dataproc_job_builder = DataProcJobBuilder( job_type="pyspark_job", task_id="run_dataproc_pyspark", cluster_name=cluster_name, project_id=project_id, properties={ "spark.jars.packages": "org.apache.spark:spark-avro_2.11:2.4.4", }, ) dataproc_job_builder.add_jar_file_uris( ["gs://spark-lib/bigquery/spark-2.4-bigquery-latest.jar"] ) dataproc_job_builder.set_python_main( "https://raw.githubusercontent.com/mozilla/bigquery-etl/main/script/legacy/export_to_parquet.py" ) dataproc_job_builder.add_args( [table] + [ "--" + key + "=" + value for key, value in { "avro-path": (not use_storage_api) and avro_path, "destination": "gs://" + gcs_output_bucket, "destination-table": destination_table, }.items() if value ] + (["--static-partitions"] if static_partitions else []) + (static_partitions if static_partitions else []) + arguments ) dataproc_job = dataproc_job_builder.build() run_dataproc_pyspark = DataprocSubmitJobOperator( task_id="run_dataproc_pyspark", job=dataproc_job["job"], gcp_conn_id=gcp_conn_id, project_id=project_id, region=region, ) delete_dataproc_cluster = DataprocDeleteClusterOperator( task_id="delete_dataproc_cluster", cluster_name=cluster_name, gcp_conn_id=gcp_conn_id, project_id=project_id, trigger_rule="all_done", region=region, ) if not use_storage_api: avro_export = BigQueryToGCSOperator( task_id="avro_export", source_project_dataset_table=table, destination_cloud_storage_uris=avro_path, compression=None, export_format="AVRO", gcp_conn_id=gcp_conn_id, ) avro_delete = GCSDeleteObjectsOperator( task_id="avro_delete", bucket_name=gcs_output_bucket, prefix=avro_prefix, gcp_conn_id=gcp_conn_id, trigger_rule="all_done", ) avro_export >> run_dataproc_pyspark >> avro_delete create_dataproc_cluster >> run_dataproc_pyspark >> delete_dataproc_cluster return dag def bigquery_etl_query( destination_table, dataset_id, parameters=(), arguments=(), project_id=None, sql_file_path=None, reattach_on_restart=True, gcp_conn_id="google_cloud_airflow_gke", gke_project_id=GCP_PROJECT_ID, gke_location="us-west1", gke_cluster_name="workloads-prod-v1", gke_namespace="default", docker_image=BIGQUERY_ETL_DOCKER_IMAGE, date_partition_parameter="submission_date", multipart=False, is_delete_operator_pod=False, table_partition_template="${{ds_nodash}}", **kwargs, ): """ Generate. :param str destination_table: [Required] BigQuery destination table :param str dataset_id: [Required] BigQuery default dataset id :param Tuple[str] parameters: Parameters passed to bq query :param Tuple[str] arguments: Additional bq query arguments :param Optional[str] project_id: BigQuery default project id :param Optional[str] sql_file_path: Optional override for path to the SQL query file to run :param str gcp_conn_id: Airflow connection id for GCP access :param str gke_project_id: GKE cluster project id :param str gke_location: GKE cluster location :param str gke_cluster_name: GKE cluster name :param str gke_namespace: GKE cluster namespace :param str docker_image: docker image to use :param Optional[str] date_partition_parameter: Parameter for indicating destination partition to generate, if None destination should be whole table rather than partition :param Dict[str, Any] kwargs: Additional keyword arguments for GKEPodOperator :param is_delete_operator_pod Optional, What to do when the pod reaches its final state, or the execution is interrupted. If False (default): do nothing, If True: delete the pod :param str table_partition_template: Template for the datestring that's used for the partition table id. Defaults to "{{ds_nodash}}". :return: GKEPodOperator """ kwargs["task_id"] = kwargs.get("task_id", destination_table) kwargs["name"] = kwargs.get("name", kwargs["task_id"].replace("_", "-")) if not project_id: project_id = "moz-fx-data-shared-prod" destination_table_no_partition = ( destination_table.split("$")[0] if destination_table is not None else None ) sql_file_path = ( sql_file_path or f"sql/{project_id}/{dataset_id}/{destination_table_no_partition}/query.sql" ) if destination_table is not None and date_partition_parameter is not None: destination_table = destination_table + table_partition_template parameters += (date_partition_parameter + ":DATE:{{ds}}",) args = ["script/bqetl", "query", "run-multipart"] if multipart else ["query"] return GKEPodOperator( reattach_on_restart=reattach_on_restart, gcp_conn_id=gcp_conn_id, project_id=gke_project_id, location=gke_location, cluster_name=gke_cluster_name, namespace=gke_namespace, image=docker_image, arguments=args + (["--destination_table=" + destination_table] if destination_table else []) + ["--dataset_id=" + dataset_id] + (["--project_id=" + project_id] if project_id else []) + ["--parameter=" + parameter for parameter in parameters] + list(arguments) + [sql_file_path], is_delete_operator_pod=is_delete_operator_pod, **kwargs, ) def bigquery_etl_copy_deduplicate( task_id, target_project_id, billing_projects=(), only_tables=None, except_tables=None, parallelism=4, priority="INTERACTIVE", hourly=False, slices=None, gcp_conn_id="google_cloud_airflow_gke", gke_project_id=GCP_PROJECT_ID, gke_location="us-west1", gke_cluster_name="workloads-prod-v1", gke_namespace="default", docker_image=BIGQUERY_ETL_DOCKER_IMAGE, **kwargs, ): """ Copy a day's data from live ping tables to stable ping tables, deduplicating on document_id. :param str task_id: [Required] ID for the task :param str target_project_id: [Required] ID of project where target tables live :param Tuple[str] billing_projects: ID of projects where queries will be executed, defaults to gcp_conn_id project :param Tuple[str] only_tables: Only process tables matching the given globs of form 'telemetry_live.main_v*' :param Tuple[str] except_tables: Process all tables except those matching the given globs :param int parallelism: Maximum number of queries to execute concurrently :param str priority: BigQuery query priority to use, must be BATCH or INTERACTIVE :param bool hourly: Alias for --slices=24 :param int slices: Number of time-based slices to deduplicate in, rather than for whole days at once :param str gcp_conn_id: Airflow connection id for GCP access :param str gke_project_id: GKE cluster project id :param str gke_location: GKE cluster location :param str gke_cluster_name: GKE cluster name :param str gke_namespace: GKE cluster namespace :param str docker_image: docker image to use :param Dict[str, Any] kwargs: Additional keyword arguments for GKEPodOperator :return: GKEPodOperator """ kwargs["name"] = kwargs.get("name", task_id.replace("_", "-")) table_qualifiers = [] if only_tables: # bqetl expects multiple args as --only ... --only ... for only_table in only_tables: table_qualifiers.append("--only") table_qualifiers.append(only_table) if except_tables: for except_table in except_tables: table_qualifiers.append("--except") table_qualifiers.append(except_table) return GKEPodOperator( task_id=task_id, reattach_on_restart=True, gcp_conn_id=gcp_conn_id, project_id=gke_project_id, location=gke_location, cluster_name=gke_cluster_name, namespace=gke_namespace, image=docker_image, arguments=["script/bqetl", "copy_deduplicate"] + ["--project-id=" + target_project_id] + (["--billing-projects", *list(billing_projects)] if billing_projects else []) + ["--date={{ds}}"] + [f"--parallelism={parallelism}"] + [f"--priority={priority}"] + (["--hourly"] if hourly else []) + ([f"--slices={slices}"] if slices is not None else []) + table_qualifiers, **kwargs, ) def bigquery_dq_check( source_table, dataset_id, task_id, parameters=(), arguments=(), project_id="moz-fx-data-shared-prod", gcp_conn_id="google_cloud_airflow_gke", gke_project_id=GCP_PROJECT_ID, gke_location="us-west1", gke_cluster_name="workloads-prod-v1", gke_namespace="default", docker_image=BIGQUERY_ETL_DOCKER_IMAGE, date_partition_parameter="submission_date", is_dq_check_fail=True, **kwargs, ): """ Run `bqetl check run` to run data quality checks against BigQuery table. :param str destination_table: [Required] BigQuery destination table the DQ checks are run on :param str dataset_id: [Required] BigQuery default dataset id :param str task_id: [ [Required] ID for the task :param Optional[str] sql_file_path: Optional override for path to the SQL query file to run :param Tuple[str] parameters: Parameters passed to bq query :param Optional[str] project_id: BigQuery default project id :param str gcp_conn_id: Airflow connection id for GCP access :param str gke_project_id: GKE cluster project id :param str gke_location: GKE cluster location :param str gke_cluster_name: GKE cluster name :param str gke_namespace: GKE cluster namespace :param str docker_image: docker image to use :param Optional[str] date_partition_parameter: Parameter for indicating destination partition to generate, if None destination should be whole table rather than partition :param Dict[str, Any] kwargs: Additional keyword arguments for GKEPodOperator :return: GKEPodOperator """ kwargs["task_id"] = kwargs.get("task_id", task_id) kwargs["name"] = kwargs.get("name", task_id.replace("_", "-")) destination_table_no_partition = ( source_table.split("$")[0] if source_table is not None else None ) sql_file_path = f"sql/{project_id}/{dataset_id}/{destination_table_no_partition}" marker = ["--marker"] if is_dq_check_fail: marker += ["fail"] else: marker += ["warn"] args = ["script/bqetl", "check", "run", *marker, sql_file_path] return GKEPodOperator( gcp_conn_id=gcp_conn_id, project_id=gke_project_id, location=gke_location, cluster_name=gke_cluster_name, namespace=gke_namespace, image=docker_image, arguments=args + ["--parameter=" + parameter for parameter in parameters] + list(arguments), **kwargs, ) def bigquery_bigeye_check( task_id, table_id, warehouse_id, project_id="moz-fx-data-shared-prod", gcp_conn_id="google_cloud_airflow_gke", gke_project_id=GCP_PROJECT_ID, gke_location="us-west1", gke_cluster_name="workloads-prod-v1", gke_namespace="default", docker_image=BIGQUERY_ETL_DOCKER_IMAGE, **kwargs, ): """ Run `bqetl monitoring run` to run Bigeye checks against BigQuery table. :param str table_id: [Required] BigQuery table the Bigeye checks are run against :param str warehouse_id: [Required] Bigeye warehouse ID where checks located :param str task_id: [Required] ID for the task :param Optional[str] project_id: BigQuery default project id :param str gcp_conn_id: Airflow connection id for GCP access :param str gke_project_id: GKE cluster project id :param str gke_location: GKE cluster location :param str gke_cluster_name: GKE cluster name :param str gke_namespace: GKE cluster namespace :param str docker_image: docker image to use :param Dict[str, Any] kwargs: Additional keyword arguments for GKEPodOperator :return: GKEPodOperator """ kwargs["task_id"] = kwargs.get("task_id", task_id) kwargs["name"] = kwargs.get("name", task_id.replace("_", "-")) args = [ "script/bqetl", "monitoring", "run", table_id, "--warehouse_id", warehouse_id, "--project_id", project_id, ] bigeye_api_key = Secret( deploy_type="env", deploy_target="BIGEYE_API_KEY", secret="airflow-gke-secrets", key="bqetl_artifact_deployment__bigeye_api_key", ) return GKEPodOperator( gcp_conn_id=gcp_conn_id, project_id=gke_project_id, location=gke_location, cluster_name=gke_cluster_name, namespace=gke_namespace, image=docker_image, arguments=list(args), secrets=[bigeye_api_key], **kwargs, ) def bigquery_xcom_query( destination_table, dataset_id, xcom_task_id, parameters=(), arguments=(), project_id=None, gcp_conn_id="google_cloud_airflow_gke", gke_project_id=GCP_PROJECT_ID, gke_location="us-west1", gke_cluster_name="workloads-prod-v1", gke_namespace="default", docker_image=BIGQUERY_ETL_DOCKER_IMAGE, date_partition_parameter="submission_date", table_partition_template="${{ds_nodash}}", **kwargs, ): """ Generate a GKEPodOperator which runs an xcom result as a bigquery query. :param str destination_table: [Required] BigQuery destination table :param str dataset_id: [Required] BigQuery default dataset id :param str xcom_task_id: [Required] task_id which generated the xcom to pull :param Tuple[str] parameters: Parameters passed to bq query :param Tuple[str] arguments: Additional bq query arguments :param Optional[str] project_id: BigQuery default project id :param str gcp_conn_id: Airflow connection id for GCP access :param str gke_project_id: GKE cluster project id :param str gke_location: GKE cluster location :param str gke_cluster_name: GKE cluster name :param str gke_namespace: GKE cluster namespace :param str docker_image: docker image to use :param Optional[str] date_partition_parameter: Parameter for indicating destination partition to generate, if None destination should be whole table rather than partition :param str table_partition_template: Template for the datestring that's used for the partition table id. Defaults to "{{ds_nodash}}". :param Dict[str, Any] kwargs: Additional keyword arguments for GKEPodOperator :return: GKEPodOperator """ kwargs["task_id"] = kwargs.get("task_id", destination_table) kwargs["name"] = kwargs.get("name", kwargs["task_id"].replace("_", "-")) if destination_table is not None and date_partition_parameter is not None: destination_table = destination_table + table_partition_template parameters += (date_partition_parameter + ":DATE:{{ds}}",) query = "{{ " + f"task_instance.xcom_pull({xcom_task_id!r})" + " }}" return GKEPodOperator( gcp_conn_id=gcp_conn_id, project_id=gke_project_id, location=gke_location, cluster_name=gke_cluster_name, namespace=gke_namespace, image=docker_image, arguments=["bq"] + ["query"] + (["--destination_table=" + destination_table] if destination_table else []) + ["--dataset_id=" + dataset_id] + (["--project_id=" + project_id] if project_id else []) + ["--parameter=" + parameter for parameter in parameters] + list(arguments) + [query], **kwargs, ) def normalize_table_id(table_name): """ Normalize table name for use with BigQuery. * Contain up to 1,024 characters * Contain letters (upper or lower case), numbers, and underscores We intentionally lower case the table_name. https://cloud.google.com/bigquery/docs/tables. """ if len(table_name) > 1024: raise ValueError("table_name cannot contain more than 1024 characters") else: return re.sub("\\W+", "_", table_name).lower()