def __cloudformation_stack_running_branch()

in liminal/runners/airflow/tasks/create_cloudformation_stack.py [0:0]


    def __cloudformation_stack_running_branch(self, stack_name):
        cloudformation = CloudFormationHook().get_conn()
        try:
            stack_status = cloudformation.describe_stacks(StackName=stack_name)['Stacks'][0]['StackStatus']
            if stack_status in ['CREATE_COMPLETE', 'DELETE_FAILED']:
                print(f'Stack {stack_name} is running')
                return f'creation-end-{self.task_id}'
            else:
                print(f'Stack {stack_name} is not running')
        except Exception as e:
            if 'does not exist' in str(e):
                print(f'Stack {stack_name} does not exist')
                return f'create-cloudformation-{self.task_id}'
            else:
                raise e

        return f'create-cloudformation-{self.task_id}'