dags/backfill.py (106 lines of code) (raw):
import datetime
from enum import Enum
from airflow.decorators import dag
from airflow.models import DagModel
from airflow.models.param import Param
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.utils.trigger_rule import TriggerRule
from utils.backfill import BackfillParams
from utils.tags import Tag
class TaskId(Enum):
    """Task ids of the placeholder (no-op) tasks that mark the branch a run took.

    Each member's value is used verbatim as an Airflow ``task_id`` and is also
    the string the branch callables return to select which path to follow.
    """

    # Targets chosen by the ``dry_run_parameter`` branch.
    dry_run = "dry_run"
    real_deal = "real_deal"
    # Targets chosen by the ``clear_parameter`` branch.
    clear_tasks = "clear_tasks"
    do_not_clear_tasks = "do_not_clear_tasks"
def dry_run_branch_callable(params: dict) -> str:
    """Choose the downstream task for the dry-run branch.

    :param params: Run configuration, unpacked into ``BackfillParams``.
    :return: ``TaskId.dry_run.value`` when the config's ``dry_run`` flag is
        truthy, otherwise ``TaskId.real_deal.value``.
    """
    if BackfillParams(**params).dry_run:
        return TaskId.dry_run.value
    return TaskId.real_deal.value
def clear_branch_callable(params: dict) -> str:
    """Choose the downstream task for the clear branch.

    :param params: Run configuration, unpacked into ``BackfillParams``.
    :return: ``TaskId.clear_tasks.value`` when the config's ``clear`` flag is
        truthy, otherwise ``TaskId.do_not_clear_tasks.value``.
    """
    should_clear = BackfillParams(**params).clear
    if not should_clear:
        return TaskId.do_not_clear_tasks.value
    return TaskId.clear_tasks.value
def param_validation(params: dict) -> bool:
    """Validate the run configuration before any backfill work starts.

    The checks run in this order: the date range, that the target DAG exists,
    and then the task regex. The first failing check raises, which stops
    the remaining checks from running.

    :param params: Run configuration, unpacked into ``BackfillParams``.
    :return: Always ``True`` once every check has passed.
    :raises ValueError: If ``dag_name`` does not resolve to a known DAG. The
        exceptions raised by the date-range and regex checks are defined by
        ``BackfillParams`` (not visible here).
    """
    config = BackfillParams(**params)
    config.validate_date_range()
    validate_dag_exists(dag_name=config.dag_name)
    config.validate_regex_pattern()
    return True
def validate_dag_exists(dag_name: str) -> None:
    """Raise unless ``DagModel.get_dagmodel`` finds a DAG with this id.

    :param dag_name: Id of the DAG to look up.
    :raises ValueError: If the lookup returns ``None``.
    """
    if DagModel.get_dagmodel(dag_name) is not None:
        return
    raise ValueError(f"`dag_name`={dag_name} does not exist")
def generate_bash_command(params: dict) -> str:
    """Build the command line that the ``execute_backfill`` task runs.

    :param params: Run configuration, unpacked into ``BackfillParams``.
    :return: The pieces from ``BackfillParams.generate_backfill_command()``
        joined with single spaces. ``execute_backfill`` reads this return
        value back via XCom and uses it as its bash command.
    """
    # NOTE(review): the pieces are joined without shell quoting, so a
    # task_regex containing shell metacharacters (`|`, `(`, `*`, spaces) would
    # be interpreted by bash. `shlex.join` would fix that only if
    # generate_backfill_command() never emits intentional shell syntax
    # (e.g. a pipe). Confirm before switching.
    command_parts = BackfillParams(**params).generate_backfill_command()
    return " ".join(command_parts)
