def apply_task_to_dag()

in liminal/runners/airflow/tasks/create_cloudformation_stack.py [0:0]


    def apply_task_to_dag(self):
        check_cloudformation_stack_exists_task = self._add_variables_to_operator(
            BranchPythonOperator(
                templates_dict={'stack_name': self.stack_name},
                task_id=f'is-cloudformation-{self.task_id}-running',
                python_callable=self.__cloudformation_stack_running_branch,
                provide_context=True,
            )
        )

        create_cloudformation_stack_task = self._add_variables_to_operator(
            CloudFormationCreateStackOperator(
                task_id=f'create-cloudformation-{self.task_id}', params={**self.__reformatted_params()}
            )
        )

        create_stack_sensor_task = self._add_variables_to_operator(
            CloudFormationCreateStackSensor(
                task_id=f'cloudformation-watch-{self.task_id}-create',
                stack_name=self.stack_name,
            )
        )

        stack_creation_end_task = DummyOperator(
            task_id=f'creation-end-{self.task_id}', trigger_rule='all_done', dag=self.dag
        )

        if self.parent:
            self.parent.set_downstream(check_cloudformation_stack_exists_task)

        create_stack_sensor_task.set_downstream(stack_creation_end_task)
        create_cloudformation_stack_task.set_downstream(create_stack_sensor_task)
        check_cloudformation_stack_exists_task.set_downstream(create_cloudformation_stack_task)
        check_cloudformation_stack_exists_task.set_downstream(stack_creation_end_task)

        return stack_creation_end_task