rostran/providers/cloudformation/template.py (440 lines of code) (raw):
import re
import os
import json
import importlib
import typer
from ruamel.yaml import YAML
from rostran.core.exceptions import (
TemplateFormatNotSupport,
CloudFormationTransformNotSupported,
InvalidRuleSchema,
InvalidYamlTemplateTag,
)
from rostran.core.format import FileFormat
from rostran.core.template import Template, RosTemplate
from rostran.core.settings import RULES_DIR
from rostran.core.rule_manager import (
RuleClassifier,
RuleManager,
ResourceRule,
PseudoParametersRule,
FunctionRule,
)
from rostran.core.outputs import Output, Outputs
from rostran.core.resources import Resource, Resources
from rostran.core.properties import Property
from rostran.core.parameters import Parameter, Parameters
from rostran.core.metadata import MetaItem, MetaData
from rostran.core.conditions import Condition, Conditions
from rostran.core.mappings import Mapping, Mappings
import rostran.handlers.basic as basic_handler_module
import rostran.handlers.merge as merge_handler_module
# Rule files for this provider live under the "cloudformation" sub-directory of
# the imported RULES_DIR; the module-level name is deliberately rebound to it.
RULES_DIR = os.path.join(RULES_DIR, "cloudformation")
BUILTIN_RULES, RESOURCE_RULES = (
    os.path.join(RULES_DIR, sub_dir) for sub_dir in ("builtin", "resource")
)

# Module-wide YAML parser used to load templates; quoting of scalars is kept.
yaml = YAML()
yaml.preserve_quotes = True
def short_handler_for_getatt(val):
    """Normalize the argument of a short-form ``!GetAtt`` tag.

    A string such as ``"MyDB.Endpoint.Address"`` is split on the FIRST dot only,
    giving ``["MyDB", "Endpoint.Address"]``: the part before the first dot is
    the resource name and everything after it is the attribute name, which may
    itself contain dots. Splitting on every dot would produce more than the two
    elements ``Fn::GetAtt`` takes.

    Any non-string value (for example an already split list) is returned
    unchanged.
    """
    if isinstance(val, str):
        return val.split(".", 1)
    return val
# Short-form YAML tag -> (long-form function name, optional value handler).
# The handler, when given, post-processes the value constructed for the tag.
short_mappings = {
    tag: (function_name, value_handler)
    for tag, function_name, value_handler in (
        ("!Ref", "Ref", None),
        ("!Transform", "Fn::Transform", None),
        ("!Sub", "Fn::Sub", None),
        ("!Split", "Fn::Split", None),
        ("!Select", "Fn::Select", None),
        ("!Join", "Fn::Join", None),
        ("!ImportValue", "Fn::ImportValue", None),
        ("!GetAZs", "Fn::GetAZs", None),
        ("!GetAtt", "Fn::GetAtt", short_handler_for_getatt),
        ("!FindInMap", "Fn::FindInMap", None),
        ("!Cidr", "Fn::Cidr", None),
        ("!Base64", "Fn::Base64", None),
    )
}
def short_constructor(loader, node):
    """Build the long-form mapping for a short-form tag such as ``!Ref``.

    The tag is looked up in ``short_mappings``; the node's value is constructed,
    passed through the tag's handler (if any) and returned as
    ``{long_form_name: value}``.

    Sequence nodes (e.g. ``!Join [",", [a, b]]`` or ``!GetAtt [Res, Attr]``) are
    constructed as lists. Every other node kind goes through
    ``construct_scalar`` exactly as before, so non-scalar, non-sequence nodes
    still fail inside the loader.

    Raises:
        InvalidYamlTemplateTag: if the tag is not listed in ``short_mappings``.
    """
    tag = node.tag
    if tag not in short_mappings:
        raise InvalidYamlTemplateTag(reason=f"Unknown tag: {tag}")

    key, handler = short_mappings[tag]
    if getattr(node, "id", None) == "sequence":
        # NOTE(review): relies on ruamel.yaml nodes exposing ``id`` and on the
        # loader's ``construct_sequence(node, deep=...)`` - confirm against the
        # pinned ruamel.yaml version. ``deep=True`` is meant to have nested
        # items fully built before the handler sees them.
        value = loader.construct_sequence(node, deep=True)
    else:
        value = loader.construct_scalar(node)
    if handler:
        value = handler(value)
    return {key: value}


