dags/glam_fenix.py (174 lines of code) (raw):

""" Firefox for Android ETL for https://glam.telemetry.mozilla.org/. Generates and runs a series of BQ queries, see [bigquery_etl/glam](https://github.com/mozilla/bigquery-etl/tree/main/bigquery_etl/glam) in bigquery-etl and the [glam_subdags](https://github.com/mozilla/telemetry-airflow/tree/main/dags/glam_subdags) in telemetry-airflow. """ import operator from datetime import datetime, timedelta from functools import partial, reduce from airflow import DAG from airflow.operators.empty import EmptyOperator from airflow.sensors.external_task import ExternalTaskSensor from airflow.utils.task_group import TaskGroup from utils.constants import ALLOWED_STATES, FAILED_STATES from utils.glam_subdags.generate_query import ( generate_and_run_glean_queries, generate_and_run_glean_task, ) from utils.tags import Tag default_args = { "owner": "efilho@mozilla.com", "depends_on_past": False, "start_date": datetime(2020, 2, 19), "email": [ "telemetry-alerts@mozilla.com", "akomarzewski@mozilla.com", "efilho@mozilla.com", ], "email_on_failure": True, "email_on_retry": True, "retries": 1, "retry_delay": timedelta(minutes=30), } PROJECT = "moz-fx-glam-prod" # Fenix as a product has a convoluted app_id history. The comments note the # start and end dates of the id in the app store. # https://docs.google.com/spreadsheets/d/18PzkzZxdpFl23__-CIO735NumYDqu7jHpqllo0sBbPA PRODUCTS = [ "org_mozilla_fenix", # 2019-06-29 - 2020-07-03 (beta), 2020-07-03 - present (nightly) "org_mozilla_fenix_nightly", # 2019-06-30 - 2020-07-03 "org_mozilla_firefox", # 2020-07-28 - present "org_mozilla_firefox_beta", # 2020-03-26 - present "org_mozilla_fennec_aurora", # 2020-01-21 - 2020-07-03 ] # This is only required if there is a logical mapping defined within the # bigquery-etl module within the templates/logical_app_id folder. This builds # the dependency graph such that the view with the logical clients daily table # is always pointing to a concrete partition in BigQuery. LOGICAL_MAPPING = { "org_mozilla_fenix_glam_nightly": [ "org_mozilla_fenix_nightly", "org_mozilla_fenix", "org_mozilla_fennec_aurora", ], "org_mozilla_fenix_glam_beta": ["org_mozilla_fenix", "org_mozilla_firefox_beta"], "org_mozilla_fenix_glam_release": ["org_mozilla_firefox"], } tags = [Tag.ImpactTier.tier_1] with DAG( "glam_fenix", default_args=default_args, max_active_runs=1, schedule_interval="0 2 * * *", doc_md=__doc__, tags=tags, ) as dag: wait_for_copy_deduplicate = ExternalTaskSensor( task_id="wait_for_copy_deduplicate", external_dag_id="copy_deduplicate", external_task_id="copy_deduplicate_all", execution_delta=timedelta(hours=1), check_existence=True, mode="reschedule", allowed_states=ALLOWED_STATES, failed_states=FAILED_STATES, pool="DATA_ENG_EXTERNALTASKSENSOR", email_on_retry=False, ) mapping = {} for product in PRODUCTS: query = generate_and_run_glean_queries( task_id=f"daily_{product}", product=product, destination_project_id=PROJECT, env_vars={"STAGE": "daily"}, ) mapping[product] = query wait_for_copy_deduplicate >> query # the set of logical ids and the set of ids that are not mapped to logical ids final_products = set(LOGICAL_MAPPING.keys()) | set(PRODUCTS) - set( reduce(operator.iadd, LOGICAL_MAPPING.values(), []) ) for product in final_products: is_release = "release" in product done = EmptyOperator(task_id=f"{product}_done") func = partial( generate_and_run_glean_task, product=product, destination_project_id=PROJECT, env_vars={"STAGE": "incremental"}, ) view, init, query = ( partial(func, task_type=task_type) for task_type in ["view", "init", "query"] ) # stage 1 - incremental clients_daily_histogram_aggregates = view( task_name=f"{product}__view_clients_daily_histogram_aggregates_v1" ) clients_daily_scalar_aggregates = view( task_name=f"{product}__view_clients_daily_scalar_aggregates_v1" ) latest_versions = query(task_name=f"{product}__latest_versions_v1") clients_scalar_aggregates_init = init( task_name=f"{product}__clients_scalar_aggregates_v1" ) clients_scalar_aggregates = query( task_name=f"{product}__clients_scalar_aggregates_v1" ) clients_histogram_aggregates_init = init( task_name=f"{product}__clients_histogram_aggregates_v1" ) clients_histogram_aggregates = query( task_name=f"{product}__clients_histogram_aggregates_v1" ) # set all of the dependencies for all of the tasks # get the dependencies for the logical mapping, or just pass through the # daily query unmodified for dependency in LOGICAL_MAPPING.get(product, [product]): mapping[dependency] >> clients_daily_scalar_aggregates mapping[dependency] >> clients_daily_histogram_aggregates # only the scalar aggregates are upstream of latest versions clients_daily_scalar_aggregates >> latest_versions latest_versions >> clients_scalar_aggregates_init latest_versions >> clients_histogram_aggregates_init ( clients_daily_scalar_aggregates >> clients_scalar_aggregates_init >> clients_scalar_aggregates ) ( clients_daily_histogram_aggregates >> clients_histogram_aggregates_init >> clients_histogram_aggregates ) if is_release: clients_histogram_aggregates >> done clients_scalar_aggregates >> done else: # stage 2 - downstream for export scalar_bucket_counts = query(task_name=f"{product}__scalar_bucket_counts_v1") scalar_probe_counts = query(task_name=f"{product}__scalar_probe_counts_v1") with TaskGroup( group_id=f"{product}__histogram_bucket_counts_v1", dag=dag, default_args=default_args ) as histogram_bucket_counts: prev_task = None for sample_range in ([0, 19], [20, 39], [40, 59], [60, 79], [80, 99]): histogram_bucket_counts_sampled = query( task_name=f"{product}__histogram_bucket_counts_v1_sampled_{sample_range[0]}_{sample_range[1]}", min_sample_id=sample_range[0], max_sample_id=sample_range[1], replace_table=(sample_range[0] == 0) ) if prev_task: histogram_bucket_counts_sampled.set_upstream(prev_task) prev_task = histogram_bucket_counts_sampled histogram_probe_counts = query( task_name=f"{product}__histogram_probe_counts_v1" ) probe_counts = view(task_name=f"{product}__view_probe_counts_v1") extract_probe_counts = query(task_name=f"{product}__extract_probe_counts_v1") user_counts = view(task_name=f"{product}__view_user_counts_v1") extract_user_counts = query(task_name=f"{product}__extract_user_counts_v1") sample_counts = view(task_name=f"{product}__view_sample_counts_v1") ( clients_scalar_aggregates >> scalar_bucket_counts >> scalar_probe_counts >> probe_counts ) ( clients_histogram_aggregates >> histogram_bucket_counts >> histogram_probe_counts >> probe_counts ) probe_counts >> sample_counts >> extract_probe_counts >> done ( clients_scalar_aggregates >> user_counts >> extract_user_counts >> done ) clients_histogram_aggregates >> done