dags/copy_deduplicate.py (302 lines of code) (raw):

import datetime

from airflow import models
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.utils.task_group import TaskGroup
from kubernetes.client import models as k8s

from operators.gcp_container_operator import GKEPodOperator
from utils.gcp import (
    bigquery_etl_copy_deduplicate,
    bigquery_etl_query,
)
from utils.tags import Tag

DOCS = """\
# Copy-Deduplicate

This DAG is the root of most derived tables. For each live ping table that the
data pipeline populates, we run a "copy_deduplicate" query once per day to
populate the corresponding stable table.

A few immediate downstream tables are also included in this DAG.

## Workflows

In early 2021, manual reruns of `copy_deduplicate` were leading to empty
partitions, but the root cause has been fixed. See
[bug 1690363](https://bugzilla.mozilla.org/show_bug.cgi?id=1690363).

## Changelog

In April 2021, `copy_deduplicate_main_ping` was moved from a 100-slice
configuration to a single-query configuration, which will change the
performance profile and is intended to be more efficient and slightly faster.
We also increased the number of parallel queries in `copy_deduplicate_all` to
help it finish more quickly and split out `copy_deduplicate_event_ping` to its
own task.

See [telemetry-airflow#1279](
https://github.com/mozilla/telemetry-airflow/pull/1279/files)
"""

default_args = {
    "owner": "akomarzewski@mozilla.com",
    "start_date": datetime.datetime(2019, 7, 25),
    "email": [
        "telemetry-alerts@mozilla.com",
        "dataops+alerts@mozilla.com",
        "akomarzewski@mozilla.com",
    ],
    "email_on_failure": True,
    "email_on_retry": True,
    "depends_on_past": False,
    # If a task fails, retry it twice after waiting at least 5 minutes.
    "retries": 2,
    "retry_delay": datetime.timedelta(minutes=5),
}

dag_name = "copy_deduplicate"
tags = [Tag.ImpactTier.tier_1]

