dags/glam_fog_release.py:

from datetime import datetime, timedelta
from functools import partial

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup

from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.glam_subdags.generate_query import generate_and_run_glean_task
from utils.tags import Tag

default_args = {
    "owner": "efilho@mozilla.com",
    "depends_on_past": False,
    "start_date": datetime(2024, 12, 11),
    "email": [
        "telemetry-alerts@mozilla.com",
        "akomarzewski@mozilla.com",
        "efilho@mozilla.com",
    ],
    "email_on_failure": True,
    "email_on_retry": True,
    "retries": 1,
    "retry_delay": timedelta(minutes=30),
}

PROJECT = "moz-fx-glam-prod"

tags = [Tag.ImpactTier.tier_2]

with DAG(
    "glam_fog_release",
    default_args=default_args,
    max_active_runs=1,
    schedule_interval="0 10 * * 6",  # 10am on Saturday
    tags=tags,
) as dag:
    # Wait for the daily glam_fog release before running the weekly
    # aggregation. The negative execution_delta points the sensor at a
    # glam_fog logical date *after* this DAG's own logical date.
    wait_for_glam_fog = ExternalTaskSensor(
        task_id="wait_for_daily_glam_fog_release",
        external_dag_id="glam_fog",
        external_task_id="daily_release_done",
        execution_delta=timedelta(days=-5, hours=-16),
        check_existence=True,
        mode="reschedule",
        allowed_states=ALLOWED_STATES,
        failed_states=FAILED_STATES,
        pool="DATA_ENG_EXTERNALTASKSENSOR",
        email_on_retry=False,
    )

    fog_release_done = EmptyOperator(
        task_id="fog_release_done",
    )

    for product in ["firefox_desktop_glam_release"]:
        # Factories that stamp out the view/init/query tasks for this product.
        func = partial(
            generate_and_run_glean_task,
            product=product,
            destination_project_id=PROJECT,
            env_vars={"STAGE": "incremental"},
        )
        view, init, query = (
            partial(func, task_type=task_type)
            for task_type in ["view", "init", "query"]
        )

        # stage 2 - downstream for export
        scalar_bucket_counts = query(task_name=f"{product}__scalar_bucket_counts_v1")
        scalar_probe_counts = query(task_name=f"{product}__scalar_probe_counts_v1")

        with TaskGroup(
            group_id=f"{product}__histogram_bucket_counts_v1",
            dag=dag,
            default_args=default_args,
        ) as histogram_bucket_counts:
            prev_task = None
            # Windows + Release data is in [0-9], so we're further splitting
            # that range into single-sample slices; the rest runs in blocks of ten.
            for sample_range in (
                [0, 0], [1, 1], [2, 2], [3, 3], [4, 4],
                [5, 5], [6, 6], [7, 7], [8, 8], [9, 9],
                [10, 19], [20, 29], [30, 39], [40, 49], [50, 59],
                [60, 69], [70, 79], [80, 89], [90, 99],
            ):
                histogram_bucket_counts_sampled = query(
                    task_name=(
                        f"{product}__histogram_bucket_counts_v1_sampled_"
                        f"{sample_range[0]}_{sample_range[1]}"
                    ),
                    min_sample_id=sample_range[0],
                    max_sample_id=sample_range[1],
                    # Only the first slice replaces the table; later slices append.
                    replace_table=(sample_range[0] == 0),
                )
                # Chain the sampled queries so they run one at a time.
                if prev_task:
                    histogram_bucket_counts_sampled.set_upstream(prev_task)
                prev_task = histogram_bucket_counts_sampled

        histogram_probe_counts = query(task_name=f"{product}__histogram_probe_counts_v1")

        probe_counts = view(task_name=f"{product}__view_probe_counts_v1")
        extract_probe_counts = query(task_name=f"{product}__extract_probe_counts_v1")
        user_counts = view(task_name=f"{product}__view_user_counts_v1")
        extract_user_counts = query(task_name=f"{product}__extract_user_counts_v1")
        sample_counts = view(task_name=f"{product}__view_sample_counts_v1")

        (
            wait_for_glam_fog
            >> scalar_bucket_counts
            >> scalar_probe_counts
            >> probe_counts
        )
        (
            wait_for_glam_fog
            >> histogram_bucket_counts
            >> histogram_probe_counts
            >> probe_counts
        )
        probe_counts >> sample_counts >> extract_probe_counts >> fog_release_done
        (
            wait_for_glam_fog
            >> user_counts
            >> extract_user_counts
            >> fog_release_done
        )
        wait_for_glam_fog >> fog_release_done
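
The negative execution_delta is easy to misread. ExternalTaskSensor looks for the external run at logical_date - execution_delta, so a negative delta targets a glam_fog logical date 5 days and 16 hours *after* this DAG's own logical date. A minimal sketch of the arithmetic, assuming glam_fog is scheduled daily at 02:00 UTC (an assumption; its schedule is not shown in this file):

    from datetime import datetime, timedelta

    logical_date = datetime(2024, 12, 14, 10, 0)  # a Saturday 10:00 logical date
    execution_delta = timedelta(days=-5, hours=-16)

    # The sensor targets logical_date - execution_delta; subtracting a
    # negative delta points forward in time.
    target = logical_date - execution_delta
    print(target)  # 2024-12-20 02:00:00 -> the Friday 02:00 glam_fog run

Because logical dates lag one schedule interval behind wall-clock execution, the Dec 14 weekly run actually executes on Saturday Dec 21 at 10:00, so this targets the daily glam_fog run that completes shortly before the weekly run starts.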
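The set_upstream loop inside the TaskGroup serializes the sampled queries so only one runs at a time. An equivalent, arguably more declarative formulation uses Airflow's chain() helper; this is a sketch rather than the DAG's actual code, and it reuses query and product from the loop above with a hypothetical sample_ranges list holding the same [min, max] pairs:

    from airflow.models.baseoperator import chain

    # Build all sampled tasks first, then chain() them into a strict
    # one-after-another sequence (equivalent to the set_upstream loop).
    sampled = [
        query(
            task_name=f"{product}__histogram_bucket_counts_v1_sampled_{lo}_{hi}",
            min_sample_id=lo,
            max_sample_id=hi,
            replace_table=(lo == 0),  # first slice replaces, later slices append
        )
        for lo, hi in sample_ranges  # hypothetical: same pairs as the loop above
    ]
    chain(*sampled)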