utils/backfill.py (48 lines of code) (raw):

from __future__ import annotations import dataclasses import datetime import re @dataclasses.dataclass class BackfillParams: dag_name: str start_date: str end_date: str clear: bool dry_run: bool task_regex: str | None def validate_date_range(self) -> None: start_date = datetime.datetime.fromisoformat(self.start_date) end_date = datetime.datetime.fromisoformat(self.end_date) if start_date > end_date: raise ValueError( f"`start_date`={self.start_date} is greater than `end_date`={self.end_date}" ) def validate_regex_pattern(self) -> None: if self.task_regex: try: re.compile(self.task_regex) except re.error: raise ValueError( f"Invalid regex pattern for `task_regex`={self.task_regex}" ) from None def generate_backfill_command(self) -> list[str]: """ Backfill command based off the Airflow plugin implemented by hwoo. Original implementation in plugins/backfill/main.py """ # Construct the airflow command cmd = ["airflow"] if self.clear: cmd.extend(["tasks", "clear"]) if self.dry_run: # For dry runs we simply time out to avoid zombie procs waiting on user input. # The output is what we're interested in timeout_list = ["timeout", "60"] cmd = timeout_list + cmd else: cmd.append("-y") if self.task_regex: cmd.extend(["-t", str(self.task_regex)]) else: cmd.extend(["dags", "backfill", "--donot-pickle"]) if self.dry_run: cmd.append("--dry-run") if self.task_regex: cmd.extend(["-t", str(self.task_regex)]) cmd.extend( ["-s", str(self.start_date), "-e", str(self.end_date), str(self.dag_name)] ) return cmd