dags/ga4_site_metrics_summary_backfill.py (64 lines of code) (raw):
import datetime
from airflow import DAG
from airflow.sensors.external_task import ExternalTaskMarker
from utils.gcp import bigquery_dq_check, bigquery_etl_query
docs = """
### ga4_site_metrics_summary_backfill
Backfills the past three days of data for moz-fx-data-shared-prod.mozilla_org_derived.www_site_metrics_summary_v2 since late data can arrive for a few days
Built from bigquery-etl repo, [`dags/bqetl_google_analytics_derived_ga4.py`](https://github.com/mozilla/bigquery-etl/blob/generated-sql/dags/bqetl_google_analytics_derived_ga4.py).
This file is meant to look very similar to generated DAGs in bigquery-etl.
Owner: kwindau@mozilla.com
"""
default_args = {
"owner": "kwindau@mozilla.com",
"start_date": datetime.datetime(2024, 1, 4, 0, 0),
"end_date": None,
"email": ["kwindau@mozilla.com", "telemetry-alerts@mozilla.com"],
"depends_on_past": False,
"retry_delay": datetime.timedelta(seconds=1800),
"email_on_failure": True,
"email_on_retry": True,
"retries": 2,
}
tags = ["impact/tier_2", "repo/bigquery-etl"]
with DAG(
"ga4_site_metrics_summary_backfill",
default_args=default_args,
schedule_interval="0 1 * * *",
doc_md=docs,
tags=tags,
) as dag:
for day_offset in ["-3", "-2", "-1"]:
task_id = "mozilla_org_derived__www_site_metrics_summary__v2__backfill_" + day_offset
date_str = "macros.ds_add(ds, " + day_offset + ")"
date_str_no_dash = "macros.ds_format(" + date_str + ", '%Y-%m-%d', '%Y%m%d')"
ga4_www_site_metrics_summary_v2_checks = bigquery_dq_check(
task_id="checks__fail_" + task_id,
source_table="www_site_metrics_summary_v2",
dataset_id="mozilla_org_derived",
project_id="moz-fx-data-shared-prod",
is_dq_check_fail=True,
owner="kwindau@mozilla.com",
email=["kwindau@mozilla.com", "telemetry-alerts@mozilla.com"],
depends_on_past=False,
parameters=["submission_date:DATE:{{ " + date_str + " }}"],
retries=0,
)
ga4_www_site_metrics_summary_v2 = bigquery_etl_query(
task_id=task_id,
destination_table="www_site_metrics_summary_v2${{ "
+ date_str_no_dash
+ " }}",
dataset_id="mozilla_org_derived",
project_id="moz-fx-data-shared-prod",
owner="kwindau@mozilla.com",
email=["kwindau@mozilla.com", "telemetry-alerts@mozilla.com"],
date_partition_parameter=None,
parameters=["submission_date:DATE:{{ " + date_str + " }}"],
depends_on_past=False,
)
todays_ga4_www_site_metrics_summary_v2 = ExternalTaskMarker(
task_id="rerun__mozilla_org_derived__www_site_metrics_summary__v2__" + day_offset,
external_dag_id="bqetl_google_analytics_derived_ga4",
external_task_id="wait_for_" + task_id,
execution_date="{{ (execution_date - macros.timedelta(days=-1, seconds=82800)).isoformat() }}",
)
(
ga4_www_site_metrics_summary_v2
>> ga4_www_site_metrics_summary_v2_checks
>> todays_ga4_www_site_metrics_summary_v2
)