in liminal/runners/airflow/operators/cloudformation.py [0:0]
def poke(self, context):
    """
    Check the status of the CloudFormation stack named ``self.stack_name``.

    :param context: context passed in by the caller; not used by this method.
    :return: ``True`` when the stack status equals ``self.complete_status``,
        or when the stack does not exist and
        ``self.allow_non_existing_stack_status()`` is truthy; ``False`` when
        the status equals ``self.in_progress_status``.
    :raises ValueError: if the stack is in any other status, or if it does
        not exist and a missing stack is not allowed.
    :raises ClientError: any other client error raised by ``describe_stacks``
        is propagated unchanged.
    """
    cloudformation = self.get_hook().get_conn()
    self.log.info('Poking for stack %s', self.stack_name)

    # Keep the try body down to the single call expected to raise ClientError,
    # so the status handling below cannot be mistaken for a client failure.
    try:
        stacks = cloudformation.describe_stacks(StackName=self.stack_name)['Stacks']
    except ClientError as e:
        # A missing stack is recognised by the text of the error message.
        if 'does not exist' not in str(e):
            raise
        if self.allow_non_existing_stack_status():
            return True
        raise ValueError(f'Stack {self.stack_name} does not exist') from e

    # Only the first returned stack entry is inspected.
    stack_status = stacks[0]['StackStatus']
    if stack_status == self.complete_status:
        return True
    if stack_status == self.in_progress_status:
        return False
    raise ValueError(f'Stack {self.stack_name} in bad state: {stack_status}')