iact3/generate_params.py (369 lines of code) (raw):
import json
import logging
import random
import re
import string
import uuid
from typing import Any, Optional
from urllib.parse import urlparse, parse_qs
from urllib.request import urlopen

import requests

from iact3.util import yaml, CustomSafeLoader
from iact3.exceptions import Iact3Exception
from iact3.plugin.ecs import EcsPlugin
from iact3.plugin.oss import OssPlugin
from iact3.plugin.ros import StackPlugin
from iact3.plugin.vpc import VpcPlugin
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
# Tool name; interpolated into the ``$[iact3-...]`` pseudo-parameter patterns
# and used as the prefix of generated resource names.
IAC_NAME = 'iact3'
# NOTE(review): presumably the distribution (pip) name of the tool; it is not
# referenced by the code visible in this file.
IAC_PACKAGE_NAME = 'alibabacloud-ros-iact3'
class Selector:
    """Doubly linked list node describing one parameter being auto-selected.

    A node remembers the parameter ``key``, the value it had before
    resolution, the candidate values discovered so far and the candidate
    currently chosen.  ``next``/``prev`` link it to its neighbours.
    """

    def __init__(self, key: str, original_value: Any,
                 allowed_values: Optional[list] = None,
                 parameters: Optional[dict] = None):
        """Create an unlinked node.

        :param key: parameter name this node stands for.
        :param original_value: value the parameter had before resolution.
        :param allowed_values: candidate values; ``None`` is stored as ``[]``.
        :param parameters: dict that receives the chosen value.
        """
        self.key = key
        self.original_value = original_value
        # A missing candidate list is normalised to an empty one so that
        # truthiness checks and ``index`` lookups never see ``None``.
        self.allowed_values = allowed_values or []
        self.parameters = parameters
        # Start on the first candidate when there is one.
        self.current_value = allowed_values[0] if allowed_values else None
        self.next = None
        self.prev = None

    def refresh_parameters(self):
        """Write the currently selected value into ``parameters`` under ``key``."""
        self.parameters[self.key] = self.current_value
class LinkedList:
    """Minimal doubly linked list of ``Selector`` nodes addressed by key."""

    def __init__(self):
        self._head = None
        self._last = None

    def is_empty(self):
        """Return ``True`` when the list holds no node."""
        return self._head is None

    def append(self, key, original_value, allowed_values=None, parameters=None):
        """Create a ``Selector`` from the arguments and link it at the tail."""
        node = Selector(key, original_value, allowed_values, parameters)
        if self.is_empty():
            self._head = node
        else:
            tail = self._last
            tail.next = node
            node.prev = tail
        self._last = node

    def first(self):
        """Return the head node, or ``None`` for an empty list."""
        return self._head

    def remove(self, key):
        """Unlink the first node whose ``key`` equals ``key``; no-op if absent.

        Both ends of the list are kept consistent: removing the head moves
        ``_head`` forward and removing the tail moves ``_last`` backward, so
        a later ``append`` never attaches to a node that was already removed.
        The removed node's own ``prev``/``next`` references are left as they
        were, so a caller holding the node can still walk on from it.
        """
        node = self._head
        while node is not None and node.key != key:
            node = node.next
        if node is None:
            return

        before, after = node.prev, node.next
        if before is None:
            self._head = after
        else:
            before.next = after
        if after is None:
            self._last = before
        else:
            after.prev = before

    def __iter__(self):
        """Yield the key of every node from head to tail."""
        cur = self._head
        while cur is not None:
            value = cur.key
            # Advance before yielding so the consumer may unlink the node
            # it just received without derailing the iteration.
            cur = cur.next
            yield value
def _error_message(key, value, msg):
return f'Parsing pseudo parameter (Key: {key}, Value: {value}) error, {msg}'
class ResolvedParameters:
    """Outcome of parameter generation for one test in one region.

    Plain value holder: ``name`` and ``region`` identify the test run,
    ``parameters`` is the parameter dict and ``error`` carries the exception
    raised during generation, or ``None``.
    """

    def __init__(self, name: str, region: str, parameters: dict, error=None):
        self.name, self.region = name, region
        self.parameters, self.error = parameters, error
