def base_backfill_operator()

in dags/shredder_backfill.py [0:0]


def base_backfill_operator(dry_run):
    """Build the shredder-backfill GKEPodOperator.

    The command line is assembled based on ``dry_run``: when true the task
    gets the dry-run task id and a ``--dry-run`` flag; otherwise the real
    task id with no extra flag. Everything else is identical between the two.
    """
    command = ["script/shredder_delete"]
    if dry_run:
        command.append("--dry-run")
    command += [
        # use different tables from scheduled task so they can be monitored separately
        "--state-table=moz-fx-data-shredder.shredder_state.shredder_state_backfill",
        "--task-table=moz-fx-data-shredder.shredder_state.tasks_backfill",
        "--end-date={{ params.request_end_date }}",
        "--start-date={{ params.request_start_date }}",
        "--no-use-dml",
        # low parallelism to reduce slot contention with scheduled task
        "--parallelism=1",
        "--billing-project=moz-fx-data-bq-batch-prod",
        "--only",
    ]
    return GKEPodOperator(
        task_id=DRY_RUN_TASK_ID if dry_run else NON_DRY_RUN_TASK_ID,
        cmds=command,
        # target_tables will be rendered as a python list
        arguments="{{ params.target_tables }}",
        image="gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest",
        is_delete_operator_pod=True,
        reattach_on_restart=True,
    )