def backfill_dag()

in dags/backfill.py [0:0]


def backfill_dag():
    param_validation_task = PythonOperator(
        task_id="param_validation",
        python_callable=param_validation,
        op_kwargs={"params": "{{ dag_run.conf }}"},
    )

    dry_run_branch_task = BranchPythonOperator(
        task_id="dry_run_parameter",
        python_callable=dry_run_branch_callable,
        op_kwargs={"params": "{{ dag_run.conf }}"},
        trigger_rule=TriggerRule.ONE_SUCCESS,
    )

    dry_run_task = EmptyOperator(task_id=TaskId.dry_run.value)
    real_deal_task = EmptyOperator(task_id=TaskId.real_deal.value)

    clear_branch_task = BranchPythonOperator(
        task_id="clear_parameter",
        python_callable=clear_branch_callable,
        op_kwargs={"params": "{{ dag_run.conf }}"},
        trigger_rule=TriggerRule.ONE_SUCCESS,
    )

    clear_tasks_task = EmptyOperator(task_id=TaskId.clear_tasks.value)
    do_not_clear_tasks_task = EmptyOperator(task_id=TaskId.do_not_clear_tasks.value)

    generate_backfill_command_task = PythonOperator(
        task_id="generate_backfill_command",
        python_callable=generate_bash_command,
        op_kwargs={"params": "{{ dag_run.conf }}"},
        trigger_rule=TriggerRule.ONE_SUCCESS,
    )

    backfill_task = BashOperator(
        task_id="execute_backfill",
        bash_command="{{ ti.xcom_pull(task_ids='generate_backfill_command') }}",
    )

    (
        param_validation_task
        >> dry_run_branch_task
        >> [dry_run_task, real_deal_task]
        >> clear_branch_task
        >> [clear_tasks_task, do_not_clear_tasks_task]
        >> generate_backfill_command_task
        >> backfill_task
    )