rostran/providers/terraform/template.py (688 lines of code) (raw):
import os
import linecache
from uuid import uuid4
import importlib
from typing import Any
import typer
from libterraform import TerraformCommand, TerraformConfig
from libterraform.exceptions import TerraformCommandError
from rostran.core.exceptions import (
RosTranWarning,
TemplateFormatNotSupport,
RunCommandFailed,
TerraformPlanFormatVersionNotSupported,
TerraformMultiProvidersNotSupported,
TerraformProviderNotFound,
RosTranException,
InvalidRuleSchema,
)
from rostran.core.format import FileFormat
from rostran.core.rule_manager import RuleManager, RuleClassifier, ResourceRule
from rostran.core.template import Template, RosTemplate
from rostran.core.properties import Property
from rostran.core.resources import Resources, Resource
from rostran.core.outputs import Outputs, Output
import rostran.handlers.basic as basic_handler_module
import rostran.handlers.merge as merge_handler_module
class TerraformTemplate(Template):
PROVIDERS = (
ALICLOUD,
AWS,
) = (
"alicloud",
"aws",
)
SUPPORTED_PLAN_FORMAT_VERSIONS = ("1.0", "1.1", "1.2")
PLAN_PROPERTIES = (
P_PLANNED_RESOURCES,
P_FORMAT_VERSION,
P_CONFIGURATION,
P_PLANNED_VALUES,
P_RESOURCES,
P_PROVIDER_CONFIG,
P_ROOT_MODULE,
P_MODE,
P_ADDRESS,
P_VALUES,
P_VALUE,
P_OUTPUTS,
P_TYPE,
) = (
"planned_resources",
"format_version",
"configuration",
"planned_values",
"resources",
"provider_config",
"root_module",
"mode",
"address",
"values",
"value",
"outputs",
"type",
)
PLAN_MODES = (P_MANAGED,) = ("managed",)
SOURCE_PROPERTIES = (
MANAGED_RESOURCES,
DATA_RESOURCES,
OUTPUTS,
PROPERTIES,
NAME,
MODE,
TYPE,
CONFIG,
BODY,
ATTRIBUTES,
BLOCKS,
SRC_RANGE,
END_RANGE,
COUNT,
EXPR,
TRAVERSAL,
SOURCE,
COLLECTION,
KEY,
ARGS,
PARTS,
EACH,
FOR_EACH,
PROVIDER_CONFIG_REF,
PROVIDER,
NAMESPACE,
HOSTNAME,
DEPENDS_ON,
MANAGED,
CONNECTION,
PROVISIONERS,
CREATE_BEFORE_DESTROY,
PREVENT_DESTROY,
CREATE_BEFORE_DESTROY_SET,
PREVENT_DESTROY_SET,
IGNORE_CHANGED,
IGNORE_ALL_CHANGED,
DECL_RANGE,
TYPE_RANGE,
FILENAME,
START,
END,
) = (
"ManagedResources",
"DataResources",
"Outputs",
"Properties",
"Name",
"Mode",
"Type",
"Config",
"Body",
"Attributes",
"Blocks",
"SrcRange",
"EndRange",
"Count",
"Expr",
"Traversal",
"Source",
"Collection",
"Key",
"Args",
"Parts",
"Each",
"ForEach",
"ProviderConfigRef",
"Provider",
"Namespace",
"Hostname",
"DependsOn",
"Managed",
"Connection",
"Provisioners",
"CreateBeforeDestroy",
"PreventDestroy",
"CreateBeforeDestroySet",
"PreventDestroySet",
"IgnoreChanged",
"IgnoreAllChanged",
"DeclRanage",
"TypeRanage",
"Filename",
"Start",
"End",
)
RESOURCE_PROP_PROPERTIES = (
PROP_TYPE,
PROP_VALUE,
PROP_ARGS,
) = (
".Type",
".Value",
".Args",
)
RESOURCE_PROP_TYPES = (
PROP_TYPE_VALUE,
PROP_TYPE_FUNC,
) = ("Value", "Func")
RESOURCE_PROP_FUNCS = (FUNC_GET_ATT,) = ("GetAtt",)
def __init__(self, source, plan, provider, rule_manager: RuleManager):
super().__init__(source)
self.plan = plan
self.provider = provider
self.rule_manager = rule_manager
@classmethod
def initialize(
cls,
path: str,
format: FileFormat = FileFormat.Terraform,
tf_plan_path: str = None,
):
if format != FileFormat.Terraform:
raise TemplateFormatNotSupport(path=path, format=format)
tf_dir = path if os.path.isdir(path) else os.path.dirname(path)
tf_plan, tf_data = cls._get_plan_data(tf_dir, tf_plan_path)
providers = tf_plan[cls.P_CONFIGURATION][cls.P_PROVIDER_CONFIG]
if len(providers) > 1:
raise TerraformMultiProvidersNotSupported()
elif len(providers) < 1:
raise TerraformProviderNotFound()
provider = list(providers.keys())[0]
if cls.ALICLOUD == provider:
rule_manager = RuleManager.initialize(RuleClassifier.TerraformAliCloud)
else:
message = (
f"Terraform transformation for provider {provider} is not supported"
)
raise RosTranWarning(message=message)
return cls(
source=tf_data, plan=tf_plan, provider=provider, rule_manager=rule_manager
)
@classmethod
def _get_plan_data(cls, tf_dir, tf_plan_path=None) -> (dict, dict):
# Using "terraform plan/show" to parse configuration
typer.secho("Parsing terraform config...")
tf_plan = None
cwd = os.getcwd()
if not os.path.isabs(tf_dir):
tf_dir = os.path.join(cwd, tf_dir)
tf = TerraformCommand(tf_dir)
if tf_plan_path is None:
plan_filename = f"{str(uuid4())[:8]}.tfplan"
tf_plan_path = os.path.join(cwd, plan_filename)
try:
tf.init(check=True)
tf.plan(out=tf_plan_path, check=True)
r = tf.show(tf_plan_path, check=True)
tf_plan = r.value
except TerraformCommandError as e:
raise RunCommandFailed(cmd=e.cmd, reason=e.stderr or e.stdout)
finally:
if os.path.exists(tf_plan_path):
os.remove(tf_plan_path)
else:
try:
r = tf.show(tf_plan_path, check=True)
tf_plan = r.value
except TerraformCommandError as e:
raise RunCommandFailed(cmd=e.cmd, reason=e.stderr or e.stdout)
if tf_plan is None:
raise RosTranException()
version = tf_plan[cls.P_FORMAT_VERSION]
if version not in cls.SUPPORTED_PLAN_FORMAT_VERSIONS:
raise TerraformPlanFormatVersionNotSupported(version=version)
tf_data, _ = TerraformConfig.load_config_dir(tf_dir)
typer.secho("Parse terraform config done.")
return tf_plan, tf_data
def transform(self) -> RosTemplate:
typer.secho(
f"Transforming terraform (provider: {self.provider}) template to ROS template..."
)
template = RosTemplate()
tf_resources, planned_resources = self._parse_resources()
self._transform_resources(tf_resources, template.resources)
tf_outputs = self._parse_outputs(planned_resources)
self._transform_outputs(tf_outputs, template.outputs)
typer.secho(
f"Transform terraform (provider: {self.provider}) template to ROS template successful.",
fg="green",
)
return template
def _parse_resources(self):
planned_resources = {}
planned_resource_list: list = self.plan[self.P_PLANNED_VALUES][
self.P_ROOT_MODULE
].get(self.P_RESOURCES, [])
for planned_resource in planned_resource_list:
if planned_resource[self.P_MODE] != self.P_MANAGED:
continue
org_address = "{type}.{name}".format(**planned_resource)
# Handle resource using `count`
if "index" in planned_resource:
count_resource_list = planned_resources.setdefault(org_address, [])
count_resource_list.append(planned_resource)
# Handle normal resource
else:
planned_resources[org_address] = [planned_resource]
resources = {}
managed_resources: dict = self.source[self.MANAGED_RESOURCES]
for org_address, managed_resource in managed_resources.items():
for planned_resource in planned_resources[org_address]:
resource = self._parse_resource(managed_resource, planned_resource)
address = planned_resource[self.P_ADDRESS]
resources[address] = resource
return resources, planned_resources
def _parse_resource(self, managed_resource: dict, planned_resource: dict):
depends_on = []
resource = {
self.DEPENDS_ON: depends_on,
self.TYPE: managed_resource[self.TYPE],
self.NAME: managed_resource[self.NAME],
}
m_depends_on = managed_resource.get(self.DEPENDS_ON)
if m_depends_on:
for items in m_depends_on:
depends_on.append(".".join(item[self.NAME] for item in items))
config = managed_resource[self.CONFIG]
properties = self._parse_source_config(config, planned_resource)
resource[self.PROPERTIES] = properties
return resource
def _parse_source_config(
self, config: dict, planned_entity: dict, entity_type: str = P_RESOURCES
):
# For resource
if entity_type == self.P_RESOURCES:
props = {}
planned_props = planned_entity[self.P_VALUES]
attributes = config.get(self.ATTRIBUTES)
if attributes:
for name, prop in attributes.items():
if name in ("depends_on", "count"):
continue
if planned_props is None:
planned_value = None
else:
planned_value = planned_props.get(name)
# If cannot get value from planned resource,
# it means value cannot resolved statically
if planned_value is None:
expr = prop[self.EXPR]
value = self._parse_source_expr(
expr, planned_entity.get("index")
)
if value is not None:
props[name] = value
else:
props[name] = planned_value
blocks = config.get(self.BLOCKS, [])
for block in blocks:
name = block[self.TYPE]
if name not in props:
props[name] = []
index = 0
else:
index = len(props[name])
if planned_props is None or not isinstance(planned_props[name], list):
new_planned_props = None
else:
new_planned_props = planned_props[name][index]
new_planned_entity = {self.P_VALUES: new_planned_props}
res = self._parse_source_config(
block[self.BODY], new_planned_entity, entity_type=entity_type
)
props[name].append(res)
return props
# For output
elif entity_type == self.P_OUTPUTS:
assert self.EXPR in config
assert self.P_PLANNED_RESOURCES in planned_entity
return self._parse_source_expr(
config[self.EXPR],
planned_entity.get("index"),
planned_entity.get(self.P_PLANNED_RESOURCES),
)
else:
raise ValueError(f"entity_type: {entity_type!r} not supported")
def _parse_source_expr(
self, expr: dict, count_index: int = None, planned_resources: dict = None
):
data = None
# Func
if self.ARGS in expr:
func_name = expr[self.NAME]
func_args = []
for arg in expr[self.ARGS]:
func_args.append(self._parse_source_expr(arg))
data = {
self.PROP_TYPE: self.PROP_TYPE_FUNC,
self.PROP_VALUE: func_name,
self.PROP_ARGS: func_args,
}
# Attr refer
elif self.TRAVERSAL in expr:
traversal = expr[self.TRAVERSAL]
source = expr.get(self.SOURCE)
# Attr refer a resource with count
if source and count_index is not None:
collection_traversal = source[self.COLLECTION][self.TRAVERSAL]
data = {
self.PROP_TYPE: self.PROP_TYPE_FUNC,
self.PROP_VALUE: self.FUNC_GET_ATT,
self.PROP_ARGS: [
f"{collection_traversal[0][self.NAME]}.{collection_traversal[1][self.NAME]}[{count_index}]",
traversal[0][self.NAME],
],
}
return data
first = traversal[0][self.NAME]
# ROS not supports, so return None
if first in ("var", "data"):
return None
traversal_length = len(traversal)
if traversal_length == 3:
data = {
self.PROP_TYPE: self.PROP_TYPE_FUNC,
self.PROP_VALUE: self.FUNC_GET_ATT,
self.PROP_ARGS: [
f"{first}.{traversal[1][self.NAME]}",
traversal[2][self.NAME],
],
}
elif traversal_length == 4 and self.KEY in traversal[2]:
# As there is no value for the Key (which is in traversal[2]), it needs to be read from a file.
src_range = traversal[2][self.SRC_RANGE]
start = src_range[self.START]
end = src_range[self.END]
content = ""
for lineno in range(start["Line"], end["Line"] + 1):
content += linecache.getline(
src_range[self.FILENAME], lineno
).rstrip("\n")
key = content[start["Column"] : end["Column"] - 1]
try:
count_index = int(key)
except ValueError:
return None
data = {
self.PROP_TYPE: self.PROP_TYPE_FUNC,
self.PROP_VALUE: self.FUNC_GET_ATT,
self.PROP_ARGS: [
f"{first}.{traversal[1][self.NAME]}[{count_index}]",
traversal[3][self.NAME],
],
}
# Attr refer using *
elif self.SOURCE in expr and self.EACH in expr and planned_resources:
attr_name = expr[self.EACH][self.TRAVERSAL][0][self.NAME]
source_traversal = expr[self.SOURCE][self.TRAVERSAL]
raw_address = (
f"{source_traversal[0][self.NAME]}.{source_traversal[1][self.NAME]}"
)
resource_list = planned_resources.get(raw_address)
if resource_list:
data = []
for resource in resource_list:
data.append(
{
self.PROP_TYPE: self.PROP_TYPE_FUNC,
self.PROP_VALUE: self.FUNC_GET_ATT,
self.PROP_ARGS: [
resource[self.P_ADDRESS],
attr_name,
],
}
)
return data
return None
# list literal (*)
# ROS not supports, so return None
elif self.FOR_EACH in expr:
return None
return data
def _transform_resources(self, tf_resources: dict, out_resources: Resources):
for resource_id, tf_resource in tf_resources.items():
tf_resource_type = tf_resource[self.TYPE]
tf_resource_props = tf_resource[self.PROPERTIES]
# Get rule by resource type
resource_rule: ResourceRule = self.rule_manager.resource_rules.get(
tf_resource_type
)
if resource_rule is None:
typer.secho(
f"Resource type {tf_resource_type!r} is not supported and will be ignored.",
fg="yellow",
)
continue
props = self._transform_resource_props(
tf_resource_type,
tf_resource_props,
resource_rule.properties,
resource_rule.rule_id,
)
if props is None:
continue
depends_on = tf_resource[self.DEPENDS_ON]
resource_type = resource_rule.target_resource_type
resource = Resource(
resource_id=resource_id,
resource_type=resource_type,
depends_on=depends_on,
)
for k, v in props.items():
p = Property(k, v)
resource.properties.add(p)
out_resources.add(resource)
def _transform_resource_props(
self, resource_type, resource_props, resource_rule_props, rule_id
):
final_props = {}
for name, value in resource_props.items():
# Ignore not support prop
prop_rule = resource_rule_props.get(name)
if not prop_rule or prop_rule.get("Ignore"):
if not prop_rule or not prop_rule.get("NoWarning"):
typer.secho(
f"Resource property {name!r} of {resource_type!r} is not supported and will be ignored.",
fg="yellow",
)
continue
# Warn if specify Warning
warn_msg = prop_rule.get("Warning")
if warn_msg:
if not warn_msg.endswith("."):
warn_msg += "."
typer.secho(warn_msg, fg="yellow")
transformed_value, resolved = self._transform_prop_or_attr(value)
final_name = prop_rule.get("To")
prop_type = prop_rule.get("Type")
prop_schema = prop_rule.get("Schema")
if final_name is None and prop_type != "Map":
raise InvalidRuleSchema(
path=rule_id,
reason=f"{name} is invalid. The Type should be 'Map' when To is None",
)
if prop_type == "List" and prop_schema:
final_value = []
for each in transformed_value:
val = self._transform_resource_props(
resource_type,
each,
prop_schema,
rule_id,
)
final_value.append(val)
elif prop_type == "Map" and prop_schema:
if isinstance(transformed_value, list):
transformed_value = transformed_value[0]
final_value = self._transform_resource_props(
resource_type, transformed_value, prop_schema, rule_id
)
else:
final_value = transformed_value
handler_name = prop_rule.get("Handler")
if handler_name is not None:
handler_func = getattr(basic_handler_module, handler_name)
final_value = handler_func(final_value, resolved)
if final_value is not None:
if final_name is None:
assert isinstance(final_value, dict)
final_props.update(final_value)
else:
# If To is duplicated, merge it
if final_name in final_props:
merged_value = final_props[final_name]
merge_handler_name = prop_rule.get("MergeHandler")
if merge_handler_name is not None:
merge_handler_func = getattr(
merge_handler_module, merge_handler_name
)
final_value = merge_handler_func(
final_value, merged_value, resolved
)
# If To is a multi-level attribute, it needs to be converted level by level.
# For example, if To is RuleList.0.Url, it should be converted to
# final_props["RuleList"][0]["Url"] = value.
self._handle_props_and_value(
final_props, final_name.split("."), final_value, rule_id
)
return final_props
def _handle_props_and_value(self, data, names: list, value, rule_id, _name_path=""):
if not names:
return
name = names[0]
if name.isdigit():
name = int(name)
_name_path = f"{_name_path}.{name}" if _name_path else f"{name}"
if len(names) == 1:
if name == "__single_to_multi_handler__":
if value:
data.update(value)
else:
try:
data[name] = value
except IndexError:
if name != len(data):
raise InvalidRuleSchema(
path=rule_id,
reason=f"{_name_path} is invalid. The index should be {len(data)}",
)
data.append(value)
else:
default_sub_data = [] if names[1].isdigit() else {}
try:
sub_data = data[name]
except KeyError:
sub_data = data[name] = default_sub_data
except IndexError:
if name != len(data):
raise InvalidRuleSchema(
path=rule_id,
reason=f"{_name_path} is invalid. The index should be {len(data)}",
)
data.append(default_sub_data)
sub_data = default_sub_data
self._handle_props_and_value(sub_data, names[1:], value, _name_path)
def _transform_prop_or_attr(self, value) -> (Any, bool):
if isinstance(value, list):
data = []
resolved = True
for v in value:
val, resolved_ = self._transform_prop_or_attr(v)
if resolved_ is False:
resolved = False
data.append(val)
return data, resolved
elif isinstance(value, dict):
if self.PROP_TYPE not in value:
data = {}
resolved = True
for k, v in value.items():
val, resolved_ = self._transform_prop_or_attr(v)
if resolved_ is False:
resolved = False
data[k] = val
return data, resolved
prop_type = value.get(self.PROP_TYPE)
if prop_type == self.PROP_TYPE_FUNC:
prop_args = value.get(self.PROP_ARGS)
prop_value = value.get(self.PROP_VALUE)
if prop_value == self.FUNC_GET_ATT:
return (
self._transform_func_get_att(prop_args[0], prop_args[1]),
False,
)
return None, True
return value, True
def _transform_func_get_att(self, resource_id, attrname):
tf_resource_type = None
for resource in self.plan[self.P_PLANNED_VALUES][self.P_ROOT_MODULE].get(
self.P_RESOURCES, []
):
if resource_id == resource[self.P_ADDRESS]:
tf_resource_type = resource[self.P_TYPE]
resource_rule: ResourceRule = self.rule_manager.resource_rules.get(
tf_resource_type
)
if tf_resource_type is None or resource_rule is None:
typer.secho(
f"Resource type {tf_resource_type!r} is not supported and will be ignored.",
fg="yellow",
)
return None
if (
attrname not in resource_rule.attributes
or "To" not in resource_rule.attributes[attrname]
):
typer.secho(
f"Resource attribute {attrname!r} of {tf_resource_type!r} is not supported and will be ignored.",
fg="yellow",
)
return None
def parse(n):
names = n.split(".")
final_attrname = names[0]
result = {"Fn::GetAtt": [resource_id, final_attrname]}
if len(names) > 1:
# using JQ to get value
jq_expr = "."
for name in names[1:]:
try:
index = int(name)
jq_expr += f"[{index}]"
except ValueError:
jq_expr += f".{name}"
result = {"Fn::Jq": ["First", jq_expr, result]}
return result
attr_rule = resource_rule.attributes[attrname]
to_name = attr_rule["To"]
if isinstance(to_name, str):
final_value = parse(to_name)
elif isinstance(to_name, (list, tuple)):
final_value = [parse(each) for each in to_name]
else:
raise InvalidRuleSchema(
path=resource_rule.rule_id,
reason=f"The type of To={to_name} is invalid. Expect str or list",
)
handler_name = attr_rule.get("Handler")
if handler_name is not None:
handler_func = getattr(basic_handler_module, handler_name)
final_value = handler_func(final_value, False)
return final_value
def _parse_outputs(self, planned_resources: dict = None):
planned_outputs = {}
raw_planned_outputs: dict = self.plan[self.P_PLANNED_VALUES].get(
self.P_OUTPUTS, {}
)
for name, info in raw_planned_outputs.items():
value = info.get(self.P_VALUE)
planned_outputs[name] = value
outputs = {}
source_outputs: dict = self.source[self.OUTPUTS]
for name, source_output in source_outputs.items():
planned_output = planned_outputs[name]
output = self._parse_output(
source_output, planned_output, planned_resources
)
outputs[name] = output
return outputs
def _parse_output(
self,
source_output: dict,
planned_output: dict = None,
planned_resources: dict = None,
):
if planned_output is not None:
return planned_output
output = self._parse_source_config(
source_output,
{
self.P_VALUES: planned_output,
self.P_PLANNED_RESOURCES: planned_resources,
},
entity_type=self.P_OUTPUTS,
)
return output
def _transform_outputs(self, tf_outputs: dict, out_outputs: Outputs):
for name, value in tf_outputs.items():
value, _ = self._transform_prop_or_attr(value)
output = Output(
name=name,
value=value,
)
out_outputs.add(output)