dags/shredder.py
from datetime import datetime, timedelta
from airflow import DAG
from kubernetes.client import models as k8s
from timetable import MultiWeekTimetable
from operators.gcp_container_operator import GKEPodOperator
from utils.tags import Tag
docs = """
### shredder
#### Description
These jobs normally need to be restarted many times because of transient
Airflow or Kubernetes API errors or query failures, since each query is only
attempted once per task attempt. `main_v5` in particular has partitions that
often fail due to a combination of size, schema, and clustering. In most cases
failed jobs can simply be restarted.
Logs from failed runs are sometimes not available in Airflow because Kubernetes Pods are deleted
on exit. Instead, logs can be found in Google Cloud Logging
(change resource.labels.pod_name to get logs for different tasks):
- [shredder-telemetry-main](https://cloudlogging.app.goo.gl/irkg8mKzEy7kBqqg7)
- [shredder-all](https://cloudlogging.app.goo.gl/UVf3T7QMe4EdGQ6h9)
Kubernetes Pods are deleted on exit to prevent multiple running instances. Multiple
running instances would submit redundant queries, because state is only read at the start
of each run. This may cause queries to time out, because only a few can run in parallel
while the rest are queued.
#### Owner
akomar@mozilla.com
"""
default_args = {
"owner": "akomar@mozilla.com",
"depends_on_past": True,
"start_date": datetime(2023, 5, 16),
"catchup": False,
"email": [
"telemetry-alerts@mozilla.com",
"akomar@mozilla.com",
"bewu@mozilla.com",
],
"email_on_failure": True,
"email_on_retry": False,
"retries": 44,
"retry_delay": timedelta(minutes=5),
}
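# Note: with retries=44 and a 5-minute retry_delay above, each task gets up to
# 45 attempts (1 initial run + 44 retries); this pairs with the docstring note
# that transient API and query failures are expected and safe to retry.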
tags = [
Tag.ImpactTier.tier_2,
Tag.Triage.no_triage,
]
dag = DAG(
"shredder",
default_args=default_args,
# 4 week intervals from start_date. This is similar to
# schedule_interval=timedelta(days=28), except it should actually work.
schedule=MultiWeekTimetable(num_weeks=4),
doc_md=docs,
tags=tags,
)
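# Assuming the custom MultiWeekTimetable behaves like a 28-day schedule interval
# (as the comment above says), each run covers a 28-day data interval and starts
# once that interval ends; e.g. a hypothetical run with ds (logical date)
# 2023-05-16 would cover 2023-05-16 through 2023-06-13 and be scheduled on or
# after 2023-06-13.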
docker_image = "gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest"
base_command = [
"script/shredder_delete",
"--state-table=moz-fx-data-shredder.shredder_state.shredder_state",
"--task-table=moz-fx-data-shredder.shredder_state.tasks",
# dags run one schedule interval after ds, end date should be one day before the dag
# runs, and schedule intervals are 4 weeks = 28 days, so 28-1 = 27 days after ds
"--end-date={{macros.ds_add(ds, 27)}}",
# start date should be at least two schedule intervals before end date, to avoid
# race conditions with downstream tables and pings received shortly after a
# deletion request. schedule intervals are 4 weeks = 28 days. see the worked
# example after this list.
"--start-date={{macros.ds_add(ds, 27-7*9)}}",
# non-dml statements use LEFT JOIN instead of IN to filter rows, which takes about
# half as long as of 2022-02-14, and reduces cost by using less flat rate slot time
"--no-use-dml",
]
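# Worked example for the templated dates above (hypothetical run with ds=2023-05-16):
#   --end-date   = ds + 27 days        = 2023-06-12 (one day before the run date)
#   --start-date = ds + (27 - 63) days = 2023-04-10 (9 weeks before the end date)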
common_task_args = {
"image": docker_image,
"is_delete_operator_pod": True,
"reattach_on_restart": True,
"dag": dag,
}
# handle telemetry main and main use counter separately to ensure they run continuously
# and don't slow down other tables. run them in a separate project with their own slot
# reservation to ensure they can finish on time, because they use more slots than
# everything else combined
telemetry_main = GKEPodOperator(
task_id="telemetry_main",
name="shredder-telemetry-main",
arguments=[
*base_command,
"--parallelism=2",
"--billing-project=moz-fx-data-shredder",
"--only=telemetry_stable.main_v5",
],
container_resources=k8s.V1ResourceRequirements(
requests={"memory": "512Mi"},
),
**common_task_args,
)
telemetry_main_use_counter = GKEPodOperator(
task_id="telemetry_main_use_counter",
name="shredder-telemetry-main-use-counter",
arguments=[
*base_command,
"--parallelism=2",
"--billing-project=moz-fx-data-shredder",
"--only=telemetry_stable.main_use_counter_v4",
],
container_resources=k8s.V1ResourceRequirements(
requests={"memory": "512Mi"},
),
**common_task_args,
)
# everything else
flat_rate = GKEPodOperator(
task_id="all",
name="shredder-all",
arguments=[
*base_command,
"--parallelism={{ var.value.get('shredder_all_parallelism', 3) }}",
"--billing-project=moz-fx-data-bq-batch-prod",
"--except",
"telemetry_stable.main_v5",
"telemetry_stable.main_use_counter_v4",
"telemetry_derived.event_events_v1",
"firefox_desktop_derived.events_stream_v1",
],
container_resources=k8s.V1ResourceRequirements(
requests={"memory": "3072Mi"},
),
# Give additional time since we may need to scale up when running this job
startup_timeout_seconds=360,
**common_task_args,
)
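# The parallelism for the "all" task above is read from an Airflow Variable and
# falls back to 3 when the variable is unset. A hypothetical way to change it
# without editing this file (a sketch, not executed as part of this DAG):
#   from airflow.models import Variable
#   Variable.set("shredder_all_parallelism", "5")
# or equivalently from the CLI: airflow variables set shredder_all_parallelism 5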
experiments = GKEPodOperator(
task_id="experiments",
name="shredder-experiments",
arguments=[
*base_command,
"--parallelism=6",
"--billing-project=moz-fx-data-bq-batch-prod",
"--environment=experiments",
],
container_resources=k8s.V1ResourceRequirements(
requests={"memory": "1024Mi"},
),
**common_task_args,
)
# NOTE: avoid using workgroup-restricted tables with sampling because the temp dataset
# is accessible to the data platform group
with_sampling = GKEPodOperator(
task_id="with-sampling",
name="shredder-with-sampling",
arguments=[
*base_command,
"--parallelism={{ var.value.get('shredder_w_sampling_parallelism', 1) }}",
"--sampling-parallelism={{ var.value.get('shredder_w_sampling_sampling_parallelism', 4) }}",
"--temp-dataset=moz-fx-data-shredder.shredder_tmp",
"--billing-project=moz-fx-data-bq-batch-prod",
"--only",
"telemetry_derived.event_events_v1",
"firefox_desktop_derived.events_stream_v1",
"--sampling-tables",
"telemetry_derived.event_events_v1",
"firefox_desktop_derived.events_stream_v1",
],
container_resources=k8s.V1ResourceRequirements(
requests={"memory": "512Mi"},
),
**common_task_args,
)