in dags/shredder_backfill.py
def base_backfill_operator(dry_run):
    """Create task for backfill, filling out parameters based on dry run."""
    return GKEPodOperator(
        task_id=DRY_RUN_TASK_ID if dry_run else NON_DRY_RUN_TASK_ID,
        cmds=[
            "script/shredder_delete",
            *(["--dry-run"] if dry_run else []),
            # use different tables from the scheduled task so they can be monitored separately
            "--state-table=moz-fx-data-shredder.shredder_state.shredder_state_backfill",
            "--task-table=moz-fx-data-shredder.shredder_state.tasks_backfill",
            "--end-date={{ params.request_end_date }}",
            "--start-date={{ params.request_start_date }}",
            "--no-use-dml",
            # low parallelism to reduce slot contention with the scheduled task
            "--parallelism=1",
            "--billing-project=moz-fx-data-bq-batch-prod",
            "--only",
        ],
        # target_tables will be rendered as a Python list
        arguments="{{ params.target_tables }}",
        image="gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest",
        is_delete_operator_pod=True,
        reattach_on_restart=True,
    )
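
# --- Usage sketch (not from the original file) ---
# A minimal, hypothetical sketch of how the factory above could be wired into
# the backfill DAG: the dry-run task is created ahead of the destructive run so
# its output can be reviewed first. DRY_RUN_TASK_ID, NON_DRY_RUN_TASK_ID, the
# dag_id, dependency ordering, and the Param defaults below are illustrative
# assumptions; the real values live elsewhere in dags/shredder_backfill.py, as
# does the GKEPodOperator import.
from airflow import DAG
from airflow.models.param import Param

DRY_RUN_TASK_ID = "shredder_backfill_dry_run"  # hypothetical value
NON_DRY_RUN_TASK_ID = "shredder_backfill"  # hypothetical value

with DAG(
    dag_id="shredder_backfill",
    schedule_interval=None,  # triggered manually; dates come from params
    params={
        # rendered into --start-date / --end-date by the Jinja templates above
        "request_start_date": Param("2023-01-01", type="string"),
        "request_end_date": Param("2023-01-08", type="string"),
        # rendered as a Python list and passed as the positional arguments
        # consumed by --only
        "target_tables": Param(["telemetry_stable.main_v4"], type="array"),
    },
) as dag:
    dry_run_task = base_backfill_operator(dry_run=True)
    real_task = base_backfill_operator(dry_run=False)
    dry_run_task >> real_task

# At trigger time, a run configuration such as
# {"request_start_date": "...", "request_end_date": "...", "target_tables": [...]}
# overrides the Param defaults, so one DAG definition serves arbitrary backfills.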