class ParamGenerator:
    """Resolve pseudo parameter values found in a test configuration.

    ``result`` is the entry point: it first resolves ``$[iact3-auto]`` and
    ``$[iact3-current-region]`` values (``resolve_auto_value``) and then fills
    the placeholders that are still unresolved from the parameter name
    (``resolve_auto_key``).
    """
    # Value patterns (case-insensitive), applied to parameter values.
    # ``$[iact3-auto]``: the value is to be chosen automatically.
    RE_V_AUTO = re.compile(fr'\$\[{IAC_NAME}-auto]', re.I)
    # ``$[iact3-current-region]`` or ``$[iact3-current_region]``.
    RE_V_CURRENT_REGION = re.compile(fr'\$\[{IAC_NAME}-current[-_]region]', re.I)
    # Key patterns (case-insensitive), applied to parameter names with
    # ``fullmatch``: an optional word-character prefix, the keyword (with
    # optional underscores inside it) and an optional ``_`` / digit suffix,
    # e.g. ``ZoneId``, ``zone_id_1``.
    RE_K_ZONE_ID = re.compile(r'(\w*)zone(_|)id(_|)(\d*)', re.I)
    RE_K_VPC_ID = re.compile(r'(\w*)vpc(_|)id(_|)(\d*)', re.I)
    RE_K_VSW_ID = re.compile(r'(\w*)v(_|)switch(_|)id(_|)(\d*)', re.I)
    RE_K_SECURITY_GROUP = re.compile(r'(\w*)security(_|)group(_id|id)(_|)(\d*)', re.I)
    RE_K_COMMON_NAME = re.compile(r'(\w*)name(_|)(\d*)', re.I)
    RE_K_PASSWORD = re.compile(r'(\w*)password(_|)(\d*)', re.I)
    RE_K_UUID = re.compile(r'(\w*)uuid(_|)(\d*)', re.I)
def __init__(self, config):
    """Capture the pieces of ``config`` needed for parameter resolution.

    :param config: test configuration exposing ``region``, ``parameters``,
        ``template_config``, ``parameters_order``, ``auth.credential`` and
        ``test_name``.
    """
    self.config = config
    self.region, self.parameters = config.region, config.parameters
    self.template_config, self.parameters_order = config.template_config, config.parameters_order
    self.credential = config.auth.credential
    self.plugin = StackPlugin(credential=self.credential, region_id=self.region)
    # Filled in lazily by _gen_vpc_vsw_id.
    self._vpc_id = self._vsw_id = None
    self._not_support_keys = None
    self._linked_list: LinkedList = LinkedList()
    # Keys whose original placeholder is kept for resolve_auto_key.
    self._unresolved_parameters = dict()
@classmethod
async def result(cls, config) -> ResolvedParameters:
    """Generate the parameters for ``config`` and wrap them in a result object.

    Any exception raised while resolving is captured in the returned
    object's ``error`` attribute instead of propagating.

    :param config: test configuration passed to the constructor.
    :return: a ``ResolvedParameters`` for ``config.test_name`` / ``config.region``.
    """
    generator = cls(config)
    LOG.debug(f'start to generate parameters for {config.test_name}')
    try:
        await generator.resolve_auto_value()
        LOG.debug(f'resolve auto value result: {generator.parameters}')
        await generator.resolve_auto_key()
        outcome = ResolvedParameters(config.test_name, config.region, generator.parameters)
        LOG.debug(
            f'success generate parameters for {config.test_name}, parameters {outcome.parameters}')
        return outcome
    except Exception as ex:
        failure = ResolvedParameters(config.test_name, config.region, generator.parameters, error=ex)
        LOG.debug(f'failed generate parameters for {config.test_name}, {ex}', exc_info=True)
        return failure