doc_md = """
# Backfill DAG
#### Use with caution
#### Some tips/notes:
* Always do a dry run first, especially when using a task regex
* Date formats are 2020-03-01 or 2020-03-01T00:00:00
* A dry run for clearing tasks shows the list of tasks that would be cleared
* A dry run for backfilling does not show the list, but is still useful for catching input errors
"""
@dag(
    dag_id="backfill",
    schedule_interval=None,
    doc_md=doc_md,
    catchup=False,
    start_date=datetime.datetime(2022, 11, 1),
    dagrun_timeout=datetime.timedelta(days=1),
    tags=[Tag.ImpactTier.tier_3, Tag.Triage.record_only],
    # Makes "{{ dag_run.conf }}" render to the conf object itself rather than
    # to its string form, so the callables receive a dict they can unpack.
    render_template_as_native_obj=True,
    params={
        # Placeholder default; param_validation raises unless a DAG with this
        # id exists.
        "dag_name": Param("dag_name", type="string"),
        # Both date defaults are computed when this module is imported:
        # roughly the last 10 days up to "now".
        "start_date": Param(
            (datetime.datetime.today() - datetime.timedelta(days=10)).isoformat(),
            type="string",
            format="date-time",
        ),
        "end_date": Param(
            datetime.datetime.today().isoformat(), type="string", format="date-time"
        ),
        "clear": Param(False, type="boolean"),
        "dry_run": Param(True, type="boolean"),
        "task_regex": Param(None, type=["string", "null"]),
    },
)
def backfill_dag():
    # Task graph:
    #   param_validation
    #     -> dry_run_parameter -> {dry_run | real_deal}
    #     -> clear_parameter   -> {clear_tasks | do_not_clear_tasks}
    #     -> generate_backfill_command -> execute_backfill
    #
    # Every Python callable receives the trigger-time conf under "params".
    # NOTE(review): dag_run.conf contains only the keys supplied when the run
    # was triggered; it does not fall back to the Param defaults above
    # ("{{ params }}" would). Confirm that triggers always send the full conf.
    conf_template = "{{ dag_run.conf }}"

    param_validation_task = PythonOperator(
        task_id="param_validation",
        python_callable=param_validation,
        op_kwargs={"params": conf_template},
    )

    # First branch: mark whether this run is a dry run.
    dry_run_branch_task = BranchPythonOperator(
        task_id="dry_run_parameter",
        python_callable=dry_run_branch_callable,
        op_kwargs={"params": conf_template},
        trigger_rule=TriggerRule.ONE_SUCCESS,
    )
    dry_run_task = EmptyOperator(task_id=TaskId.dry_run.value)
    real_deal_task = EmptyOperator(task_id=TaskId.real_deal.value)

    # Second branch: mark whether tasks will be cleared. Only one of the two
    # markers above runs (the other is skipped by the branch), so ONE_SUCCESS
    # lets this proceed after whichever one was chosen.
    clear_branch_task = BranchPythonOperator(
        task_id="clear_parameter",
        python_callable=clear_branch_callable,
        op_kwargs={"params": conf_template},
        trigger_rule=TriggerRule.ONE_SUCCESS,
    )
    clear_tasks_task = EmptyOperator(task_id=TaskId.clear_tasks.value)
    do_not_clear_tasks_task = EmptyOperator(task_id=TaskId.do_not_clear_tasks.value)

    # ONE_SUCCESS for the same reason: exactly one clear-branch marker runs.
    generate_backfill_command_task = PythonOperator(
        task_id="generate_backfill_command",
        python_callable=generate_bash_command,
        op_kwargs={"params": conf_template},
        trigger_rule=TriggerRule.ONE_SUCCESS,
    )

    # Runs the command string returned by generate_backfill_command, pulled
    # from XCom when this task's template is rendered.
    backfill_task = BashOperator(
        task_id="execute_backfill",
        bash_command="{{ ti.xcom_pull(task_ids='generate_backfill_command') }}",
    )

    param_validation_task >> dry_run_branch_task
    dry_run_branch_task >> [dry_run_task, real_deal_task]
    [dry_run_task, real_deal_task] >> clear_branch_task
    clear_branch_task >> [clear_tasks_task, do_not_clear_tasks_task]
    [clear_tasks_task, do_not_clear_tasks_task] >> generate_backfill_command_task
    generate_backfill_command_task >> backfill_task


# Calling the decorated factory builds the DAG object when the module is imported.
dag = backfill_dag()