dags/shredder_backfill.py
from datetime import date, datetime, timedelta
from airflow import DAG
from airflow.models.param import Param
from airflow.operators.python import BranchPythonOperator
from operators.gcp_container_operator import GKEPodOperator
from utils.tags import Tag

docs = """
### shredder-backfill
#### Description
Manually triggered DAG that handles deletion requests from a specified time period
for a given list of tables.
`target_tables` is a list of tables formatted as `dataset.table_name` with one table per line.
The moz-fx-data-shared-prod project is assumed because shredder currently only runs
on tables in this project.
Use the dry run parameter to run shredder with the --dry-run option to validate parameters.
Note that a shredder dry run will still dry run queries against every partition of each table,
so it may take a long time to finish if many tables are given.
This DAG is meant to handle older deletion requests for tables that are already being
shredded. Any provided tables that aren't already valid deletion targets will be ignored.
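#### Example
The DAG can be triggered with explicit parameters, for example from the CLI
(the table name below is only illustrative):
`airflow dags trigger shredder_backfill --conf '{"request_start_date": "2024-03-01", "request_end_date": "2024-03-08", "target_tables": ["telemetry_derived.example_table_v1"], "dry_run": true}'`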
#### Owner
bewu@mozilla.com
"""
params = {
"request_start_date": Param(
default=(date.today() - timedelta(days=7)).isoformat(),
description="First date of deletion requests to process",
type="string",
format="date",
),
"request_end_date": Param(
default=(date.today()).isoformat(),
description="Last date of data (i.e. partition) to delete from",
type="string",
format="date",
),
"target_tables": Param(
default=["dataset.table_name"],
description="Tables to delete from (one per line)",
type="array",
minItems=1,
),
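    # When true, shredder runs with --dry-run and does not delete any data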
"dry_run": Param(default=True, type="boolean"),
}

default_args = {
"owner": "bewu@mozilla.com",
"depends_on_past": False,
"start_date": datetime(2024, 3, 1),
"catchup": False,
"email": [
"telemetry-alerts@mozilla.com",
"bewu@mozilla.com",
],
"email_on_failure": True,
"email_on_retry": False,
# transient failures are expected and can be handled with state table
"retries": 44,
"retry_delay": timedelta(minutes=5),
}

tags = [
Tag.ImpactTier.tier_3,
Tag.Triage.no_triage,
]
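
# Task IDs for the two branch targets; dry_run_branch below returns one of them at runtime.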
NON_DRY_RUN_TASK_ID = "shredder_backfill"
DRY_RUN_TASK_ID = "shredder_backfill_dry_run"


def base_backfill_operator(dry_run):
"""Create task for backfill, filling out parameters based on dry run."""
return GKEPodOperator(
task_id=DRY_RUN_TASK_ID if dry_run else NON_DRY_RUN_TASK_ID,
cmds=[
"script/shredder_delete",
*(["--dry-run"] if dry_run else []),
# use different tables from scheduled task so they can be monitored separately
"--state-table=moz-fx-data-shredder.shredder_state.shredder_state_backfill",
"--task-table=moz-fx-data-shredder.shredder_state.tasks_backfill",
"--end-date={{ params.request_end_date }}",
"--start-date={{ params.request_start_date }}",
"--no-use-dml",
# low parallelism to reduce slot contention with scheduled task
"--parallelism=1",
"--billing-project=moz-fx-data-bq-batch-prod",
"--only",
],
# target_tables will be rendered as a python list
arguments="{{ params.target_tables }}",
image="gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest",
is_delete_operator_pod=True,
reattach_on_restart=True,
)


with DAG(
"shredder_backfill",
default_args=default_args,
    schedule=None,
    catchup=False,
doc_md=docs,
tags=tags,
params=params,
# needed to pass the list of tables as a list to the pod operator
render_template_as_native_obj=True,
) as dag:
# Use separate tasks for dry run to make logs easier to find
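    # render_template_as_native_obj=True makes params.dry_run render as a native bool,
    # so the truthiness check below picks the right task id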
dry_run_branch = BranchPythonOperator(
task_id="dry_run_branch",
python_callable=lambda dry_run: (
DRY_RUN_TASK_ID if dry_run else NON_DRY_RUN_TASK_ID
),
op_kwargs={"dry_run": "{{ params.dry_run }}"},
)
backfill_tasks = [
base_backfill_operator(dry_run_value) for dry_run_value in (True, False)
]
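    # Only the task whose id matches the branch's return value runs; the other is skipped.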
dry_run_branch >> backfill_tasks