async def resolve_auto_key(self):
    """Fill the still-unresolved ``$[iact3-auto]`` values based on the key name.

    For every entry of ``_unresolved_parameters`` whose value is exactly the
    auto placeholder, the parameter name decides which generator supplies
    the value (vswitch, vpc, common name, password, uuid, security group,
    tested in that order).  Keys matching none of the patterns are left
    untouched.

    :return: ``self.parameters`` after the substitutions.
    """
    auto = self.RE_V_AUTO
    for key, placeholder in self._unresolved_parameters.items():
        if not (isinstance(placeholder, str) and auto.fullmatch(placeholder)):
            continue

        if self.RE_K_VSW_ID.fullmatch(key):
            if self._vsw_id is None:
                await self._gen_vpc_vsw_id(key, placeholder)
            replacement = self._vsw_id
        elif self.RE_K_VPC_ID.fullmatch(key):
            if self._vpc_id is None:
                await self._gen_vpc_vsw_id(key, placeholder)
            replacement = self._vpc_id
        elif self.RE_K_COMMON_NAME.fullmatch(key):
            replacement = self._gen_common_name()
        elif self.RE_K_PASSWORD.fullmatch(key):
            replacement = self._gen_password()
        elif self.RE_K_UUID.fullmatch(key):
            replacement = self._gen_uuid()
        elif self.RE_K_SECURITY_GROUP.fullmatch(key):
            if self._vpc_id is None:
                self._vpc_id, self._vsw_id = await self._gen_vpc_vsw_id(key, placeholder)
            replacement = await self._gen_sg(key, placeholder)
        else:
            # No generator is associated with this key name.
            continue

        self.parameters[key] = re.sub(auto, replacement, placeholder)
    return self.parameters
async def resolve_auto_value(self):
    """Resolve ``$[iact3-current-region]`` and ``$[iact3-auto]`` values.

    Parameters are visited in ``parameters_order`` (falling back to the
    order derived from the template, then to the dict order).  Current-region
    placeholders are replaced by ``self.region``; auto placeholders are
    queued as selectors and resolved through ``_select_value``.

    The resolved values are always written back to ``self.parameters``,
    including when there is no auto placeholder at all, so a lone
    current-region placeholder is resolved too.

    :return: the dict of resolved values when no auto placeholder exists,
        otherwise ``self.parameters``.
    :raises Iact3Exception: propagated from template retrieval or from
        ``_select_value``.
    """
    linked_list = LinkedList()
    resolved_parameters = {}
    parameters_order = self.parameters_order
    if not parameters_order:
        parameters_order = await self._get_parameters_order() or self.parameters.keys()

    for key in parameters_order:
        if key not in self.parameters:
            continue
        original_value = self.parameters[key]
        if not isinstance(original_value, str):
            resolved_parameters[key] = original_value
            continue
        if self.RE_V_AUTO.fullmatch(original_value):
            # Placeholder for now; every selector shares this one dict.
            resolved_parameters[key] = None
            linked_list.append(key, original_value, parameters=resolved_parameters)
        elif self.RE_V_CURRENT_REGION.fullmatch(original_value):
            resolved_parameters[key] = self.region
        else:
            resolved_parameters[key] = original_value

    self._linked_list = linked_list
    first_selector = linked_list.first()
    if not first_selector:
        # Nothing to select, but current-region substitutions made above
        # must still reach self.parameters, which is what result() reports.
        self.parameters.update(resolved_parameters)
        return resolved_parameters

    resolved_parameters = await self._select_value(first_selector)
    self.parameters.update(resolved_parameters)
    # Keys that could not be selected get their original placeholder back
    # so that resolve_auto_key can handle them.
    self.parameters.update(self._unresolved_parameters)
    return self.parameters
async def _get_constraints(self, **kwargs):
    """Query the allowed values of a parameter, retrying on timeouts.

    Only the first returned constraint is inspected.  Up to three attempts
    are made while the service reports a ``QueryError`` whose reason
    mentions a timeout.

    :param kwargs: forwarded unchanged to ``plugin.get_parameter_constraints``.
    :return: ``None`` when the behavior is ``NotSupport``, the string
        ``'timeout'`` when every attempt timed out, otherwise the
        constraint's ``AllowedValues``.
    """
    for _attempt in range(3):
        constraints = await self.plugin.get_parameter_constraints(**kwargs)
        first = constraints[0]
        behavior = first.get('Behavior')
        values = first.get('AllowedValues')
        reason = first.get('BehaviorReason')
        timed_out = behavior == 'QueryError' and reason and 'timeout' in reason
        if not timed_out:
            return None if behavior == 'NotSupport' else values
        LOG.debug(f'get constraints timeout, {constraints}')
    return 'timeout'
