in liminal/runners/airflow/tasks/delete_cloudformation_stack.py [0:0]
def apply_task_to_dag(self):
    """
    Add the CloudFormation stack-deletion tasks to ``self.dag`` and wire them.

    Four tasks are created, in this order:

    * a ``BranchPythonOperator`` whose callable is
      ``self.__queued_dag_runs_exists`` and whose trigger rule is
      ``TriggerRule.ALL_DONE``;
    * a ``CloudFormationDeleteStackOperator`` for ``self.stack_name``;
    * a ``CloudFormationDeleteStackSensor`` for the same stack;
    * a ``DummyOperator`` that closes the sequence.

    The branch task points both directly at the closing task and at the
    delete operator -> sensor -> closing task chain. Which path is followed
    at run time is decided by the branch callable (not visible here).

    :returns: the closing ``delete-end-...`` task, so that callers can chain
        further tasks after it.
    """
    branch_op = BranchPythonOperator(
        task_id=f'{self.task_id}-is-dag-queue-empty',
        python_callable=self.__queued_dag_runs_exists,
        provide_context=True,
        trigger_rule=TriggerRule.ALL_DONE,
        dag=self.dag,
    )

    # The CloudFormation operators are built without ``dag=`` and then passed
    # through ``_add_variables_to_operator``.
    # NOTE(review): presumably that wrapper attaches them to ``self.dag`` -
    # confirm against its definition.
    raw_delete_op = CloudFormationDeleteStackOperator(
        task_id=f'delete-cloudformation-{self.task_id}',
        params={'StackName': self.stack_name},
    )
    delete_op = self._add_variables_to_operator(raw_delete_op)

    raw_delete_sensor = CloudFormationDeleteStackSensor(
        task_id=f'cloudformation-watch-{self.task_id}-delete',
        stack_name=self.stack_name,
    )
    delete_sensor = self._add_variables_to_operator(raw_delete_sensor)

    end_marker = DummyOperator(task_id=f'delete-end-{self.task_id}', dag=self.dag)

    # Hook the sequence under the parent task, when there is one.
    if self.parent:
        self.parent.set_downstream(branch_op)

    # (upstream, downstream) edges, applied in the listed order.
    edges = (
        (branch_op, end_marker),
        (branch_op, delete_op),
        (delete_op, delete_sensor),
        (delete_sensor, end_marker),
    )
    for upstream, downstream in edges:
        upstream.set_downstream(downstream)

    return end_marker