rostran/core/properties.py (206 lines of code) (raw):
import re
from typing import Any
from openpyxl.cell.cell import Cell
from .exceptions import (
InvalidTemplateProperty,
InvalidExpression,
ConflictDataTypeInExpression,
InvalidIndexInExpression,
DiscontinuousIndexInExpression,
InvalidTemplateProperties,
)
from .utils import get_and_validate_cell, sorted_data
NORMAL_PATTERN = re.compile(r"^([\w\-]+)$")
LIST_PATTERN = re.compile(r"^([\w\-]+)(\[\d+\])+$")
LIST_INDEX_PATTERN = re.compile(r"\[(\d+)\]")
REF_PATTERN = re.compile(r"^!Ref ([\w\-]+)$")
class Property:
def __init__(self, name: str, value: Any):
self.name = name
self.value = value
@classmethod
def initialize(cls, name: str, value: dict):
prop = cls(name, value)
prop.validate()
return prop
@classmethod
def initialize_from_excel(cls, header_cell: Cell, data_cell: Cell):
prop_name = get_and_validate_cell(header_cell, InvalidTemplateProperty)
prop_value = data_cell.value
if isinstance(prop_value, str):
value = prop_value.strip()
result = REF_PATTERN.findall(value)
if result:
prop_value = {"Ref": result[0]}
return cls(name=prop_name, value=prop_value)
def validate(self):
if not isinstance(self.name, str):
raise InvalidTemplateProperty(
name=self.name,
reason=f"The type should be str",
)
def as_dict(self, format=False):
return {self.name: self.value}
def get_depends_on_set(self) -> set:
depends_on_set = set()
def resolve_depends_on(val):
if isinstance(val, dict):
if len(val) == 1:
ref_value = val.get("Ref")
if ref_value and isinstance(ref_value, str):
depends_on_set.add(ref_value)
get_value = val.get("Fn::GetAtt")
if (
get_value
and isinstance(get_value, list)
and len(get_value) >= 2
):
name = get_value[0]
if isinstance(name, str):
depends_on_set.add(name)
else:
for v in val.values():
resolve_depends_on(v)
elif isinstance(val, list):
for v in val:
resolve_depends_on(v)
resolve_depends_on(self.value)
return depends_on_set
class Properties(dict):
PROPERTIES_KEY_SCORES = {
"ZoneId": 0,
"MasterZoneIds": 1,
"WorkerZoneIds": 2,
"SlaveZoneIds": 3,
"VpcId": 4,
"VPCId": 5,
"VSwitchId": 6,
"VswitchId": 7,
"VSwitchIds": 8,
"MasterVSwitchIds": 9,
"WorkerVSwitchIds": 10,
"SourceVSwitchIds": 11,
"PodVswitchIds": 12,
"SecurityGroupId": 13,
"SecurityGroupIds": 14,
"NetworkInterfaceId": 15,
"PrimaryNetworkInterfaceId": 16,
"InstanceId": 17,
"InstanceIds": 18,
"ImageId": 19,
}
@classmethod
def initialize(cls, data: dict):
if not isinstance(data, dict):
raise InvalidTemplateProperties(
reason=f"The type of data ({data}) should be dict"
)
props = cls()
for name, value in data.items():
prop = Property.initialize(name, value)
props.add(prop)
return props
def add(self, prop: Property):
self[prop.name] = prop
def resolve(self) -> "Properties":
props = Properties()
for old_name, old_prop in self.items():
name_parts = old_name.split(".")
if len(name_parts) == 1:
props[old_name] = Property(old_prop.name, old_prop.value)
continue
prop_name = None
cur_value = prop_value = None
length = len(name_parts)
for i, name_part in enumerate(name_parts):
result = NORMAL_PATTERN.findall(name_part)
if result:
cur_name = name_part
if i == 0:
prop_name = name_part
cur_value = prop_value = (
props[prop_name].value if prop_name in props else {}
)
elif i < length - 1:
if not isinstance(cur_value, dict):
raise ConflictDataTypeInExpression(expression=old_name)
if cur_name not in cur_value:
cur_value[cur_name] = {}
cur_value = cur_value[cur_name]
else:
if not isinstance(cur_value, dict):
raise ConflictDataTypeInExpression(expression=old_name)
cur_value[cur_name] = old_prop.value
continue
result = LIST_PATTERN.findall(name_part)
if result:
cur_name = result[0][0]
if i == 0:
prop_name = cur_name
cur_value = prop_value = (
props[prop_name].value if prop_name in props else []
)
cur_value = _handle_list_value(
name_part, cur_value, expression=old_name
)
elif i < length - 1:
if not isinstance(cur_value, dict):
raise ConflictDataTypeInExpression(expression=old_name)
if cur_name not in cur_value:
cur_value[cur_name] = []
cur_value = cur_value[cur_name]
cur_value = _handle_list_value(
name_part, cur_value, expression=old_name
)
else:
if not isinstance(cur_value, dict):
raise ConflictDataTypeInExpression(expression=old_name)
if cur_name not in cur_value:
cur_value[cur_name] = []
cur_value = cur_value[cur_name]
cur_value = _handle_list_value(
name_part,
cur_value,
expression=old_name,
data=old_prop.value,
)
continue
raise InvalidExpression(expression=old_name)
props[prop_name] = Property(prop_name, prop_value)
return props
def as_dict(self, format=False) -> dict:
data = {}
keys = self.keys()
if format:
keys = sorted_data(keys, scores=self.PROPERTIES_KEY_SCORES)
for key in keys:
prop: Property = self[key]
if prop.value is not None:
data.update(prop.as_dict(format))
return data
def get_depends_on_set(self) -> set:
depends_on_set = set()
for prop in self.values():
prop: Property
depends_on_set.update(prop.get_depends_on_set())
return depends_on_set
def _handle_list_value(name_part, cur_value, expression, data=None):
if not isinstance(cur_value, list):
raise ConflictDataTypeInExpression(expression=expression)
indexes = LIST_INDEX_PATTERN.findall(name_part)
indexes_length = len(indexes)
for j, index in enumerate(indexes):
index = int(index)
if index < 0:
raise InvalidIndexInExpression(index=index, expression=expression)
if index > len(cur_value):
raise DiscontinuousIndexInExpression(index=index, expression=expression)
elif index == len(cur_value):
if j < indexes_length - 1:
cur_value.append([])
elif data is None:
cur_value.append({})
else:
cur_value.append(data)
cur_value = cur_value[index]
return cur_value