with models.DAG(
    dag_name,
    schedule_interval="0 1 * * *",
    doc_md=DOCS,
    default_args=default_args,
    tags=tags,
) as dag:
    # This single task is responsible for sequentially running copy queries
    # over all the tables in _live datasets into _stable datasets except those
    # that are specifically used in another DAG.
    resources = k8s.V1ResourceRequirements(
        requests={"memory": "400Mi"},
    )

    copy_deduplicate_all_base = bigquery_etl_copy_deduplicate(
        task_id="copy_deduplicate_all_base",
        target_project_id="moz-fx-data-shared-prod",
        billing_projects=("moz-fx-data-shared-prod",),
        priority_weight=100,
        parallelism=10,
        # Any table listed here under except_tables _must_ have a corresponding
        # copy_deduplicate job elsewhere.
        except_tables=[
            "telemetry_live.main_v4",
            "telemetry_live.main_use_counter_v4",
            "telemetry_live.main_v5",
            "telemetry_live.event_v4",
            "telemetry_live.first_shutdown_v4",
            "telemetry_live.first_shutdown_use_counter_v4",
            "telemetry_live.first_shutdown_v5",
            "telemetry_live.saved_session_v4",
            "telemetry_live.saved_session_use_counter_v4",
            "telemetry_live.saved_session_v5",
            "firefox_desktop_live.metrics_v1",
        ],
        container_resources=resources,
    )

    # Tables excluded above that are processed hour-by-hour instead of in a
    # single day-spanning query.
    copy_deduplicate_sliced = bigquery_etl_copy_deduplicate(
        task_id="copy_deduplicate_sliced",
        target_project_id="moz-fx-data-shared-prod",
        billing_projects=("moz-fx-data-shared-prod",),
        priority_weight=100,
        parallelism=5,
        hourly=True,
        only_tables=[
            "firefox_desktop_live.metrics_v1",
        ],
        container_resources=resources,
    )

    # EmptyOperator is used instead of a task group to maintain compatibility
    # with downstream sensors.
    copy_deduplicate_all = EmptyOperator(
        task_id="copy_deduplicate_all",
    )

    (
        copy_deduplicate_sliced,
        copy_deduplicate_all_base,
    ) >> copy_deduplicate_all

    with TaskGroup("copy_deduplicate_all_external") as copy_deduplicate_all_external:
        # List of downstream dependencies consisting of external DAG name and
        # execution delta (keyword arguments for execution_date.replace).
        # NOTE: a duplicate ("bqetl_messaging_system", "hour=2, minute=0")
        # entry was removed; set literals dedupe it anyway.
        downstream_dependencies = {
            ("bhr_collection", "hour=5, minute=0"),
            ("glam_fenix", "hour=2, minute=0"),
            ("glam_fog", "hour=2, minute=0"),
            ("bqetl_activity_stream", "hour=2, minute=0"),
            ("bqetl_amo_stats", "hour=3, minute=0"),
            ("bqetl_core", "hour=2, minute=0"),
            ("bqetl_ctxsvc_derived", "hour=3, minute=0"),
            ("bqetl_desktop_funnel", "hour=4, minute=0"),
            ("bqetl_event_rollup", "hour=3, minute=0"),
            ("bqetl_experiments_daily", "hour=3, minute=0"),
            ("bqetl_feature_usage", "hour=5, minute=0"),
            ("bqetl_fenix_event_rollup", "hour=2, minute=0"),
            ("bqetl_firefox_ios", "hour=4, minute=0"),
            ("bqetl_fog_decision_support", "hour=4, minute=0"),
            ("bqetl_internal_tooling", "hour=4, minute=0"),
            ("bqetl_internet_outages", "hour=7, minute=0"),
            ("bqetl_messaging_system", "hour=2, minute=0"),
            ("bqetl_main_summary", "hour=2, minute=0"),
            ("bqetl_mobile_activation", "hour=0, minute=0"),
            ("bqetl_mobile_search", "hour=2, minute=0"),
            ("bqetl_monitoring", "hour=2, minute=0"),
            ("bqetl_newtab", "hour=0, minute=0"),
            ("bqetl_org_mozilla_fenix_derived", "hour=2, minute=0"),
            ("bqetl_org_mozilla_firefox_derived", "hour=2, minute=0"),
            ("bqetl_org_mozilla_focus_derived", "hour=2, minute=0"),
            ("bqetl_public_data_json", "hour=5, minute=0"),
            ("bqetl_regrets_reporter_summary", "hour=4, minute=0"),
            ("bqetl_search_terms_daily", "hour=3, minute=0"),
            ("bqetl_sponsored_tiles_clients_daily", "hour=4, minute=0"),
            ("bqetl_urlbar", "hour=3, minute=0"),
        }

        for external_dag_id, execution_delta in downstream_dependencies:
            ExternalTaskMarker(
                task_id=f"{external_dag_id}__wait_for_copy_deduplicate_all",
                external_dag_id=external_dag_id,
                external_task_id="wait_for_copy_deduplicate_all",
                execution_date="{{ execution_date.replace("
                + execution_delta
                + ").isoformat() }}",
            )

    copy_deduplicate_all >> copy_deduplicate_all_external

    # We split out main ping since it's the highest volume and has a distinct
    # set of downstream dependencies.
    copy_deduplicate_main_ping = bigquery_etl_copy_deduplicate(
        task_id="copy_deduplicate_main_ping",
        target_project_id="moz-fx-data-shared-prod",
        billing_projects=("moz-fx-data-shared-prod",),
        only_tables=[
            "telemetry_live.main_v4",
            "telemetry_live.main_use_counter_v4",
            "telemetry_live.main_v5",
        ],
        priority_weight=100,
        parallelism=5,
        slices=20,
        owner="akomarzewski@mozilla.com",
        email=[
            "telemetry-alerts@mozilla.com",
            "akomarzewski@mozilla.com",
        ],
    )

    with TaskGroup("main_ping_external") as main_ping_external:
        downstream_dependencies = {
            ("graphics_telemetry", "hour=3, minute=0"),
            ("glam", "hour=2, minute=0"),
            ("bqetl_addons", "hour=4, minute=0"),
            ("bqetl_amo_stats", "hour=3, minute=0"),
            ("bqetl_desktop_platform", "hour=3, minute=0"),
            ("bqetl_devtools", "hour=3, minute=0"),
            ("bqetl_experiments_daily", "hour=3, minute=0"),
            ("bqetl_fog_decision_support", "hour=4, minute=0"),
            ("bqetl_internet_outages", "hour=7, minute=0"),
            ("bqetl_main_summary", "hour=2, minute=0"),
            ("bqetl_monitoring", "hour=2, minute=0"),
            ("bqetl_public_data_json", "hour=5, minute=0"),
            ("bqetl_sponsored_tiles_clients_daily", "hour=4, minute=0"),
            ("bqetl_ssl_ratios", "hour=2, minute=0"),
        }

        for external_dag_id, execution_delta in downstream_dependencies:
            ExternalTaskMarker(
                task_id=f"{external_dag_id}__wait_for_copy_deduplicate_main_ping",
                external_dag_id=external_dag_id,
                external_task_id="wait_for_copy_deduplicate_main_ping",
                execution_date="{{ execution_date.replace("
                + execution_delta
                + ").isoformat() }}",
            )

    copy_deduplicate_main_ping >> main_ping_external

    # We also separate out variant pings that share the main ping schema since
    # these ultrawide tables can sometimes have unique performance problems.
    copy_deduplicate_first_shutdown_ping = bigquery_etl_copy_deduplicate(
        task_id="copy_deduplicate_first_shutdown_ping",
        target_project_id="moz-fx-data-shared-prod",
        billing_projects=("moz-fx-data-shared-prod",),
        only_tables=[
            "telemetry_live.first_shutdown_v4",
            "telemetry_live.first_shutdown_use_counter_v4",
            "telemetry_live.first_shutdown_v5",
        ],
        priority_weight=50,
        parallelism=1,
        owner="akomarzewski@mozilla.com",
    )

    with TaskGroup("first_shutdown_ping_external") as first_shutdown_ping_external:
        downstream_dependencies = {
            ("bqetl_analytics_tables", "hour=2, minute=0"),
        }

        for external_dag_id, execution_delta in downstream_dependencies:
            ExternalTaskMarker(
                task_id=f"{external_dag_id}__wait_for_copy_deduplicate_first_shutdown_ping",
                external_dag_id=external_dag_id,
                external_task_id="wait_for_copy_deduplicate_first_shutdown_ping",
                execution_date="{{ execution_date.replace("
                + execution_delta
                + ").isoformat() }}",
            )

    copy_deduplicate_first_shutdown_ping >> first_shutdown_ping_external

    copy_deduplicate_saved_session_ping = bigquery_etl_copy_deduplicate(
        task_id="copy_deduplicate_saved_session_ping",
        target_project_id="moz-fx-data-shared-prod",
        billing_projects=("moz-fx-data-shared-prod",),
        only_tables=[
            "telemetry_live.saved_session_v4",
            "telemetry_live.saved_session_use_counter_v4",
            "telemetry_live.saved_session_v5",
        ],
        priority_weight=50,
        parallelism=1,
        owner="akomarzewski@mozilla.com",
    )

    # Events.
    copy_deduplicate_event_ping = bigquery_etl_copy_deduplicate(
        task_id="copy_deduplicate_event_ping",
        target_project_id="moz-fx-data-shared-prod",
        billing_projects=("moz-fx-data-shared-prod",),
        only_tables=["telemetry_live.event_v4"],
        priority_weight=100,
        parallelism=1,
        owner="akomarzewski@mozilla.com",
    )

    event_events = bigquery_etl_query(
        reattach_on_restart=True,
        task_id="event_events",
        project_id="moz-fx-data-shared-prod",
        destination_table="event_events_v1",
        dataset_id="telemetry_derived",
        priority_weight=90,
        owner="akomarzewski@mozilla.com",
        arguments=("--schema_update_option=ALLOW_FIELD_ADDITION",),
    )

    with TaskGroup("event_events_external") as event_events_external:
        downstream_dependencies = {
            ("jetstream", "hour=4, minute=0"),
            ("bqetl_amo_stats", "hour=3, minute=0"),
            ("bqetl_experiments_daily", "hour=3, minute=0"),
            ("bqetl_feature_usage", "hour=5, minute=0"),
            ("bqetl_main_summary", "hour=2, minute=0"),
        }

        for external_dag_id, execution_delta in downstream_dependencies:
            ExternalTaskMarker(
                task_id=f"{external_dag_id}__wait_for_event_events",
                external_dag_id=external_dag_id,
                external_task_id="wait_for_event_events",
                execution_date="{{ execution_date.replace("
                + execution_delta
                + ").isoformat() }}",
            )

    event_events >> event_events_external
    copy_deduplicate_event_ping >> event_events

    bq_main_events = bigquery_etl_query(
        reattach_on_restart=True,
        task_id="bq_main_events",
        project_id="moz-fx-data-shared-prod",
        destination_table="main_events_v1",
        dataset_id="telemetry_derived",
        priority_weight=90,
        owner="akomarzewski@mozilla.com",
        dag=dag,
        arguments=("--schema_update_option=ALLOW_FIELD_ADDITION",),
    )

    with TaskGroup("bq_main_events_external") as bq_main_events_external:
        downstream_dependencies = {
            ("jetstream", "hour=4, minute=0"),
            ("bqetl_amo_stats", "hour=3, minute=0"),
            ("bqetl_experiments_daily", "hour=3, minute=0"),
            ("bqetl_feature_usage", "hour=5, minute=0"),
            ("bqetl_main_summary", "hour=2, minute=0"),
        }

        for external_dag_id, execution_delta in downstream_dependencies:
            ExternalTaskMarker(
                task_id=f"{external_dag_id}__wait_for_bq_main_events",
                external_dag_id=external_dag_id,
                external_task_id="wait_for_bq_main_events",
                execution_date="{{ execution_date.replace("
                + execution_delta
                + ").isoformat() }}",
            )

    bq_main_events >> bq_main_events_external
    copy_deduplicate_main_ping >> bq_main_events

    # Daily and last seen views on top of every Glean application.

    # The core clients first seen dataset is a dependency to glean usage
    # queries. Ideally, it would belong inside of a generated bigquery-etl DAG
    # (e.g. bqetl_core), but this would require splitting this DAG into three
    # separate parts threaded by sensors. Since the first_seen_table will end
    # up being part of the clients daily table in this DAG, it will be easier
    # to reason about dependencies in this single DAG while it is being
    # developed.
    telemetry_derived__core_clients_first_seen__v1 = bigquery_etl_query(
        reattach_on_restart=True,
        task_id="telemetry_derived__core_clients_first_seen__v1",
        destination_table="core_clients_first_seen_v1",
        dataset_id="telemetry_derived",
        project_id="moz-fx-data-shared-prod",
        owner="ascholtz@mozilla.com",
        email=["ascholtz@mozilla.com", "telemetry-alerts@mozilla.com"],
        date_partition_parameter="submission_date",
        depends_on_past=True,
        dag=dag,
    )

    with TaskGroup(
        "core_clients_first_seen_external"
    ) as core_clients_first_seen_external:
        ExternalTaskMarker(
            task_id="bqetl_core__wait_for_core_clients_first_seen",
            external_dag_id="bqetl_core",
            external_task_id="wait_for_telemetry_derived__core_clients_first_seen__v1",
            execution_date="{{ execution_date.replace(hour=2, minute=0).isoformat() }}",
        )

    (
        telemetry_derived__core_clients_first_seen__v1
        >> core_clients_first_seen_external
    )

    copy_deduplicate_all >> telemetry_derived__core_clients_first_seen__v1