async def _select_value(self, selector: Selector, error_message=None) -> dict:
    """Choose a value for ``selector`` and recurse over its neighbours.

    The selector list is walked recursively: forward to ``selector.next``
    once a value has been chosen for ``selector``, backward to
    ``selector.prev`` when no candidate is left for it.

    :param selector: node whose parameter is being resolved.
    :param error_message: message carried along while backtracking; raised
        instead of the default one when the walk runs out of candidates at
        a node without a predecessor.
    :return: the ``parameters`` dict of the selector at which the walk ends.
    :raises Iact3Exception: when the constraint query keeps timing out, or
        when no candidate remains and there is no previous selector to
        return to.
    """
    key = selector.key
    parameters = selector.parameters
    allowed_values = selector.allowed_values
    current_value = selector.current_value
    # Default message, used only if no message was handed down.
    error_msg = f'can not find any available value for {key} in {self.region} region ' \
                f'in {allowed_values} for {self.config.test_name}'
    if allowed_values:
        # Candidates are already stored on this selector, so no query is made.
        next_selector = selector.next
        if not next_selector:
            # Last node of the list: nothing further to resolve.
            return selector.parameters
        index = allowed_values.index(current_value)
        if index + 1 >= len(allowed_values):
            # The current value is the last candidate: step back, or give up
            # when this is the head of the list.
            prev_selector = selector.prev
            if not prev_selector:
                raise Iact3Exception(error_message or error_msg)
            return await self._select_value(prev_selector, error_message=error_message)
        # Move on to the next candidate and go forward again.
        selector.current_value = allowed_values[index + 1]
        next_selector.parameters[key] = selector.current_value
        # Clear the follower's candidates so they are queried afresh.
        next_selector.allowed_values = []
        return await self._select_value(next_selector, error_message=error_message)
    # No candidates stored yet: ask for the values allowed for this key
    # given the parameters collected so far.
    values = await self._get_constraints(
        parameters=parameters,
        **self.template_config.to_dict(),
        parameters_key_filter=[key],
        parameters_order=self.parameters_order
    )
    if values is None:
        # _get_constraints returns None for 'NotSupport': keep the original
        # value aside in _unresolved_parameters, drop the node from the list
        # and continue with the following one.
        next_selector = selector.next
        self._unresolved_parameters[key] = selector.original_value
        self._linked_list.remove(key)
        if not next_selector:
            return selector.parameters
        return await self._select_value(next_selector, error_message=error_message)
    elif values == 'timeout':
        # Sentinel returned by _get_constraints after its retries.
        msg = f'get constraints timeout for {key} in {self.region} region for {self.config.test_name}'
        raise Iact3Exception(msg)
    elif not values:
        # Empty candidate list: backtrack to the previous selector, or fail
        # when there is none.
        prev_selector = selector.prev
        if not prev_selector:
            # Only parameters that already have a value are reported.
            param = json.dumps({k: v for k, v in parameters.items() if v is not None})
            msg = (f'no available value found for {key} '
                   f'based on parameter {param} in {self.region} for {self.config.test_name}')
            raise Iact3Exception(msg)
        error_msg = f'no available value found for {key} in {self.region} region for {self.config.test_name}'
        return await self._select_value(prev_selector, error_message=error_msg)
    # Fresh candidates: adopt the first one and go forward.
    selector.allowed_values = values
    selector.current_value = values[0]
    selector.parameters[key] = selector.current_value
    next_selector = selector.next
    if not next_selector:
        return selector.parameters
    # NOTE(review): unlike the advance branch above, next_selector.allowed_values
    # is not cleared here, so a follower revisited after a deeper backtrack may
    # still hold candidates from an earlier pass - confirm this is intended.
    next_selector.parameters[key] = selector.current_value
    return await self._select_value(next_selector, error_message=error_message)
