iact3/stack.py (423 lines of code) (raw):

import asyncio
import logging
import uuid
from datetime import datetime, timedelta
from typing import Optional, List
from uuid import UUID, uuid4

from Tea.exceptions import TeaException

from iact3.config import TestConfig, IAC_NAME
from iact3.exceptions import Iact3Exception
from iact3.plugin.ros import StackPlugin
from iact3.util import generate_client_token_ex
from iact3.plugin.base_plugin import CredentialClient

LOG = logging.getLogger(__name__)


class Timer:
    """Repeatedly await an async callback, sleeping ``interval`` seconds
    between calls, until :meth:`cancel` is called.

    The background task is scheduled with ``asyncio.ensure_future`` as soon
    as the instance is constructed.
    """

    def __init__(self, interval, callback, *args, **kwargs):
        self._interval = interval
        self._callback = callback
        self._args = args
        self._kwargs = kwargs
        self._task = asyncio.ensure_future(self._job())

    async def _job(self):
        """Call the callback forever; log (do not propagate) its errors."""
        while True:
            try:
                await self._callback(*self._args, **self._kwargs)
            except Exception as ex:
                # Best effort: one failed call must not stop the polling loop.
                LOG.error(f'an error occurred, {self._callback.__name__}, {ex}')
            await asyncio.sleep(self._interval)

    def cancel(self):
        """Cancel the background task."""
        self._task.cancel()


class FilterableList(list):
    """A list whose items can be selected by attribute criteria."""

    def filter(self, kwargs: Optional[dict] = None):
        """Return the items matching every criterion in ``kwargs``.

        With no criteria the list itself (not a copy) is returned; otherwise
        a new ``FilterableList`` is built. See :func:`criteria_matches` for
        the matching rules.
        """
        if not kwargs:
            return self
        return FilterableList(
            item for item in self if criteria_matches(kwargs, item)
        )


class Stacks(FilterableList):
    pass


class Resources(FilterableList):
    pass


class Events(FilterableList):
    pass


SYS_TAGS = {'CreatedBy': f'{IAC_NAME}'}


class Stacker:
    """Fan out stack operations over a list of test configurations."""

    NULL_UUID = uuid.UUID(int=0)

    def __init__(self, project_name: str = None, tests: List[TestConfig] = None,
                 uid: uuid.UUID = NULL_UUID, name_prefix: str = IAC_NAME,
                 tags: dict = None, stacks: Stacks = None):
        self.tests = tests or []
        self.project_name = project_name
        self.stack_name_prefix = name_prefix
        # NULL_UUID is the "not supplied" sentinel: generate a fresh id then.
        self.uid = uuid.uuid4() if uid == Stacker.NULL_UUID else uid
        self.tags = tags if tags else {}
        self.stacks: Stacks = stacks or Stacks()
        self._sys_tags = {
            f'{IAC_NAME}-id': self.uid.hex,
            f'{IAC_NAME}-project-name': self.project_name,
        }
        self._sys_tags.update(SYS_TAGS)

    @classmethod
    def from_stacks(cls, stacks: Stacks):
        """Build a Stacker around already existing stack objects."""
        return cls(stacks=stacks)

    async def _run_for_tests(self, factory) -> None:
        """Run ``factory(test, tags, uid)`` concurrently for every test and
        append the resulting stacks to ``self.stacks``.

        Raises:
            Iact3Exception: if this Stacker already holds stack objects.
        """
        if self.stacks:
            raise Iact3Exception('Stacker already initialised with stack objects')
        self.tags.update(self._sys_tags)
        stack_tasks = [
            asyncio.create_task(factory(test, self.tags, self.uid))
            for test in self.tests
        ]
        self.stacks += await asyncio.gather(*stack_tasks)

    async def create_stacks(self):
        """Create one stack per test, concurrently."""
        await self._run_for_tests(Stack.create)

    def status(self, **kwargs):
        """Summarise the (optionally filtered) stacks by coarse status.

        Returns:
            A dict mapping the ``StackStatus.curt`` bucket name to a dict of
            ``{stack.id: stack.status_reason}`` for every stack in that
            bucket.
        """
        result = {}
        for stack in self.stacks.filter(kwargs):
            # Accumulate per bucket; a plain assignment would keep only the
            # last stack seen for each status.
            bucket = result.setdefault(StackStatus.curt(stack.status), {})
            bucket[stack.id] = stack.status_reason
        return result

    async def delete_stacks(self, **kwargs):
        """Delete the (optionally filtered) stacks concurrently."""
        stack_tasks = [
            asyncio.create_task(Stack.delete(stack))
            for stack in self.stacks.filter(kwargs)
        ]
        await asyncio.gather(*stack_tasks)

    async def get_stacks_price(self):
        """Fetch the template cost estimate for every test, concurrently."""
        await self._run_for_tests(Stack.get_price)

    async def preview_stacks_result(self):
        """Fetch the stack preview for every test, concurrently."""
        await self._run_for_tests(Stack.preview_stack_result)