# Registered under ``None`` so it is used for tags without a dedicated constructor.
yaml.constructor.add_constructor(None, short_constructor)
class CloudFormationTemplate(Template):
    """CloudFormation template that can be transformed into a ROS template.

    ``source`` is the parsed template mapping. ``rule_manager`` supplies the
    rules (resources, functions, pseudo parameters, metadata, parameter
    association properties) that drive the transformation.
    """

    def __init__(self, source, rule_manager: RuleManager, *args, **kwargs):
        """Store the parsed template and the rule manager used to transform it."""
        super().__init__(source, *args, **kwargs)
        self.rule_manager = rule_manager
        # Resource rule chosen for each supported resource, keyed by resource id.
        self.rules = {}

    @classmethod
    def initialize(cls, path: str, format: FileFormat):
        """Load the template at ``path`` and create an instance for it.

        Args:
            path: location of the template file.
            format: ``FileFormat.Json`` or ``FileFormat.Yaml``.

        Raises:
            TemplateFormatNotSupport: for any other format.
        """
        # Read explicitly as UTF-8 instead of the platform's locale encoding.
        if format == FileFormat.Json:
            with open(path, encoding="utf-8") as f:
                source = json.load(f)
        elif format == FileFormat.Yaml:
            with open(path, encoding="utf-8") as f:
                source = yaml.load(f)
        else:
            raise TemplateFormatNotSupport(path=path, format=format)

        rule_manager = RuleManager.initialize(RuleClassifier.CloudFormation)
        return cls(source=source, rule_manager=rule_manager)

    def transform(self):
        """Transform the loaded template and return the resulting ``RosTemplate``.

        Raises:
            CloudFormationTransformNotSupported: if the template has a truthy
                ``Transform`` section.
        """
        typer.secho("Transforming CloudFormation template to ROS template...")
        if self.source.get("Transform"):
            raise CloudFormationTransformNotSupported()

        template = RosTemplate()
        template.description = self.source.get("Description")

        # Parameters go first: that step pops ParameterLabels out of the
        # metadata before the metadata itself is transformed.
        self._transform_parameters(template.parameters)
        self._transform_conditions(template.conditions)
        self._transform_mappings(template.mappings)
        self._transform_resources(template.resources)
        self._transform_outputs(template.outputs)
        self._transform_meta_data(template.metadata)

        typer.secho(
            "Transform CloudFormation template to ROS template successful.",
            fg="green",
        )
        return template

    def _transform_parameters(self, out_parameters: Parameters):
        """Convert every entry of ``Parameters`` and add it to ``out_parameters``.

        Types listed in ``Parameter.TYPES`` are kept, ``List<Number>`` becomes
        ``CommaDelimitedList``, types covered by an association property rule
        use that rule, and anything else falls back to ``String`` with a
        warning. Does nothing when the template has no parameters.
        """
        cf_params = self.source.get("Parameters")
        if not cf_params:
            return

        association_property_rule = (
            self.rule_manager.association_property_rule.association_property
        )
        # The labels are popped (removed from the source metadata), not merely
        # read; they are attached to the parameters below. ``or {}`` tolerates
        # sections that are present but null.
        interface = (self.source.get("Metadata") or {}).get(
            "AWS::CloudFormation::Interface"
        ) or {}
        cf_param_labels = interface.pop("ParameterLabels", None) or {}

        for name, value in cf_params.items():
            cf_param_type = value["Type"]
            typer.secho(f"Transforming parameter {name}<{cf_param_type}>")

            association_property = None
            param_type = "String"
            unsupported = False
            if cf_param_type in Parameter.TYPES:
                param_type = cf_param_type
            elif cf_param_type == "List<Number>":
                param_type = "CommaDelimitedList"
            elif cf_param_type in association_property_rule:
                rule = association_property_rule[cf_param_type]
                if rule.get("Ignore") or not rule.get("To"):
                    unsupported = True
                else:
                    association_property = rule["To"]
                    param_type = rule.get("Type") or param_type
            else:
                unsupported = True

            if unsupported:
                # Only the type is dropped; the parameter is still emitted as String.
                typer.secho(
                    f" Parameter type {cf_param_type!r} of {name!r} is not supported and will be ignored.",
                    fg="yellow",
                )

            parameter = Parameter(
                name=name,
                type=param_type,
                association_property=association_property,
                default=value.get("Default"),
                description=value.get("Description"),
                constraint_description=value.get("ConstraintDescription"),
                allowed_values=value.get("AllowedValues"),
                allowed_pattern=value.get("AllowedPattern"),
                min_length=value.get("MinLength"),
                max_length=value.get("MaxLength"),
                no_echo=value.get("NoEcho"),
                min_value=value.get("MinValue"),
                max_value=value.get("MaxValue"),
                label=(cf_param_labels.get(name) or {}).get("default"),
            )
            out_parameters.add(parameter)

    def _transform_conditions(self, out_conditions: Conditions):
        """Transform each entry of ``Conditions`` and add it to ``out_conditions``."""
        conditions = self.source.get("Conditions") or {}
        for name, value in conditions.items():
            value, _ = self._transform_value(value)
            out_conditions.add(Condition(name=name, value=value))

    def _transform_mappings(self, out_mappings: Mappings):
        """Transform each entry of ``Mappings`` and add it to ``out_mappings``."""
        mappings = self.source.get("Mappings") or {}
        for name, value in mappings.items():
            value, _ = self._transform_value(value)
            out_mappings.add(Mapping(name=name, value=value))

    def _transform_resources(self, out_resources: Resources):
        """Transform each entry of ``Resources`` according to its resource rule.

        Resources without a rule are skipped with a warning. When the rule names
        a target resource type, the properties are transformed and the resource
        is added to ``out_resources``. Otherwise the resource keeps its original
        type and raw properties and is not added here. Rule handlers run after
        all resources have been processed and receive ``out_resources``.
        """
        cf_resources = self.source.get("Resources") or {}
        resources_to_handle = []

        for resource_id, cf_resource in cf_resources.items():
            cf_resource_type = cf_resource["Type"]
            cf_resource_props = cf_resource.get("Properties") or {}
            typer.secho(f"Transforming resource {resource_id}<{cf_resource_type}>")

            # Get rule by resource type
            resource_rule: ResourceRule = self.rule_manager.resource_rules.get(
                cf_resource_type
            )
            if resource_rule is None:
                typer.secho(
                    f" Resource type {cf_resource_type!r} is not supported and will be ignored.",
                    fg="yellow",
                )
                continue

            self.rules[resource_id] = resource_rule
            resource = Resource(
                resource_id=resource_id,
                resource_type=resource_rule.target_resource_type,
                depends_on=cf_resource.get("DependsOn"),
                condition=cf_resource.get("Condition"),
                deletion_policy=cf_resource.get("DeletionPolicy"),
            )
            if resource.type:
                props = self._transform_resource_props(
                    cf_resource_type,
                    cf_resource_props,
                    resource_rule.properties,
                    resource_rule.rule_id,
                )
                for k, v in props.items():
                    resource.properties.add(Property.initialize(k, v))
                out_resources.add(resource)
            else:
                # No target type: keep the original type and untransformed
                # properties. The resource is only passed on to the handler.
                resource.type = cf_resource_type
                for k, v in cf_resource_props.items():
                    resource.properties.add(Property.initialize(k, v))

            if resource_rule.handler:
                resources_to_handle.append((resource, resource_rule.handler))

        for resource, handler in resources_to_handle:
            handler(resource, out_resources)

    def _transform_resource_props(
        self, resource_type, resource_props, resource_rule_props, rule_id
    ):
        """Transform the properties of one resource (or of a nested schema level).

        Args:
            resource_type: original resource type, used in warnings.
            resource_props: mapping of property name to original value.
            resource_rule_props: mapping of property name to its rule.
            rule_id: identifier of the rule, reported in schema errors.

        Returns:
            dict of transformed properties keyed by their target names.

        Raises:
            InvalidRuleSchema: if a rule has no ``To`` and its ``Type`` is not
                ``Map``, or if a multi-level ``To`` uses an invalid list index.
        """
        final_props = {}
        for name, value in resource_props.items():
            # Ignore not support prop
            prop_rule = resource_rule_props.get(name)
            if prop_rule is None or prop_rule.get("Ignore"):
                typer.secho(
                    f" Resource property {name!r} of {resource_type!r} is not supported and will be ignored.",
                    fg="yellow",
                )
                continue

            # Warn if specify Warning
            warn_msg = prop_rule.get("Warning")
            if warn_msg:
                if not warn_msg.endswith("."):
                    warn_msg += "."
                typer.secho(" " + warn_msg, fg="yellow")

            transformed_value, resolved = self._transform_value(value)
            final_name = prop_rule.get("To")
            prop_type = prop_rule.get("Type")
            prop_schema = prop_rule.get("Schema")
            if final_name is None and prop_type != "Map":
                raise InvalidRuleSchema(
                    path=rule_id,
                    reason=f"{name} is invalid. The Type should be 'Map' when To is None",
                )

            if prop_type == "List" and prop_schema:
                # Each list item is itself transformed with the nested schema.
                final_value = [
                    self._transform_resource_props(
                        resource_type, each, prop_schema, rule_id
                    )
                    for each in transformed_value
                ]
            elif prop_type == "Map" and prop_schema:
                # A list given where a map is expected: only its first item is used.
                if isinstance(transformed_value, list):
                    transformed_value = transformed_value[0]
                final_value = self._transform_resource_props(
                    resource_type, transformed_value, prop_schema, rule_id
                )
            else:
                final_value = transformed_value

            handler_name = prop_rule.get("Handler")
            if handler_name is not None:
                handler_func = getattr(basic_handler_module, handler_name)
                final_value = handler_func(final_value, resolved)

            if final_value is None:
                continue

            if final_name is None:
                # No target name: the transformed map is merged into this level.
                assert isinstance(final_value, dict)
                final_props.update(final_value)
                continue

            # If To is duplicated, merge it
            if final_name in final_props:
                merged_value = final_props[final_name]
                merge_handler_name = prop_rule.get("MergeHandler")
                if merge_handler_name is not None:
                    merge_handler_func = getattr(
                        merge_handler_module, merge_handler_name
                    )
                    final_value = merge_handler_func(
                        final_value, merged_value, resolved
                    )

            # If To is a multi-level attribute, it needs to be converted level by level.
            # For example, if To is RuleList.0.Url, it should be converted to
            # final_props["RuleList"][0]["Url"] = value.
            self._handle_props_and_value(
                final_props, final_name.split("."), final_value, rule_id
            )
        return final_props

    def _handle_props_and_value(self, data, names: list, value, rule_id, _name_path=""):
        """Store ``value`` in ``data`` at the nested location described by ``names``.

        Purely numeric segments are list indexes, other segments are keys.
        Missing intermediate containers are created (a list when the next
        segment is numeric, otherwise a dict). A list may only grow by appending
        at index ``len(list)``. A final segment equal to
        ``__single_to_multi_handler__`` merges a truthy ``value`` into ``data``
        instead of assigning it.

        Args:
            data: container to modify in place.
            names: remaining path segments.
            value: value to store.
            rule_id: identifier of the rule, reported in schema errors.
            _name_path: path walked so far; used internally for error messages.

        Raises:
            InvalidRuleSchema: if a list index is not the next free index.
        """
        if not names:
            return

        name = names[0]
        if name.isdigit():
            name = int(name)
        _name_path = f"{_name_path}.{name}" if _name_path else f"{name}"

        if len(names) == 1:
            if name == "__single_to_multi_handler__":
                if value:
                    data.update(value)
                return
            try:
                data[name] = value
            except IndexError:
                if name != len(data):
                    raise InvalidRuleSchema(
                        path=rule_id,
                        reason=f"{_name_path} is invalid. The index should be {len(data)}",
                    )
                data.append(value)
            return

        default_sub_data = [] if names[1].isdigit() else {}
        try:
            sub_data = data[name]
        except KeyError:
            sub_data = data[name] = default_sub_data
        except IndexError:
            if name != len(data):
                raise InvalidRuleSchema(
                    path=rule_id,
                    reason=f"{_name_path} is invalid. The index should be {len(data)}",
                )
            data.append(default_sub_data)
            sub_data = default_sub_data

        # rule_id must be forwarded explicitly; otherwise the walked path would
        # land in the rule_id slot and nested errors would lose both.
        self._handle_props_and_value(sub_data, names[1:], value, rule_id, _name_path)

    def _transform_outputs(self, out_outputs: Outputs):
        """Transform each entry of ``Outputs`` and add it to ``out_outputs``."""
        cf_outputs = self.source.get("Outputs") or {}
        for name, value in cf_outputs.items():
            value, _ = self._transform_value(value)
            output = Output(
                name=name,
                value=value.get(Output.VALUE),
                description=value.get(Output.DESCRIPTION),
                condition=value.get(Output.CONDITION),
            )
            out_outputs.add(output)

    def _transform_meta_data(self, out_meta_data: MetaData):
        """Add each entry of ``Metadata`` to ``out_meta_data``.

        Entries with a usable rule are renamed to the rule's ``To``; the others
        trigger a warning.
        """
        cf_meta_data = self.source.get("Metadata") or {}
        meta_data_rule = self.rule_manager.meta_data_rule

        for name, value in cf_meta_data.items():
            typer.secho(f"Transforming metadata {name}")
            rule_props = meta_data_rule.meta_data.get(name)
            if not rule_props or rule_props.get("Ignore") or not rule_props.get("To"):
                typer.secho(
                    f" Metadata {name!r} is not supported and will be ignored.",
                    fg="yellow",
                )
            else:
                name = rule_props["To"]

            # NOTE(review): the item is added for every entry, under its original
            # name when no rule applies. This matches how unsupported parameter
            # types are still emitted, but confirm it is the intended nesting.
            meta_item = MetaItem(type=name, value=value)
            out_meta_data.add(meta_item)

    def _transform_value(self, value):
        """Recursively transform functions and pseudo parameters inside ``value``.

        Returns:
            ``(transformed_value, resolved)``. ``resolved`` is ``False`` as soon
            as a function (supported or not) or an ``AWS::`` pseudo parameter is
            found anywhere in ``value``, and ``True`` otherwise.
        """
        resolved = True
        if isinstance(value, list):
            data = []
            for item in value:
                item, item_resolved = self._transform_value(item)
                if not item_resolved:
                    resolved = False
                data.append(item)
            return data, resolved

        if isinstance(value, dict):
            data = {}
            is_func = False
            # Only a single-key mapping can be a function call.
            if len(value) == 1:
                key = next(iter(value))
                function_rule: FunctionRule = self.rule_manager.function_rule
                # transform func
                if key in function_rule.function:
                    is_func = True
                    data.update(
                        self._transform_function(
                            key, value[key], function_rule.function[key]
                        )
                    )
                # Keys may be non-strings (e.g. integers from YAML); re.match
                # would raise TypeError on those.
                elif isinstance(key, str) and re.match(r"^Fn::[\s\S]+$", key):
                    is_func = True
                    typer.secho(
                        f" Function {key!r} is not supported and will be ignored.",
                        fg="yellow",
                    )
                    data.update(value)

            if is_func:
                resolved = False
            else:
                for k, v in value.items():
                    data[k], item_resolved = self._transform_value(v)
                    if not item_resolved:
                        resolved = False
            return data, resolved

        if isinstance(value, str):
            pseudo_parameters_rule: PseudoParametersRule = (
                self.rule_manager.pseudo_parameters_rule
            )
            if value in pseudo_parameters_rule.pseudo_parameters:
                return (
                    self._transform_pseudo_parameter(
                        value, pseudo_parameters_rule.pseudo_parameters[value]
                    ),
                    False,
                )
            if re.match(r"^AWS::[\s\S]+$", value):
                typer.secho(
                    f" Pseudo parameter {value!r} is not supported and will be ignored.",
                    fg="yellow",
                )
                resolved = False

        return value, resolved

    def _transform_pseudo_parameter(self, param, rule_props) -> str:
        """Return the rule's ``To`` for ``param``, or ``param`` itself (with a
        warning) when the rule is ignored or has no target."""
        if rule_props.get("Ignore") or not rule_props.get("To"):
            typer.secho(
                f" Pseudo parameter {param!r} is not supported and will be ignored.",
                fg="yellow",
            )
            return param
        return rule_props["To"]

    def _transform_function(self, func_name, func_value, rule_props) -> dict:
        """Transform one function call according to its rule.

        Returns a single-entry dict ``{function_name: value}``. When the rule is
        ignored or has no target, the original name and untouched value are
        returned with a warning. Otherwise the value is transformed
        recursively, pseudo parameter references inside an ``Fn::Sub`` template
        string are rewritten, and the rule's handler (if any) is applied.
        """
        if rule_props.get("Ignore") or not rule_props.get("To"):
            typer.secho(
                f" Function {func_name!r} is not supported and will be ignored.",
                fg="yellow",
            )
            return {func_name: func_value}

        final_func_name = rule_props["To"]
        final_func_value, _ = self._transform_value(func_value)

        if final_func_name == "Fn::Sub":
            # The template string is either the whole value or the first list item.
            if isinstance(final_func_value, str):
                final_func_value = self._transform_sub_pseudo(final_func_value)
            elif (
                isinstance(final_func_value, list)
                and len(final_func_value) >= 1
                and isinstance(final_func_value[0], str)
            ):
                final_func_value[0] = self._transform_sub_pseudo(final_func_value[0])

        handler_name = rule_props.get("Handler")
        if handler_name is not None:
            handler_func = getattr(basic_handler_module, handler_name)
            final_func_value = handler_func(final_func_value, False)

        return {final_func_name: final_func_value}

    def _transform_sub_pseudo(self, value):
        """Replace ``${<pseudo parameter>}`` references in an ``Fn::Sub`` string.

        Each known pseudo parameter found in ``value`` is replaced by its rule's
        ``To``; those whose rule is ignored or has no target are left as they
        are, with a warning.
        """
        pseudo_parameters = self.rule_manager.pseudo_parameters_rule.pseudo_parameters
        for param, rule_props in pseudo_parameters.items():
            param_ref = f"${{{param}}}"
            if param_ref not in value:
                continue
            if rule_props.get("Ignore") or not rule_props.get("To"):
                typer.secho(
                    f" Function Fn::Sub pseudo parameter {param!r} is not supported and will be ignored.",
                    fg="yellow",
                )
                continue
            value = value.replace(param_ref, f'${{{rule_props["To"]}}}')
        return value