async def _get_parameters_order(self):
    """Derive the parameter order from the template's interface metadata.

    Reads ``Metadata -> ALIYUN::ROS::Interface -> ParameterGroups`` from the
    template, concatenates the string entries of every group's
    ``Parameters`` list, then appends any key of ``self.parameters`` not
    listed there.  The result is also stored in ``self.parameters_order``.

    :return: the ordered list of names, or ``None`` when the template
        declares no parameter groups.
    :raises Iact3Exception: when no template body could be retrieved.
    """
    template = await self._get_template_body()
    if not template:
        raise Iact3Exception(f'failed to retrieve template by template config {self.template_config}')

    parsed_tpl = yaml.load(template, Loader=CustomSafeLoader)
    interface = parsed_tpl.get('Metadata', {}).get('ALIYUN::ROS::Interface', {})
    param_groups = interface.get('ParameterGroups', [])
    if not param_groups:
        return None

    ordered = []
    for group in param_groups:
        names = group.get('Parameters', []) if group else None
        if names:
            # Non-string entries are skipped.
            ordered += [name for name in names if isinstance(name, str)]

    for key in self.parameters:
        if key not in ordered:
            ordered.append(key)

    self.parameters_order = ordered
    return ordered
# Largest template, in bytes (512 KiB), accepted by the HTTP and OSS
# download paths below.
template_max_size = 524288
async def _get_template_body(self):
    """Return the template content described by ``self.template_config``.

    Sources are tried in this order: inline ``template_body``, then
    ``template_id`` (fetched through the stack plugin), then
    ``template_url`` (``oss://``, ``file://`` or any scheme ``requests``
    can fetch).

    :return: the template as given or downloaded (``bytes`` for the
        ``file`` and HTTP paths), or ``None`` when the configuration names
        no source.
    :raises Iact3Exception: when retrieval fails or an HTTP download
        exceeds ``template_max_size``.
    """
    template_body = self.template_config.template_body
    if template_body:
        return template_body

    template_id = self.template_config.template_id
    if template_id:
        try:
            template_info = await self.plugin.get_template(
                template_id=template_id,
                template_version=self.template_config.template_version
            )
            return template_info['TemplateBody']
        except Exception as ex:
            raise Iact3Exception(f'Failed to retrieve {template_id}: {ex}') from ex

    template_url = self.template_config.template_url
    if not template_url:
        return None

    components = urlparse(template_url)
    if components.scheme == 'oss':
        return self._get_template_from_oss(template_url, components)

    if components.scheme == 'file':
        try:
            # Context manager closes the underlying file handle.
            with urlopen(template_url) as local_file:
                return local_file.read()
        except Exception as ex:
            raise Iact3Exception(f'Failed to retrieve {template_url}: {ex}') from ex

    max_size = self.template_max_size
    try:
        # With stream=True the connection stays open until the response is
        # closed, so close it deterministically via the context manager.
        with requests.get(template_url, timeout=10, stream=True) as resp:
            resp.raise_for_status()
            chunks = []
            received = 0
            for chunk in resp.iter_content(chunk_size=1000):
                chunks.append(chunk)
                received += len(chunk)
                # Stop as soon as the limit is crossed rather than
                # downloading an arbitrarily large body.
                if received > max_size:
                    raise Iact3Exception(f'template from {template_url} '
                                         f'exceeds maximum allowed size ({max_size} bytes)')
            return b''.join(chunks)
    except Exception as ex:
        raise Iact3Exception(f'Failed to retrieve {template_url}: {ex}') from ex
def _get_template_from_oss(self, template_url, components):
    """Download a template addressed by an ``oss://bucket/object`` URL.

    The bucket is taken from the URL's network location and the object key
    from its path.  A ``RegionId`` query argument, when present, overrides
    ``self.region`` for the OSS client.

    :param template_url: the original URL, used in error messages.
    :param components: result of ``urlparse(template_url)``.
    :return: whatever ``read()`` on the fetched object returns.
    :raises Iact3Exception: for a malformed URL, a missing object, an
        object larger than ``template_max_size`` or any OSS failure.
    """
    bucket_name = components.netloc
    object_path = components.path.strip('/')
    if not bucket_name or not object_path:
        raise Iact3Exception(f'Invalid oss url {template_url}')

    region_id = self.region
    if components.query:
        region_ids = parse_qs(components.query).get('RegionId')
        if region_ids:
            region_id = region_ids[0]

    oss_plugin = OssPlugin(region_id=region_id, bucket_name=bucket_name, credential=self.credential)
    try:
        object_meta = oss_plugin.get_object_meta(object_path)
    except Exception as ex:
        raise Iact3Exception(f'Oss failed: {ex}') from ex
    if object_meta is None:
        raise Iact3Exception(f'Invalid oss url {template_url}')

    # Check the size from the metadata before fetching the content, and
    # report the configured limit rather than a hard-coded number.
    max_size = self.template_max_size
    if object_meta.content_length > max_size:
        raise Iact3Exception(f'template from {template_url} exceeds maximum allowed size ({max_size} bytes)')

    try:
        r = oss_plugin.get_object_content(object_path)
    except Exception as ex:
        raise Iact3Exception(f'Oss failed: {ex}') from ex
    if r is None:
        raise Iact3Exception(f'Invalid oss url {template_url}')
    return r.read()
