in dags/backfill.py [0:0]
def backfill_dag():
    """Declare the backfill tasks and wire them into one pipeline.

    Task order, upstream to downstream:

    1. ``param_validation``
    2. ``dry_run_parameter`` (branch)
    3. ``TaskId.dry_run`` / ``TaskId.real_deal`` (empty marker tasks)
    4. ``clear_parameter`` (branch)
    5. ``TaskId.clear_tasks`` / ``TaskId.do_not_clear_tasks`` (empty markers)
    6. ``generate_backfill_command``
    7. ``execute_backfill``

    Every Python callable is handed the templated ``dag_run.conf`` under the
    ``params`` keyword. The final Bash task runs whatever
    ``generate_backfill_command`` pushed to XCom.
    """
    # NOTE(review): unless the DAG enables native-object template rendering,
    # this template is rendered to a string rather than a dict -- confirm the
    # callables expect that.
    conf_template = "{{ dag_run.conf }}"

    validate = PythonOperator(
        task_id="param_validation",
        python_callable=param_validation,
        op_kwargs={"params": conf_template},
    )

    # First branch point; its two possible targets are the markers below.
    choose_run_mode = BranchPythonOperator(
        task_id="dry_run_parameter",
        python_callable=dry_run_branch_callable,
        op_kwargs={"params": conf_template},
        trigger_rule=TriggerRule.ONE_SUCCESS,
    )
    dry_run_marker = EmptyOperator(task_id=TaskId.dry_run.value)
    real_deal_marker = EmptyOperator(task_id=TaskId.real_deal.value)

    # Second branch point. ONE_SUCCESS lets it run when only one of the two
    # run-mode markers succeeded (presumably the other one is skipped by the
    # branch above).
    choose_clear_mode = BranchPythonOperator(
        task_id="clear_parameter",
        python_callable=clear_branch_callable,
        op_kwargs={"params": conf_template},
        trigger_rule=TriggerRule.ONE_SUCCESS,
    )
    clear_marker = EmptyOperator(task_id=TaskId.clear_tasks.value)
    keep_marker = EmptyOperator(task_id=TaskId.do_not_clear_tasks.value)

    # Joins the two clear-mode markers; same ONE_SUCCESS reasoning as above.
    build_command = PythonOperator(
        task_id="generate_backfill_command",
        python_callable=generate_bash_command,
        op_kwargs={"params": conf_template},
        trigger_rule=TriggerRule.ONE_SUCCESS,
    )

    # Executes the command string returned by generate_backfill_command.
    run_backfill = BashOperator(
        task_id="execute_backfill",
        bash_command="{{ ti.xcom_pull(task_ids='generate_backfill_command') }}",
    )

    run_mode_markers = [dry_run_marker, real_deal_marker]
    clear_mode_markers = [clear_marker, keep_marker]

    # One statement per edge, in the same upstream-to-downstream order.
    validate >> choose_run_mode
    choose_run_mode >> run_mode_markers
    run_mode_markers >> choose_clear_mode
    choose_clear_mode >> clear_mode_markers
    clear_mode_markers >> build_command
    build_command >> run_backfill