def criteria_matches(kwargs: dict, instance):
    """Return True when ``instance`` satisfies every criterion in ``kwargs``.

    Each key names an attribute of ``instance``. A list value matches when
    the attribute is one of the list's elements; any other value matches on
    equality. Criteria are combined with AND.

    Raises:
        ValueError: if a key is not an attribute of ``instance``.
    """
    for k in kwargs:
        if not hasattr(instance, k):
            raise ValueError(f'{k} is not a valid property of {type(instance)}')
    for k, v in kwargs.items():
        ins_v = getattr(instance, k)
        if isinstance(v, list):
            if ins_v not in v:
                return False
        elif ins_v != v:
            return False
    return True


class StackStatus:
    """Buckets of raw stack status strings.

    Note that the ``*ROLLBACK_COMPLETE`` statuses are classified as FAILED.
    """

    COMPLETE = [
        'CREATE_COMPLETE',
        'UPDATE_COMPLETE',
        'DELETE_COMPLETE'
    ]
    IN_PROGRESS = [
        'CREATE_IN_PROGRESS',
        'UPDATE_IN_PROGRESS',
        'DELETE_IN_PROGRESS',
        'CREATE_ROLLBACK_IN_PROGRESS',
        'ROLLBACK_IN_PROGRESS'
    ]
    FAILED = [
        'CREATE_FAILED',
        'UPDATE_FAILED',
        'DELETE_FAILED',
        'CREATE_ROLLBACK_FAILED',
        'CREATE_ROLLBACK_COMPLETE',
        'ROLLBACK_FAILED',
        'ROLLBACK_COMPLETE'
    ]

    @classmethod
    def curt(cls, status):
        """Map a raw status to 'COMPLETE', 'IN_PROGRESS', 'FAILED' or
        'UNKNOWN'."""
        if status in cls.COMPLETE:
            return 'COMPLETE'
        if status in cls.IN_PROGRESS:
            return 'IN_PROGRESS'
        if status in cls.FAILED:
            return 'FAILED'
        return 'UNKNOWN'


class Event:
    """A single stack event, built from the event dict returned by the
    plugin. All keys except ``PhysicalResourceId`` are required."""

    def __init__(self, event_dict: dict):
        self.event_id: str = event_dict['EventId']
        self.stack_name: str = event_dict['StackName']
        self.logical_id: str = event_dict['LogicalResourceId']
        self.type: str = event_dict['ResourceType']
        self.status: str = event_dict['Status']
        self.timestamp: str = event_dict['CreateTime']
        self.physical_id: str = event_dict.get('PhysicalResourceId')
        self.status_reason: str = event_dict['StatusReason']

    def __str__(self):
        return '{} {} {}'.format(self.timestamp, self.logical_id, self.status)

    def __repr__(self):
        return '<Event object {} at {}>'.format(self.event_id, hex(id(self)))