async def _gen_vpc_id(self, key, value):
    """Return the id of one VPC in the current region.

    :param key: parameter name, used only for the error message.
    :param value: parameter value, used only for the error message.
    :return: the ``VpcId`` of the VPC returned by the VPC plugin.
    :raises Iact3Exception: when the plugin returns no VPC.
    """
    vpc_plugin = VpcPlugin(self.region, credential=self.credential)
    vpc = await vpc_plugin.get_one_vpc()
    if vpc:
        return vpc['VpcId']
    msg = f'can not find any vpc in region {self.region}'
    raise Iact3Exception(_error_message(key, value, msg))
async def _gen_vpc_vsw_id(self, key, value):
    """Pick one vswitch and remember its vpc / vswitch ids.

    The zone is taken from the first entry of ``self.parameters`` whose
    name fully matches ``RE_K_ZONE_ID``; without such an entry the lookup
    is made with ``zone_id=None``.

    :param key: parameter name, used only for the error message.
    :param value: parameter value, used only for the error message.
    :return: tuple ``(vpc_id, vsw_id)``, also stored on the instance.
    :raises Iact3Exception: when the plugin returns no vswitch.
    """
    # Dedicated names in the generator keep the ``value`` argument intact
    # for the error message below.
    zone_id = next(
        (param_value for name, param_value in self.parameters.items()
         if self.RE_K_ZONE_ID.fullmatch(name)),
        None,
    )
    plugin = VpcPlugin(self.region, credential=self.credential)
    vsw = await plugin.get_one_vswitch(zone_id=zone_id)
    if not vsw:
        msg = f'can not find any vswitch in zone {zone_id}'
        raise Iact3Exception(_error_message(key, value, msg))
    self._vpc_id = vsw['VpcId']
    self._vsw_id = vsw['VSwitchId']
    return self._vpc_id, self._vsw_id
def _gen_common_name(self):
    """Return a name made of the tool name, a dash and a uuid1 hex string, capped at 50 characters."""
    unique_suffix = uuid.uuid1().hex
    candidate = f'{IAC_NAME}-{unique_suffix}'
    return candidate[:50]
def _gen_password(self):
special_chars = '!#$&{*:[=,]-_%@+'
password_chars = []
for item in (string.ascii_lowercase, special_chars, string.digits, string.ascii_uppercase):
password_chars.extend(random.sample(item, 4))
return ''.join(password_chars)
def _gen_uuid(self):
return str(uuid.uuid1())
async def _gen_sg(self, key, value):
    """Return the id of a security group in the selected VPC.

    A VPC is chosen first (via ``_gen_vpc_vsw_id``) when none is known yet.

    :param key: parameter name, used only for error messages.
    :param value: parameter value, used only for error messages.
    :return: the ``SecurityGroupId`` returned by the ECS plugin.
    :raises Iact3Exception: when no security group is found.
    """
    if self._vpc_id is None:
        await self._gen_vpc_vsw_id(key, value)
    ecs = EcsPlugin(region_id=self.region, credential=self.credential)
    sg = await ecs.get_security_group(vpc_id=self._vpc_id)
    if sg:
        return sg['SecurityGroupId']
    msg = f'can not find security group in vpc {self._vpc_id} in {self.region} region'
    raise Iact3Exception(_error_message(key, value, msg))