"""
Desktop ETL for https://glam.telemetry.mozilla.org/.
Generates and runs a series of BQ queries, see
[bigquery_etl/glam](https://github.com/mozilla/bigquery-etl/tree/main/bigquery_etl/glam)
in bigquery-etl and the
[glam_subdags](https://github.com/mozilla/telemetry-airflow/tree/main/dags/glam_subdags)
in telemetry-airflow.
"""
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.subdag import SubDagOperator
from airflow.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor
from airflow.utils.task_group import TaskGroup

from operators.gcp_container_operator import GKEPodOperator
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query
from utils.glam_subdags.general import repeated_subdag
from utils.glam_subdags.generate_query import generate_and_run_desktop_query
from utils.glam_subdags.histograms import histogram_aggregates_subdag
from utils.tags import Tag

project_id = "moz-fx-data-shared-prod"
table_project_id = "moz-fx-data-shared-prod"
billing_project_id = "moz-fx-glam-prod"
dataset_id = "telemetry_derived"
fully_qualified_dataset = f"{table_project_id}:{dataset_id}"
tmp_project = "moz-fx-data-shared-prod"  # for temporary tables in analysis dataset

default_args = {
    "owner": "efilho@mozilla.com",
    "depends_on_past": False,
    "start_date": datetime(2019, 10, 22),
    "email": [
        "telemetry-alerts@mozilla.com",
        "akomarzewski@mozilla.com",
        "efilho@mozilla.com",
    ],
    "email_on_failure": True,
    "email_on_retry": True,
    "retries": 1,
    "retry_delay": timedelta(minutes=30),
}

GLAM_DAG = "glam"
GLAM_CLIENTS_HISTOGRAM_AGGREGATES_SUBDAG = "clients_histogram_aggregates"
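# Sampling rate (as a percent) applied to release-channel clients; GLAM's
# desktop release aggregates are documented as a 10% sample of the release
# population rather than the full channel.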
PERCENT_RELEASE_WINDOWS_SAMPLING = "10"

tags = [Tag.ImpactTier.tier_2]

dag = DAG(
    GLAM_DAG,
    default_args=default_args,
    schedule_interval="0 16 * * *",
    doc_md=__doc__,
    tags=tags,
)
# Make sure all the data for the given day has arrived before running.
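# This DAG runs at 16:00 UTC; execution_delta=timedelta(hours=15) points the
# sensor at the copy_deduplicate run whose execution time is 01:00 UTC on the
# same day.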
wait_for_main_ping = ExternalTaskSensor(
    task_id="wait_for_copy_deduplicate_main_ping",
    external_dag_id="copy_deduplicate",
    external_task_id="copy_deduplicate_main_ping",
    execution_delta=timedelta(hours=15),
    check_existence=True,
    mode="reschedule",
    allowed_states=ALLOWED_STATES,
    failed_states=FAILED_STATES,
    pool="DATA_ENG_EXTERNALTASKSENSOR",
    email_on_retry=False,
    dag=dag,
)
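
# latest_versions tracks the most recent Firefox version per channel;
# downstream GLAM queries use it to limit aggregation to recent builds. The
# table is small and rebuilt in full on every run (--replace, no date
# partition).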
latest_versions = bigquery_etl_query(
    reattach_on_restart=True,
    task_id="latest_versions",
    destination_table="latest_versions",
    dataset_id=fully_qualified_dataset,
    sql_file_path=f"sql/{table_project_id}/{dataset_id}/latest_versions/query.sql",
    project_id=billing_project_id,
    date_partition_parameter=None,
    arguments=("--replace",),
    dag=dag,
)

# This task runs first and replaces the relevant partition, followed
# by the next two tasks that append to the same partition of the same table.
clients_daily_scalar_aggregates = generate_and_run_desktop_query(
    reattach_on_restart=True,
    task_id="clients_daily_scalar_aggregates",
    project_id=project_id,
    billing_project_id=billing_project_id,
    source_dataset_id=dataset_id,
    sample_size=PERCENT_RELEASE_WINDOWS_SAMPLING,
    overwrite=True,
    probe_type="scalar",
    dag=dag,
)

clients_daily_keyed_scalar_aggregates = generate_and_run_desktop_query(
    reattach_on_restart=True,
    task_id="clients_daily_keyed_scalar_aggregates",
    project_id=project_id,
    billing_project_id=billing_project_id,
    source_dataset_id=dataset_id,
    sample_size=PERCENT_RELEASE_WINDOWS_SAMPLING,
    overwrite=False,
    probe_type="keyed_scalar",
    dag=dag,
)

clients_daily_keyed_boolean_aggregates = generate_and_run_desktop_query(
    reattach_on_restart=True,
    task_id="clients_daily_keyed_boolean_aggregates",
    project_id=project_id,
    billing_project_id=billing_project_id,
    source_dataset_id=dataset_id,
    sample_size=PERCENT_RELEASE_WINDOWS_SAMPLING,
    overwrite=False,
    probe_type="keyed_boolean",
    dag=dag,
)
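
# Rolls the daily scalar rows up into cumulative per-client aggregates.
# depends_on_past=True because each run reads the previous day's output,
# so runs must execute in date order.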
clients_scalar_aggregates = bigquery_etl_query(
    reattach_on_restart=True,
    task_id="clients_scalar_aggregates",
    destination_table="clients_scalar_aggregates_v1",
    dataset_id=fully_qualified_dataset,
    sql_file_path=f"sql/{table_project_id}/{dataset_id}/clients_scalar_aggregates_v1/query.sql",
    project_id=billing_project_id,
    depends_on_past=True,
    arguments=("--replace",),
    dag=dag,
)

# This task runs first and replaces the relevant partition, followed by the
# next three tasks below that append to the same partition of the same table.
clients_daily_histogram_aggregates_parent = generate_and_run_desktop_query(
    reattach_on_restart=True,
    task_id="clients_daily_histogram_aggregates_parent",
    project_id=project_id,
    billing_project_id=billing_project_id,
    source_dataset_id=dataset_id,
    sample_size=PERCENT_RELEASE_WINDOWS_SAMPLING,
    overwrite=True,
    probe_type="histogram",
    process="parent",
    get_logs=False,
    dag=dag,
)

clients_daily_histogram_aggregates_content = generate_and_run_desktop_query(
    reattach_on_restart=True,
    task_id="clients_daily_histogram_aggregates_content",
    project_id=project_id,
    billing_project_id=billing_project_id,
    source_dataset_id=dataset_id,
    sample_size=PERCENT_RELEASE_WINDOWS_SAMPLING,
    overwrite=False,
    probe_type="histogram",
    process="content",
    get_logs=False,
    dag=dag,
)

clients_daily_histogram_aggregates_gpu = generate_and_run_desktop_query(
    reattach_on_restart=True,
    task_id="clients_daily_histogram_aggregates_gpu",
    project_id=project_id,
    billing_project_id=billing_project_id,
    source_dataset_id=dataset_id,
    sample_size=PERCENT_RELEASE_WINDOWS_SAMPLING,
    overwrite=False,
    probe_type="histogram",
    process="gpu",
    get_logs=False,
    dag=dag,
)

clients_daily_keyed_histogram_aggregates = generate_and_run_desktop_query(
    reattach_on_restart=True,
    task_id="clients_daily_keyed_histogram_aggregates",
    project_id=project_id,
    billing_project_id=billing_project_id,
    source_dataset_id=dataset_id,
    sample_size=PERCENT_RELEASE_WINDOWS_SAMPLING,
    overwrite=False,
    probe_type="keyed_histogram",
    get_logs=False,
    dag=dag,
)
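
# NOTE: SubDagOperator is deprecated in Airflow 2 in favor of TaskGroups; the
# two subdags below predate that and split the heavy histogram queries into
# smaller, repeated steps.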
clients_histogram_aggregates = SubDagOperator(
    subdag=histogram_aggregates_subdag(
        parent_dag_name=GLAM_DAG,
        child_dag_name=GLAM_CLIENTS_HISTOGRAM_AGGREGATES_SUBDAG,
        default_args=default_args,
        schedule_interval=dag.schedule_interval,
        dataset_id=dataset_id,
        fully_qualified_dataset=fully_qualified_dataset,
        billing_project_id=billing_project_id,
        table_project_id=table_project_id,
        docker_image="gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest",
    ),
    task_id=GLAM_CLIENTS_HISTOGRAM_AGGREGATES_SUBDAG,
    dag=dag,
)

glam_sample_counts = bigquery_etl_query(
    reattach_on_restart=True,
    task_id="glam_sample_counts",
    destination_table="glam_sample_counts_v1",
    dataset_id=fully_qualified_dataset,
    sql_file_path=f"sql/{table_project_id}/{dataset_id}/glam_sample_counts_v1/query.sql",
    date_partition_parameter=None,
    parameters=("submission_date:DATE:{{ds}}",),
    arguments=("--replace",),
    dag=dag,
    docker_image="gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest",
)
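
# Builds the clustered scalar probe-count table via a bigquery-etl helper
# script run in a GKE pod; is_delete_operator_pod=False keeps the finished pod
# (and its logs) around for debugging.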
client_scalar_probe_counts = GKEPodOperator(
    reattach_on_restart=True,
    task_id="client_scalar_probe_counts",
    arguments=[
        "python3",
        "script/glam/run_scalar_agg_clustered_query.py",
        "--submission-date",
        "{{ds}}",
        "--dst-table",
        "clients_scalar_probe_counts_v1",
        "--project",
        project_id,
        "--billing-project",
        billing_project_id,
        "--tmp-project",
        tmp_project,
        "--dataset",
        dataset_id,
    ],
    image="gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest",
    is_delete_operator_pod=False,
    dag=dag,
)
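
# The bucket-count rollup runs inside a subdag of repeated queries.
# repeated_subdag is called with positional arguments; the parameters tuple
# and the split count of 20 are assumed to control how the work is sliced
# (not confirmed by this file).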
clients_histogram_bucket_counts = SubDagOperator(
    subdag=repeated_subdag(
        GLAM_DAG,
        "clients_histogram_bucket_counts",
        default_args,
        dag.schedule_interval,
        billing_project_id,
        table_project_id,
        dataset_id,
        fully_qualified_dataset,
        ("submission_date:DATE:{{ds}}",),
        20,
        None,
        docker_image="gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest",
    ),
    task_id="clients_histogram_bucket_counts",
    dag=dag,
)

clients_histogram_probe_counts = bigquery_etl_query(
    reattach_on_restart=True,
    task_id="clients_histogram_probe_counts",
    destination_table="clients_histogram_probe_counts_v1",
    dataset_id=fully_qualified_dataset,
    sql_file_path=f"sql/{table_project_id}/{dataset_id}/clients_histogram_probe_counts_v1/query.sql",
    project_id=billing_project_id,
    date_partition_parameter=None,
    arguments=("--replace", "--clustering_fields=metric,channel"),
    dag=dag,
)
with dag:
    with TaskGroup(
        group_id="extracts", dag=dag, default_args=default_args
    ) as extracts_per_channel:
        for channel in ("nightly", "beta", "release"):
            bq_extract_table = f"glam_extract_firefox_{channel}_v1"
            etl_query = bigquery_etl_query(
                task_id=f"glam_client_probe_counts_{channel}_extract",
                destination_table=bq_extract_table,
                dataset_id=fully_qualified_dataset,
                project_id=billing_project_id,
                date_partition_parameter=None,
                arguments=("--replace",),
                sql_file_path=f"sql/moz-fx-data-shared-prod/{dataset_id}/glam_extract_firefox_{channel}_v1/query.sql",
                dag=dag,
            )
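
    # Clearing the marker below (with downstream recursion) also clears the
    # wait_for_glam sensor in the glam_glean_imports DAG, whose run is pinned
    # to 05:00 UTC of the same execution date.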
    with TaskGroup("glam_external") as glam_external:
        ExternalTaskMarker(
            task_id="glam_glean_imports__wait_for_glam",
            external_dag_id="glam_glean_imports",
            external_task_id="wait_for_glam",
            execution_date="{{ execution_date.replace(hour=5, minute=0).isoformat() }}",
        )

    extracts_per_channel >> glam_external
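
# Task ordering: the scalar and histogram branches both fan out from
# latest_versions and converge on the per-channel GLAM extracts.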
wait_for_main_ping >> latest_versions
latest_versions >> clients_daily_scalar_aggregates
clients_daily_scalar_aggregates >> clients_daily_keyed_scalar_aggregates
clients_daily_scalar_aggregates >> clients_daily_keyed_boolean_aggregates
clients_daily_keyed_boolean_aggregates >> clients_scalar_aggregates
clients_daily_keyed_scalar_aggregates >> clients_scalar_aggregates
clients_scalar_aggregates >> client_scalar_probe_counts
latest_versions >> clients_daily_histogram_aggregates_parent
clients_daily_histogram_aggregates_parent >> clients_daily_histogram_aggregates_content
clients_daily_histogram_aggregates_parent >> clients_daily_histogram_aggregates_gpu
clients_daily_histogram_aggregates_parent >> clients_daily_keyed_histogram_aggregates
clients_daily_histogram_aggregates_content >> clients_histogram_aggregates
clients_daily_histogram_aggregates_gpu >> clients_histogram_aggregates
clients_daily_keyed_histogram_aggregates >> clients_histogram_aggregates
clients_histogram_aggregates >> clients_histogram_bucket_counts
clients_histogram_aggregates >> glam_sample_counts
clients_histogram_bucket_counts >> clients_histogram_probe_counts
clients_scalar_aggregates >> glam_sample_counts
client_scalar_probe_counts >> extracts_per_channel
clients_histogram_probe_counts >> extracts_per_channel
glam_sample_counts >> extracts_per_channel