class Resource:
    """A single stack resource, built from the resource dict returned by
    the plugin. ``PhysicalResourceId``, ``UpdateTime`` and ``StatusReason``
    are optional keys; the others are required."""

    def __init__(
            self, stack_id: str, resource_dict: dict, test_name: str = '',
            uuid: UUID = None
    ):
        self.stack_id: str = stack_id
        self.test_name: str = test_name
        self.uuid: UUID = uuid if uuid else uuid4()
        self.logical_id: str = resource_dict['LogicalResourceId']
        self.type: str = resource_dict['ResourceType']
        self.status: str = resource_dict['Status']
        self.physical_id: str = resource_dict.get('PhysicalResourceId', '')
        # NOTE(review): 'UpdateTime' is stored as received; it is only a
        # datetime if the plugin already returns one - confirm.
        self.last_updated_timestamp: datetime = resource_dict.get(
            'UpdateTime', datetime.fromtimestamp(0)
        )
        self.status_reason: str = resource_dict.get('StatusReason', '')

    def __str__(self):
        return '<Resource {} {}>'.format(self.logical_id, self.status)


class Stack:
    """One stack (or price / preview result) belonging to a test.

    Constructing a Stack starts a :class:`Timer` that calls :meth:`refresh`
    every ``auto_refresh_interval``; the timer is cancelled once a refresh
    observes a status outside ``StackStatus.IN_PROGRESS``.
    """

    def __init__(self, region: str, stack_id: str, test_name: str = None,
                 uuid: UUID = None, status_reason: str = None,
                 stack_name: str = None, parameters: dict = None,
                 credential: CredentialClient = None,
                 template_price: dict = None, preview_result: dict = None):
        self.test_name: str = test_name
        self.uuid: UUID = uuid if uuid else uuid4()
        self.id: str = stack_id
        self.region = region
        self.plugin: StackPlugin = StackPlugin(region_id=region, credential=credential)
        self.name = stack_name
        self.parameters = parameters
        self.completion_time: timedelta = timedelta(0)
        self._status: str = ''
        self.status_reason: str = status_reason or ''
        self._launch_succeeded: bool = False
        self.auto_refresh_interval: timedelta = timedelta(seconds=60)
        self._last_event_refresh: datetime = datetime.fromtimestamp(0)
        self._last_resource_refresh: datetime = datetime.fromtimestamp(0)
        # Caches read by events()/resources(); they must exist before the
        # first fetch or those accessors raise AttributeError.
        self._events: Events = Events()
        self._resources: Resources = Resources()
        self.timer = Timer(self.auto_refresh_interval.total_seconds(), self.refresh)
        self.template_price = template_price
        self.preview_result = preview_result

    def __str__(self):
        return self.id

    def __repr__(self):
        return '<Stack object {} at {}>'.format(self.test_name, hex(id(self)))

    def _auto_refresh(self, last_refresh):
        """Return True when ``last_refresh`` is older than the refresh
        interval."""
        return datetime.now() - last_refresh > self.auto_refresh_interval

    @property
    def status(self):
        return self._status

    @status.setter
    def status(self, status):
        """Store the status and update ``launch_succeeded`` accordingly.

        A FAILED status clears the flag; CREATE_COMPLETE / UPDATE_COMPLETE
        set it. DELETE_COMPLETE and every other status leave it unchanged.
        """
        self._status = status
        if status in StackStatus.FAILED:
            self._launch_succeeded = False
        elif status in StackStatus.COMPLETE and status != 'DELETE_COMPLETE':
            self._launch_succeeded = True

    @property
    def launch_succeeded(self):
        return self._launch_succeeded

    @classmethod
    def from_stack_response(cls, stack: dict, credential: CredentialClient = None):
        """Build a Stack from a stack description dict.

        ``RegionId`` and ``StackId`` are required keys; ``TestName``,
        ``TestId`` and ``StackName`` are optional.
        """
        return cls(
            region=stack['RegionId'],
            stack_id=stack['StackId'],
            test_name=stack.get('TestName'),
            uuid=stack.get('TestId'),
            stack_name=stack.get('StackName'),
            credential=credential
        )

    @classmethod
    def _failed_stack(cls, region, name, test_uuid, code, message, stack_name,
                      credential, parameters=None) -> 'Stack':
        """Build a Stack without an id that records a failure: ``code``
        becomes its status, ``message`` its status reason, and its refresh
        timer is cancelled."""
        stack = cls(region, None, name, test_uuid, status_reason=message,
                    stack_name=stack_name, parameters=parameters,
                    credential=credential)
        stack.status = code
        stack._launch_succeeded = False
        stack.timer.cancel()
        return stack

    @classmethod
    async def create(cls, test: TestConfig, tags: dict = None,
                     uuid: UUID = None) -> 'Stack':
        """Create the stack described by ``test`` and return its Stack object.

        Args:
            test: test configuration (parameters, template, region, auth).
            tags: tags to apply; a ``<IAC_NAME>-test-name`` tag is added to a
                copy, the caller's dict is not modified.
            uuid: id of the test run; a random one is used when omitted.

        Returns:
            The created Stack after one refresh. If ``test.error`` is set or
            the plugin raises ``TeaException``, a Stack without an id whose
            status / status_reason carry the error code / message.
        """
        parameters = test.parameters
        template_args = test.template_config.to_dict()
        name = test.test_name
        # Copy: every concurrent create() task receives the same dict from
        # Stacker, so writing the per-test tag in place would let tasks
        # overwrite each other's test name.
        tags = dict(tags) if tags else {}
        tags[f'{IAC_NAME}-test-name'] = name
        # The client token is derived from the run id, so one is required.
        uuid = uuid if uuid else uuid4()
        region = test.region
        credential = test.auth.credential
        plugin = StackPlugin(region_id=test.region, credential=credential)
        client_token = generate_client_token_ex(uuid.hex, name)
        stack_name = f'{IAC_NAME}-{name}-{region}-{uuid4().hex[:8]}'

        config_error = test.error
        if config_error:
            return cls._failed_stack(
                region, name, uuid,
                getattr(config_error, 'code', 'Unknown error'),
                getattr(config_error, 'message', 'Unknown error'),
                stack_name, credential
            )
        try:
            stack_id = await plugin.create_stack(
                stack_name=stack_name, parameters=parameters,
                timeout_in_minutes=60, client_token=client_token,
                tags=tags, **template_args, disable_rollback=True
            )
        except TeaException as ex:
            return cls._failed_stack(region, name, uuid, ex.code, ex.message,
                                     stack_name, credential, parameters)
        stack = cls(region, stack_id, name, uuid, stack_name=stack_name,
                    parameters=parameters, credential=credential)
        await stack.refresh()
        return stack

    @classmethod
    async def get_price(cls, test: TestConfig, tags: dict = None,
                        uuid: UUID = None):
        """Fetch the template cost estimate for ``test``.

        ``tags`` is accepted for signature parity with :meth:`create` and is
        not used. Returns a Stack without an id whose ``template_price``
        holds the estimate, or an error Stack as described in :meth:`create`.
        """
        parameters = test.parameters
        template_args = test.template_config.to_dict()
        name = test.test_name
        region = test.region
        credential = test.auth.credential
        plugin = StackPlugin(region_id=test.region, credential=credential)
        stack_name = f'{IAC_NAME}-{name}-{region}-{uuid4().hex[:8]}'

        config_error = test.error
        if config_error:
            return cls._failed_stack(
                region, name, uuid,
                getattr(config_error, 'code', 'Unknown error'),
                getattr(config_error, 'message', 'Unknown error'),
                stack_name, credential
            )
        try:
            template_price = await plugin.get_template_estimate_cost(
                parameters=parameters, **template_args, region_id=region
            )
        except TeaException as ex:
            return cls._failed_stack(region, name, uuid, ex.code, ex.message,
                                     stack_name, credential, parameters)
        return cls(region, None, name, uuid, stack_name=stack_name,
                   parameters=parameters, credential=credential,
                   template_price=template_price)

    @classmethod
    async def preview_stack_result(cls, test: TestConfig, tags: dict = None,
                                   uuid: UUID = None):
        """Fetch the stack preview for ``test``.

        ``tags`` is accepted for signature parity with :meth:`create` and is
        not used. Returns a Stack without an id whose ``preview_result``
        holds the preview, or an error Stack as described in :meth:`create`.
        """
        parameters = test.parameters
        template_args = test.template_config.to_dict()
        name = test.test_name
        region = test.region
        credential = test.auth.credential
        plugin = StackPlugin(region_id=test.region, credential=credential)
        stack_name = f'{IAC_NAME}-{name}-{region}-{uuid4().hex[:8]}'

        config_error = test.error
        if config_error:
            return cls._failed_stack(
                region, name, uuid,
                getattr(config_error, 'code', 'Unknown error'),
                getattr(config_error, 'message', 'Unknown error'),
                stack_name, credential
            )
        try:
            preview_result = await plugin.preview_stack(
                parameters=parameters, **template_args, region_id=region,
                stack_name=stack_name
            )
        except TeaException as ex:
            return cls._failed_stack(region, name, uuid, ex.code, ex.message,
                                     stack_name, credential, parameters)
        return cls(region, None, name, uuid, stack_name=stack_name,
                   parameters=parameters, credential=credential,
                   preview_result=preview_result)

    async def refresh(self, properties: bool = True, events: bool = False,
                      resources: bool = False) -> None:
        """Re-fetch the selected parts of the stack state from the plugin."""
        if properties:
            await self.set_stack_properties()
        if events:
            await self._fetch_stack_events()
            self._last_event_refresh = datetime.now()
        if resources:
            await self._fetch_stack_resources()
            self._last_resource_refresh = datetime.now()

    async def set_stack_properties(self, stack_properties: Optional[dict] = None) -> None:
        """Update status / status_reason from ``stack_properties``.

        When no properties are given they are fetched through the plugin
        (only if the stack has an id). The refresh timer is cancelled as soon
        as the status is not an IN_PROGRESS one.
        """
        props: dict = stack_properties or {}
        if not props and self.id:
            props = await self.plugin.get_stack(self.id) or {}
        self.status = props.get('Status')
        self.status_reason = props.get('StatusReason')
        if self.status not in StackStatus.IN_PROGRESS:
            self.timer.cancel()

    async def events(self, refresh: bool = False) -> Events:
        """Return the stack events, re-fetching when asked to, when none are
        cached, or when the cache is older than the refresh interval."""
        if refresh or not self._events or self._auto_refresh(self._last_event_refresh):
            await self._fetch_stack_events()
        return self._events

    async def _fetch_stack_events(self) -> None:
        """Replace the cached events with the plugin's current list."""
        self._last_event_refresh = datetime.now()
        stack_events = await self.plugin.list_stack_events(self.id)
        # `or []`: tolerate a None response, as is done for get_stack.
        self._events = Events(Event(event) for event in stack_events or [])

    async def resources(self, refresh: bool = False) -> Resources:
        """Return the stack resources, re-fetching when asked to, when none
        are cached, or when the cache is older than the refresh interval."""
        if (
                refresh
                or not self._resources
                or self._auto_refresh(self._last_resource_refresh)
        ):
            await self._fetch_stack_resources()
        return self._resources

    async def _fetch_stack_resources(self) -> None:
        """Replace the cached resources with the plugin's current list."""
        self._last_resource_refresh = datetime.now()
        stack_resources = await self.plugin.list_stack_resources(self.id)
        # `or []`: tolerate a None response, as is done for get_stack.
        self._resources = Resources(
            Resource(self.id, res, self.test_name, self.uuid)
            for res in stack_resources or []
        )

    @staticmethod
    async def delete(stack) -> None:
        """Request deletion of ``stack`` and restart its status polling.

        Stacks without an id (failed creations, price / preview results) are
        ignored.
        """
        stack_id = stack.id
        if not stack_id:
            return
        await stack.plugin.delete_stack(stack_id=stack_id)
        LOG.info(f'Deleting stack: {stack_id}')
        await stack.refresh()
        # Stop the previous poller before starting a new one; otherwise a
        # still-running timer keeps polling alongside its replacement.
        stack.timer.cancel()
        stack.timer = Timer(stack.auto_refresh_interval.total_seconds(), stack.refresh)

    async def error_events(self, refresh=False) -> Events:
        """Return the events whose status is one of ``StackStatus.FAILED``.

        This is a coroutine because :meth:`events` is one and has to be
        awaited before its result can be filtered.
        """
        events = await self.events(refresh=refresh)
        return Events(events.filter({'status': StackStatus.FAILED}))