dags/graphics_telemetry.py (120 lines of code) (raw):

"""
A job to power graphics dashboard.

Processes main ping data and exports to GCS to power a graphics dashboard at
https://firefoxgraphics.github.io/telemetry/.

This was originally a Databricks notebook that was migrated to a scheduled
Dataproc task. Source code lives in the
[FirefoxGraphics/telemetry](https://github.com/FirefoxGraphics/telemetry)
repository.

This is a overwrite kind of operation and as long as the most recent DAG run
succeeded the job should be considered healthy.
"""

import datetime

from airflow import DAG
from airflow.operators.subdag import SubDagOperator
from airflow.sensors.external_task import ExternalTaskSensor

from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.dataproc import get_dataproc_parameters, moz_dataproc_pyspark_runner
from utils.tags import Tag

# Defaults applied to every task in this DAG; the same dict is also handed to
# the Dataproc runner sub-DAGs below.
default_args = {
    "owner": "kik@mozilla.com",
    "depends_on_past": False,
    "start_date": datetime.datetime(2020, 11, 26),
    "email": [
        "telemetry-alerts@mozilla.com",
        "kik@mozilla.com",
    ],
    "email_on_failure": True,
    "email_on_retry": True,
    "retries": 2,
    "retry_delay": datetime.timedelta(minutes=20),
}

# Passed to the cluster's pip-install init action via the PIP_PACKAGES
# metadata key (space-joined at the call site).
PIP_PACKAGES = [
    "git+https://github.com/mozilla/python_moztelemetry.git@v0.10.7#egg=python-moztelemetry",
    "git+https://github.com/FirefoxGraphics/telemetry.git#egg=pkg&subdirectory=analyses/bigquery_shim",
    "boto3==1.16.20",
    "six==1.15.0",
]

# Output location handed to both PySpark drivers on their command line.
GCS_BUCKET = "moz-fx-data-static-websit-8565-analysis-output"
GCS_PREFIX = "gfx/telemetry-data/"

tags = [Tag.ImpactTier.tier_1]


def _dataproc_pyspark_task(
    parent_dag,
    dataproc_params,
    *,
    task_name,
    job_name,
    driver_url,
    driver_args,
    worker_machine_type,
):
    """Build one SubDagOperator that runs a PySpark driver through the Dataproc runner.

    Both jobs in this DAG share the same image, init action, pip packages,
    BigQuery connector jar, cluster size and credentials; only the values
    exposed as parameters differ between them.

    Args:
        parent_dag: DAG that owns the returned operator; its ``dag_id`` is
            passed to the runner as ``parent_dag_name``.
        dataproc_params: Object returned by ``get_dataproc_parameters``; its
            ``conn_id``, ``client_email`` and ``storage_bucket`` attributes
            are forwarded to the runner.
        task_name: Used both as the operator's ``task_id`` and as the
            runner's ``dag_name``.
        job_name: Dataproc job name; the cluster name is this value followed
            by ``-{{ ds }}``.
        driver_url: Location of the PySpark driver script.
        driver_args: Command-line arguments passed to the driver.
        worker_machine_type: Machine type for the cluster's workers.

    Returns:
        The configured ``SubDagOperator`` attached to ``parent_dag``.
    """
    runner_kwargs = {
        "parent_dag_name": parent_dag.dag_id,
        "image_version": "1.5-debian10",
        "dag_name": task_name,
        "default_args": default_args,
        "cluster_name": job_name + "-{{ ds }}",
        "job_name": job_name,
        "python_driver_code": driver_url,
        "init_actions_uris": [
            "gs://dataproc-initialization-actions/python/pip-install.sh"
        ],
        "additional_metadata": {"PIP_PACKAGES": " ".join(PIP_PACKAGES)},
        "additional_properties": {
            "spark:spark.jars": "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar",
        },
        "py_args": driver_args,
        "idle_delete_ttl": 14400,
        "num_workers": 2,
        "worker_machine_type": worker_machine_type,
        "gcp_conn_id": dataproc_params.conn_id,
        "service_account": dataproc_params.client_email,
        "storage_bucket": dataproc_params.storage_bucket,
    }
    # The sub-DAG is created inline (never bound to a module-level name) so
    # that only the parent DAG is visible at module scope.
    return SubDagOperator(
        task_id=task_name,
        dag=parent_dag,
        subdag=moz_dataproc_pyspark_runner(**runner_kwargs),
    )


with DAG(
    "graphics_telemetry",
    default_args=default_args,
    schedule_interval="0 3 * * *",
    doc_md=__doc__,
    tags=tags,
) as dag:
    # Gate on the main-ping deduplication task of the copy_deduplicate DAG.
    # execution_delta points the sensor at the upstream run whose logical date
    # is two hours earlier than this DAG's. Retry e-mails are switched off for
    # the sensor only, overriding the DAG-wide default.
    wait_for_main_ping = ExternalTaskSensor(
        task_id="wait_for_copy_deduplicate_main_ping",
        external_dag_id="copy_deduplicate",
        external_task_id="copy_deduplicate_main_ping",
        execution_delta=datetime.timedelta(hours=2),
        check_existence=True,
        mode="reschedule",
        allowed_states=ALLOWED_STATES,
        failed_states=FAILED_STATES,
        pool="DATA_ENG_EXTERNALTASKSENSOR",
        email_on_retry=False,
        dag=dag,
    )

    params = get_dataproc_parameters("google_cloud_airflow_dataproc")

    graphics_trends = _dataproc_pyspark_task(
        dag,
        params,
        task_name="graphics_trends",
        job_name="graphics-trends",
        driver_url="https://raw.githubusercontent.com/mozilla/python_mozetl/main/mozetl/graphics/graphics_telemetry_trends.py",
        driver_args=[
            "--gcs-bucket",
            GCS_BUCKET,
            "--gcs-prefix",
            GCS_PREFIX,
            "--weekly-fraction",
            "0.003",
        ],
        worker_machine_type="n1-standard-4",
    )

    graphics_dashboard = _dataproc_pyspark_task(
        dag,
        params,
        task_name="graphics_dashboard",
        job_name="graphics-dashboard",
        driver_url="https://raw.githubusercontent.com/mozilla/python_mozetl/main/mozetl/graphics/graphics_telemetry_dashboard.py",
        driver_args=[
            "--output-bucket",
            GCS_BUCKET,
            "--output-prefix",
            GCS_PREFIX,
            "--release-fraction",
            "0.003",
        ],
        worker_machine_type="n1-highmem-4",
    )

    # Both Dataproc jobs depend only on the sensor and are independent of
    # each other.
    wait_for_main_ping >> [graphics_trends, graphics_dashboard]