in liminal/runners/airflow/tasks/create_cloudformation_stack.py [0:0]
def apply_task_to_dag(self):
check_cloudformation_stack_exists_task = self._add_variables_to_operator(
BranchPythonOperator(
templates_dict={'stack_name': self.stack_name},
task_id=f'is-cloudformation-{self.task_id}-running',
python_callable=self.__cloudformation_stack_running_branch,
provide_context=True,
)
)
create_cloudformation_stack_task = self._add_variables_to_operator(
CloudFormationCreateStackOperator(
task_id=f'create-cloudformation-{self.task_id}', params={**self.__reformatted_params()}
)
)
create_stack_sensor_task = self._add_variables_to_operator(
CloudFormationCreateStackSensor(
task_id=f'cloudformation-watch-{self.task_id}-create',
stack_name=self.stack_name,
)
)
stack_creation_end_task = DummyOperator(
task_id=f'creation-end-{self.task_id}', trigger_rule='all_done', dag=self.dag
)
if self.parent:
self.parent.set_downstream(check_cloudformation_stack_exists_task)
create_stack_sensor_task.set_downstream(stack_creation_end_task)
create_cloudformation_stack_task.set_downstream(create_stack_sensor_task)
check_cloudformation_stack_exists_task.set_downstream(create_cloudformation_stack_task)
check_cloudformation_stack_exists_task.set_downstream(stack_creation_end_task)
return stack_creation_end_task