dags/update_orphaning_dashboard_etl.py

""" Powers https://telemetry.mozilla.org/update-orphaning/. See [jobs/update_orphaning_dashboard_etl.py](https://github.com/mozilla/telemetry-airflow/blob/main/jobs/update_orphaning_dashboard_etl.py). """ from datetime import datetime, timedelta from airflow import DAG from airflow.operators.subdag import SubDagOperator from utils.constants import DS_WEEKLY from utils.dataproc import moz_dataproc_pyspark_runner from utils.tags import Tag """ The following WTMO connections are needed in order for this job to run: conn - google_cloud_airflow_dataproc conn - aws_dev_telemetry_public_analysis_2_rw """ default_args = { "owner": "akomar@mozilla.com", "depends_on_past": False, "start_date": datetime(2019, 10, 12), "email": [ "telemetry-alerts@mozilla.com", "ahabibi@mozilla.com", "rsteuber@mozilla.com", "akomar@mozilla.com", ], "email_on_failure": True, "email_on_retry": True, "retries": 2, "retry_delay": timedelta(minutes=10), } tags = [Tag.ImpactTier.tier_3] # run every Monday to maintain compatibility with legacy ATMO schedule dag = DAG( "update_orphaning_dashboard_etl", default_args=default_args, schedule_interval="0 2 * * MON", doc_md=__doc__, tags=tags, ) # Unsalted cluster name so subsequent runs fail if the cluster name exists cluster_name = "app-update-out-of-date-dataproc-cluster" # Defined in Airflow's UI -> Admin -> Connections gcp_conn_id = "google_cloud_airflow_dataproc" SubDagOperator( task_id="update_orphaning_dashboard_etl", dag=dag, subdag=moz_dataproc_pyspark_runner( parent_dag_name=dag.dag_id, dag_name="update_orphaning_dashboard_etl", default_args=default_args, cluster_name=cluster_name, job_name="update_orphaning_dashboard_etl", python_driver_code="gs://moz-fx-data-prod-airflow-dataproc-artifacts/jobs/update_orphaning_dashboard_etl.py", init_actions_uris=[ "gs://dataproc-initialization-actions/python/pip-install.sh" ], additional_metadata={ "PIP_PACKAGES": "google-cloud-bigquery==1.20.0 google-cloud-storage==1.19.1 boto3==1.9.253" }, additional_properties={ "spark:spark.jars.packages": "org.apache.spark:spark-avro_2.11:2.4.3" }, py_args=[ "--run-date", DS_WEEKLY, "--gcs-bucket", "mozdata-analysis", "--gcs-prefix", "update-orphaning-airflow", "--gcs-output-bucket", "moz-fx-data-static-websit-8565-analysis-output", "--gcs-output-path", "app-update/data/out-of-date/", ], idle_delete_ttl=14400, num_workers=20, worker_machine_type="n1-standard-8", gcp_conn_id=gcp_conn_id, ), )