dags/socorro_import.py:

"""
Rewrite Socorro crash report data to Parquet in GCS with Dataproc, then load
the Parquet data into BigQuery.

The following WTMO connections are needed in order for this job to run:
conn - google_cloud_airflow_dataproc
conn - google_cloud_airflow_gke
"""

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.subdag import SubDagOperator
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.utils.task_group import TaskGroup
from operators.gcp_container_operator import GKEPodOperator
from utils.dataproc import moz_dataproc_pyspark_runner
from utils.tags import Tag

default_args = {
    "owner": "srose@mozilla.com",
    "depends_on_past": False,
    "start_date": datetime(2019, 9, 10),
    "email": [
        "srose@mozilla.com",
        "telemetry-alerts@mozilla.com",
    ],
    "email_on_failure": True,
    "email_on_retry": True,
    "retries": 2,
    "retry_delay": timedelta(minutes=30),
}

tags = [Tag.ImpactTier.tier_2]

with DAG(
    "socorro_import",
    default_args=default_args,
    schedule_interval="@daily",
    tags=tags,
) as dag:
    # Unsalted cluster name so subsequent runs fail if the cluster already exists
    cluster_name = "socorro-import-dataproc-cluster"

    # Defined in Airflow's UI -> Admin -> Connections
    gcp_conn_id = "google_cloud_airflow_dataproc"
    project_id = "airflow-dataproc"

    # We use an application-specific gcs bucket because the data needs to be
    # transformed in dataproc before loading
    gcs_data_bucket = "moz-fx-data-prod-socorro-data"

    dataset = "socorro_crash"
    dataset_version = "v2"
    date_submission_col = "crash_date"

    objects_prefix = "{}/{}/{}={}".format(
        dataset, dataset_version, date_submission_col, "{{ ds_nodash }}"
    )

    # Spark job reads gcs json and writes gcs parquet
    crash_report_parquet = SubDagOperator(
        task_id="crash_report_parquet",
        subdag=moz_dataproc_pyspark_runner(
            parent_dag_name=dag.dag_id,
            dag_name="crash_report_parquet",
            default_args=default_args,
            cluster_name=cluster_name,
            job_name="Socorro_Crash_Reports_to_Parquet",
            python_driver_code="gs://moz-fx-data-prod-airflow-dataproc-artifacts/jobs/socorro_import_crash_data.py",
            py_args=[
                "--date",
                "{{ ds_nodash }}",
                "--source-gcs-path",
                "gs://moz-fx-socorro-prod-prod-telemetry/v1/crash_report",
                "--dest-gcs-path",
                f"gs://{gcs_data_bucket}/{dataset}",
            ],
            idle_delete_ttl=14400,
            num_workers=8,
            worker_machine_type="n1-standard-8",
            gcp_conn_id=gcp_conn_id,
        ),
    )

    bq_gcp_conn_id = "google_cloud_airflow_gke"

    # Not using load_to_bigquery since our source data is on GCS.
    # We do use the parquet2bigquery container to load gcs parquet into bq though.
    bq_dataset = "telemetry_derived"
    bq_table_name = f"{dataset}_{dataset_version}"

    # This image was manually built from
    # https://github.com/mozilla/parquet2bigquery/commit/6bf1f86076de8939ba2c4d008080d6c159a0a093
    # using python:3.7.4-slim-buster
    docker_image = "gcr.io/moz-fx-data-airflow-prod-88e0/parquet2bigquery:20190722"

    gke_args = [
        "--dataset",
        bq_dataset,
        "--concurrency",
        "10",
        "--bucket",
        gcs_data_bucket,
        "--no-resume",
        "--prefix",
        objects_prefix,
        "--cluster-by",
        "crash_date",
    ]

    # We remove the current date partition for idempotency.
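    # The trailing "$<date>" below is BigQuery's partition decorator, so "bq rm"
    # only drops that run's date partition. With an example run date of 20240101
    # the template renders to roughly:
    #   <gcp_shared_prod_project>:telemetry_derived.socorro_crash_v2$20240101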
    table_name = "{}:{}.{}${{{{ds_nodash}}}}".format(
        "{{ var.value.gcp_shared_prod_project }}", bq_dataset, bq_table_name
    )

    remove_bq_table_partition = GKEPodOperator(
        task_id="remove_socorro_crash_bq_table_partition",
        gcp_conn_id=bq_gcp_conn_id,
        name="remove_socorro_crash_bq_table_partition",
        image="gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest",
        arguments=["bq", "rm", "-f", "--table", table_name],
    )

    bq_load = GKEPodOperator(
        task_id="bigquery_load",
        gcp_conn_id=bq_gcp_conn_id,
        name="load-socorro-crash-parquet-to-bq",
        image=docker_image,
        arguments=gke_args,
        env_vars={"GOOGLE_CLOUD_PROJECT": "{{ var.value.gcp_shared_prod_project }}"},
    )

    with TaskGroup("socorro_external") as socorro_external:
        ExternalTaskMarker(
            task_id="crash_symbolication__wait_for_socorro_import",
            external_dag_id="crash_symbolication",
            external_task_id="wait_for_socorro_import",
            execution_date="{{ execution_date.replace(hour=5, minute=0).isoformat() }}",
        )

    bq_load >> socorro_external

    crash_report_parquet >> remove_bq_table_partition >> bq_load
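    # Because bq_load feeds the ExternalTaskMarker above, clearing bq_load with
    # "Recursive" selected in the Airflow UI should also clear the
    # wait_for_socorro_import task in the crash_symbolication DAG for the matching
    # execution date, so that downstream DAG re-runs against the reloaded table.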