def poke()

in liminal/runners/airflow/operators/cloudformation.py [0:0]


    def poke(self, context):
        """
        Check whether the CloudFormation stack has reached the awaited status.

        :param context: task context supplied by the caller; unused here.
        :return: ``True`` when the stack status equals ``self.complete_status``,
            or when the stack does not exist and
            ``self.allow_non_existing_stack_status()`` is truthy; ``False``
            while the status equals ``self.in_progress_status``.
        :raises ValueError: if the stack is in any other status, or if it does
            not exist and a missing stack is not allowed.
        :raises ClientError: any other client error from ``describe_stacks``
            is re-raised unchanged.
        """
        cloudformation = self.get_hook().get_conn()

        self.log.info('Poking for stack %s', self.stack_name)

        # Keep the try body minimal: only the AWS call is expected to raise
        # ClientError, the status comparison below never needs this handler.
        try:
            stacks = cloudformation.describe_stacks(StackName=self.stack_name)['Stacks']
        except ClientError as e:
            # A missing stack is recognised by the 'does not exist' text in
            # the error message; everything else propagates untouched.
            if 'does not exist' not in str(e):
                raise
            if self.allow_non_existing_stack_status():
                return True
            # Chain explicitly so the original AWS error is kept as the cause.
            raise ValueError(f'Stack {self.stack_name} does not exist') from e

        stack_status = stacks[0]['StackStatus']
        if stack_status == self.complete_status:
            return True
        if stack_status == self.in_progress_status:
            return False
        raise ValueError(f'Stack {self.stack_name} in bad state: {stack_status}')