dags/firefox_public_data_report.py (152 lines of code) (raw):

""" Powers the public https://data.firefox.com/ dashboard. Source code is in the [firefox-public-data-report-etl repository] (https://github.com/mozilla/firefox-public-data-report-etl). """ from datetime import datetime, timedelta from airflow import DAG from airflow.sensors.external_task import ExternalTaskSensor from operators.gcp_container_operator import GKEPodOperator from utils.constants import ALLOWED_STATES, FAILED_STATES from utils.gcp import bigquery_etl_query from utils.tags import Tag default_args = { "owner": "bewu@mozilla.com", "depends_on_past": False, "start_date": datetime(2020, 4, 6), "email": [ "telemetry-alerts@mozilla.com", "firefox-hardware-report-feedback@mozilla.com", "akomar@mozilla.com", "shong@mozilla.com", "bewu@mozilla.com", ], "email_on_failure": True, "email_on_retry": True, "retries": 2, "retry_delay": timedelta(minutes=10), } tags = [Tag.ImpactTier.tier_3] dag = DAG( "firefox_public_data_report", default_args=default_args, schedule_interval="0 1 * * MON", doc_md=__doc__, tags=tags, ) # hardware_report's execution date will be {now}-7days. It will read last week's main pings, # therefore we need to wait for yesterday's Main Ping deduplication task to finish wait_for_main_ping = ExternalTaskSensor( task_id="wait_for_main_ping", external_dag_id="copy_deduplicate", external_task_id="copy_deduplicate_main_ping", execution_delta=timedelta(days=-6), check_existence=True, mode="reschedule", allowed_states=ALLOWED_STATES, failed_states=FAILED_STATES, pool="DATA_ENG_EXTERNALTASKSENSOR", email_on_retry=False, dag=dag, ) hardware_report_query = bigquery_etl_query( task_id="hardware_report_query", destination_table="public_data_report_hardware_aggregates_v1", project_id="moz-fx-data-shared-prod", dataset_id="telemetry_derived", dag=dag, ) hardware_report_export = GKEPodOperator( task_id="hardware_report_export", name="hardware_report_export", image="gcr.io/moz-fx-data-airflow-prod-88e0/firefox-public-data-report-etl:latest", arguments=[ "-m", "public_data_report.cli", "hardware_report", "--date_from", "{{ ds }}", "--input_bq_table", "moz-fx-data-shared-prod.telemetry_derived.public_data_report_hardware_aggregates_v1", "--output_bq_table", "moz-fx-data-shared-prod.telemetry_derived.public_data_report_hardware_v1", "--gcs_bucket", "moz-fx-data-static-websit-8565-analysis-output", "--gcs_path", "public-data-report/hardware/", ], image_pull_policy="Always", dag=dag, ) wait_for_clients_last_seen = ExternalTaskSensor( task_id="wait_for_clients_last_seen", external_dag_id="bqetl_main_summary", external_task_id="telemetry_derived__clients_last_seen__v1", execution_delta=timedelta(days=-6, hours=-1), check_existence=True, mode="reschedule", allowed_states=ALLOWED_STATES, failed_states=FAILED_STATES, pool="DATA_ENG_EXTERNALTASKSENSOR", email_on_retry=False, dag=dag, ) user_activity = bigquery_etl_query( task_id="user_activity", destination_table="public_data_report_user_activity_v1", project_id="moz-fx-data-shared-prod", dataset_id="telemetry_derived", dag=dag, ) user_activity_usage_behavior_export = GKEPodOperator( task_id="user_activity_export", name="user_activity_export", image="gcr.io/moz-fx-data-airflow-prod-88e0/firefox-public-data-report-etl:latest", arguments=[ "-m", "public_data_report.cli", "user_activity", "--bq_table", "moz-fx-data-shared-prod.telemetry_derived.public_data_report_user_activity_v1", "--gcs_bucket", "moz-fx-data-static-websit-8565-analysis-output", "--gcs_path", "public-data-report/user_activity", ], image_pull_policy="Always", dag=dag, ) annotations_export = GKEPodOperator( task_id="annotations_export", name="annotations_export", image="gcr.io/moz-fx-data-airflow-prod-88e0/firefox-public-data-report-etl:latest", arguments=[ "-m", "public_data_report.cli", "annotations", "--date_to", "{{ ds }}", "--output_bucket", "moz-fx-data-static-websit-8565-analysis-output", "--output_prefix", "public-data-report/annotations", ], image_pull_policy="Always", dag=dag, ) ensemble_transposer = GKEPodOperator( task_id="ensemble_transposer", name="ensemble_transposer", image="gcr.io/moz-fx-data-airflow-prod-88e0/ensemble-transposer:latest", env_vars={ "GCS_BUCKET_NAME": "moz-fx-data-static-websit-8565-ensemble", }, image_pull_policy="Always", dag=dag, ) ( wait_for_main_ping >> hardware_report_query >> hardware_report_export >> ensemble_transposer ) ( wait_for_clients_last_seen >> user_activity >> user_activity_usage_behavior_export >> ensemble_transposer ) annotations_export >> ensemble_transposer