in liminal/runners/airflow/tasks/create_cloudformation_stack.py [0:0]
def __cloudformation_stack_running_branch(self, stack_name):
    """Pick the follow-up task id based on the current state of a stack.

    Queries CloudFormation for ``stack_name`` and inspects the
    ``StackStatus`` of the first stack in the response.

    :param stack_name: name passed as ``StackName`` to ``describe_stacks``.
    :return: ``'creation-end-<task_id>'`` when the status is
        ``CREATE_COMPLETE`` or ``DELETE_FAILED``; otherwise
        ``'create-cloudformation-<task_id>'`` (any other status, or the
        lookup failed with a "does not exist" error).
    :raises Exception: any lookup error whose message does not contain
        ``'does not exist'`` is re-raised unchanged.

    NOTE(review): the returned strings look like downstream task ids for a
    branching operator -- confirm against the caller.
    """
    client = CloudFormationHook().get_conn()
    try:
        response = client.describe_stacks(StackName=stack_name)
        status = response['Stacks'][0]['StackStatus']
        # Both statuses mean the stack is still present, so creation is skipped.
        if status in ('CREATE_COMPLETE', 'DELETE_FAILED'):
            print(f'Stack {stack_name} is running')
            return f'creation-end-{self.task_id}'
        print(f'Stack {stack_name} is not running')
    except Exception as e:
        # The error message is the only signal used here to tell a missing
        # stack apart from other failures; anything else propagates.
        if 'does not exist' not in str(e):
            raise e
        print(f'Stack {stack_name} does not exist')

    # Stack is absent or not in a "running" status: route to creation.
    return f'create-cloudformation-{self.task_id}'