def apply_task_to_dag()

in liminal/runners/airflow/tasks/delete_cloudformation_stack.py [0:0]


    def apply_task_to_dag(self):
        """
        Create the stack-deletion tasks and wire them into the DAG.

        Four tasks are built: a branch task, a CloudFormation delete-stack
        operator, a sensor watching that deletion, and a closing dummy task.
        They are connected as::

            [self.parent ->] branch -> delete -> sensor -> end
                             branch ---------------------> end

        NOTE(review): the closing task keeps the default trigger rule while
        one of its upstream paths can be skipped by the branch -- confirm
        this is the intended behavior.

        :returns: the closing ``delete-end-<task_id>`` dummy task.
        """
        task_id = self.task_id

        # Branch task; its callable (defined outside this method) selects
        # which of the two downstream paths is followed.
        branch_op = BranchPythonOperator(
            task_id=f'{task_id}-is-dag-queue-empty',
            python_callable=self.__queued_dag_runs_exists,
            provide_context=True,
            trigger_rule=TriggerRule.ALL_DONE,
            dag=self.dag,
        )

        # No ``dag=`` is passed to the next two operators here; they are
        # handed to ``_add_variables_to_operator`` and its result is used.
        raw_delete_op = CloudFormationDeleteStackOperator(
            task_id=f'delete-cloudformation-{task_id}',
            params={'StackName': self.stack_name},
        )
        delete_op = self._add_variables_to_operator(raw_delete_op)

        raw_delete_sensor = CloudFormationDeleteStackSensor(
            task_id=f'cloudformation-watch-{task_id}-delete',
            stack_name=self.stack_name,
        )
        delete_sensor = self._add_variables_to_operator(raw_delete_sensor)

        end_op = DummyOperator(task_id=f'delete-end-{task_id}', dag=self.dag)

        # Hook the branch under the parent task, when there is one.
        if self.parent:
            self.parent.set_downstream(branch_op)

        # The branch fans out to both the closing task and the delete path;
        # the delete path itself ends at the closing task.
        branch_op.set_downstream([end_op, delete_op])
        delete_op.set_downstream(delete_sensor)
        delete_sensor.set_downstream(end_op)

        return end_op