rostran/providers/ros/template.py (410 lines of code) (raw):

import os import textwrap import uuid from pathlib import Path from typing import Any, Optional, List import typer import json from Tea.exceptions import TeaException from alibabacloud_credentials.exceptions import CredentialException from alibabacloud_tea_util.models import RuntimeOptions from libterraform import TerraformCommand from libterraform.exceptions import TerraformCommandError from ruamel import yaml from alibabacloud_tea_openapi import models as open_api_models from alibabacloud_ros20190910.client import Client from alibabacloud_credentials.client import Client as CredClient from alibabacloud_ros20190910 import models from rostran.core.exceptions import ( TemplateFormatNotSupport, InvalidTemplateWorkspace, InvalidTargetPath, InvalidTemplate ) from rostran.core.rule_manager import RuleManager, RuleClassifier, ResourceRule from rostran.core.template import Template from rostran.core.format import FileFormat from rostran.providers.ros import functions from rostran.providers.terraform import template_blocks as tf from tools.utils import camel_to_snake class WrapTerraformTemplate(Template): @classmethod def initialize( cls, path: str, format: FileFormat = FileFormat.Terraform, ): if format == FileFormat.Json: with open(path) as f: source = json.load(f) elif format == FileFormat.Yaml: with open(path) as f: source = yaml.load(f) else: raise TemplateFormatNotSupport(path=path, format=format) return cls(source=source) def transform(self, target_path=None): typer.secho(f"Transforming ROS template to terraform template...") workspace = self.source.get("Workspace") if not isinstance(workspace, dict): raise InvalidTemplateWorkspace( reason=f"The type of data ({workspace}) should be dict" ) for file_name, file_content in workspace.items(): file_path = target_path + "/" + file_name dir_path = "/".join(file_path.split("/")[:-1]) if not os.path.exists(dir_path): os.makedirs(dir_path) with open(file_path, "w") as f: f.write(file_content) typer.secho( f"Transform ROS template to terraform template successful.", fg="green", ) class ROS2TerraformTemplate(Template): ( ROS_TEMPLATE_FORMAT_VERSION, TRANSFORM, DESCRIPTION, CONDITIONS, MAPPINGS, PARAMETERS, RESOURCES, OUTPUTS, RULES, METADATA, WORKSPACE, LOCALS ) = ( "ROSTemplateFormatVersion", "Transform", "Description", "Conditions", "Mappings", "Parameters", "Resources", "Outputs", "Rules", "Metadata", "Workspace", "Locals" ) TF_INVALID_VARIABLE_KEYS = ( "source", "version", "providers", "count", "for_each", "lifecycle", "depends_on", "locals" ) def __init__(self, source: dict, rule_manager: RuleManager): super().__init__(source) self.rule_manager = rule_manager self.parameters = source.get(self.PARAMETERS) or {} self.resources = source.get(self.RESOURCES) or {} self.outputs = source.get(self.OUTPUTS) or {} self.rules = source.get(self.RULES) or {} self.metadata = source.get(self.METADATA) or {} self.conditions = source.get(self.CONDITIONS) or {} self.resources_with_count = [] self.tf_parameters = {} self._check_resources() def _check_resources(self): for name, value in self.resources.items(): if value.get("Condition") or value.get("Count"): self.resources_with_count.append(name) def get_tf_params(self, param): if param in self.tf_parameters: return self.tf_parameters[param] if param in self.TF_INVALID_VARIABLE_KEYS: tf_param = f"{param}_{str(uuid.uuid4())[:8]}" self.tf_parameters[param] = tf_param return tf_param return param @classmethod def initialize(cls, source: dict, validate: bool = True, _: FileFormat = None): if validate: cls.validate_ros_template(source) rule_manager = RuleManager.initialize(RuleClassifier.ROS) return cls(source=source, rule_manager=rule_manager) @staticmethod def validate_ros_template(source): try: ros_config = open_api_models.Config(credential=CredClient()) except CredentialException as e: typer.secho( f"The Credential is not set, please set credential by:\n" f"1. Environment variables (ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET)\n" f"2. The ini configuration file defined by the environment variable ALIBABA_CLOUD_CREDENTIALS_FILE\n" f"3. Alibaba Cloud SDK Credentials default configuration file where" f" located in ~/.alibabacloud/credentials.ini", fg="red", ) raise InvalidTemplate(reason=f'{e}') ros_client = Client(ros_config) request = models.ValidateTemplateRequest(template_body=json.dumps(source)) try: runtime = RuntimeOptions(autoretry=True, max_attempts=3, read_timeout=60000, connect_timeout=60000) ros_client.validate_template_with_options(request, runtime=runtime) except TeaException as e: typer.secho( f"ROS template is invalid, please check the template and try again, {e}", fg="red", ) raise InvalidTemplate(reason=f'{e}') def transform(self, target_path: str = None, single_file: bool = False): """ transform ros to Terraform """ if not target_path: target_path = os.getcwd() + "/terraform/alicloud" output_dir = Path(target_path) if not output_dir.exists(): output_dir.mkdir(parents=True, exist_ok=True) if not output_dir.is_dir(): typer.secho( f"{target_path} is not a directory", fg="red", ) raise InvalidTargetPath(target_path=target_path, reason=f"{target_path} is not a directory") typer.secho(f"Transforming ROS template to terraform ...") conditions = self._transform_conditions() parameters = self._transform_parameters() resources = self._transform_resources() outputs = self._transform_outputs() if single_file: file_path = (output_dir / "main.tf").resolve() with file_path.open("w", encoding="utf-8") as f: for block in conditions + parameters + resources + outputs: contents = block.render() f.write(contents) f.write("\n\n") tf_files = [file_path] else: tf_blocks = (('local', conditions), ('variable', parameters), ('main', resources), ('output', outputs)) tf_files = [] for block_name, blocks in tf_blocks: if not blocks: continue file_name = f"{block_name}.tf" file_path = (output_dir / file_name).resolve() with file_path.open("w", encoding="utf-8") as f: for block in blocks: contents = block.render() f.write(contents) f.write("\n\n") tf_files.append(file_path) typer.secho(f"Transform successful!\n") try: tf_command = TerraformCommand(os.path.abspath(output_dir)) tf_command.fmt(check=True, no_color=False, write=True) if not single_file: for file_path in tf_files: with file_path.open("r", encoding="utf-8") as f: typer.secho(f.read(), fg="green") except TerraformCommandError: if not single_file: typer.secho(f"Terraform fmt failed, the original content will be displayed.\n", fg="yellow") for item in conditions + parameters + resources + outputs: typer.secho(item, fg="green") def get_resource_rule(self, ros_res_type: str) -> Optional[ResourceRule]: resource_rule: ResourceRule = self.rule_manager.resource_rules.get(ros_res_type) if resource_rule is None: typer.secho( f"Resource type {ros_res_type!r} is not supported and will be ignored.", fg="yellow", ) return return resource_rule def resolve_values(self, data: Any, tf_type: bool = True) -> Any: if isinstance(data, dict): result = {} for key, value in data.items(): if key == 'Ref': return functions.ref(self, value) elif key in functions.ALL_FUNCTIONS: return functions.ALL_FUNCTIONS[key](self, self.resolve_values(value, False)) else: result[key] = self.resolve_values(value, tf_type) elif isinstance(data, list): result = [self.resolve_values(item, tf_type) for item in data] else: result = data if tf_type: return tf.convert_to_tf_type(result) return result def _transform_parameters(self) -> List[tf.Variable]: tf_vars = [] for name, value in self.parameters.items(): tf_name = self.get_tf_params(camel_to_snake(name)) value = value.copy() param_type = value.pop("Type", None) tf_value = dict() if param_type == "String": tf_value["type"] = "string" elif param_type == "Number": tf_value["type"] = "number" elif param_type == "Boolean": tf_value["type"] = "bool" elif param_type == "CommaDelimitedList": tf_value["type"] = "list(any)" else: msg = f"The params type {param_type} is not supported, may be ignored when referenced by a resource." tf_value["comment-type"] = tf.CommentType(msg) tf_value["type"] = "any" param_default = value.pop("Default", None) if param_default: if param_type in ("String", "Number", "Boolean"): tf_value["default"] = tf.convert_to_tf_type(param_default, param_type.lower()) else: tf_value["default"] = self.resolve_values(param_default) param_sensitive = value.pop("NoEcho", None) if param_sensitive: tf_value["sensitive"] = tf.BooleanType(True) required = value.pop("Required", None) if required: tf_value["nullable"] = tf.BooleanType(False) if value: value = json.dumps(value, indent=2, ensure_ascii=False) value = textwrap.indent(value, " ") if "${" in value: value = value.replace("${", "$${") tf_value["description"] = f"<<EOT\n{value}\n EOT" var_block = tf.Variable(tf_name, tf_value) tf_vars.append(var_block) return tf_vars def _transform_conditions(self) -> List[tf.Locals]: tf_conditions = [] for name, value in self.conditions.items(): tf_value = {name: self.resolve_values(value)} tf_conditions.append(tf.Locals(tf_value)) return tf_conditions def _get_tf_argument( self, res_type: str, values: Any, schema: dict, prop_name: str = None ): if isinstance(values, dict): tf_argument = {} if not schema or not isinstance(schema, dict): return self.resolve_values(values) for name, value in values.items(): if prop_name: prop_name = f"{prop_name}.{name}" prop_flag = prop_name or name msg = f"Resource property {prop_flag!r} of {res_type!r} is not supported and will be ignored." if name not in schema: typer.secho(msg, fg="yellow") continue schema_value = schema[name] or {} tf_arg_name = schema_value.get("To") if schema_value.get("Ignore") or not tf_arg_name: typer.secho(msg, fg="yellow") continue handler = schema_value.get("Handler") if handler: handler_func = getattr(functions, handler, None) if callable(handler_func): if isinstance(tf_arg_name, list): for n in tf_arg_name: tf_argument[n] = handler_func(self, value) else: tf_argument[tf_arg_name] = handler_func(self, value) continue sub_schema = schema_value.get("Schema") sub_args = self._get_tf_argument(res_type, value, sub_schema, name) if sub_schema: if isinstance(sub_args, list): for i, item in enumerate(sub_args): block = tf.Block(tf_arg_name, arguments=item) tf_argument[f"{prop_flag}${i}"] = block elif isinstance(sub_args, dict): block = tf.Block(tf_arg_name, arguments=sub_args) tf_argument[f"{prop_flag}$Block"] = block else: msg = f"The value {sub_args} of arguments {tf_arg_name} is not block and will be ignore." tf_argument[f"{prop_flag}$Comment"] = tf.CommentType(msg) else: if isinstance(tf_arg_name, list): for n in tf_arg_name: tf_argument[n] = sub_args else: tf_argument[tf_arg_name] = sub_args return tf_argument elif isinstance(values, list): if len(values) == 1 and not schema: return self._get_tf_argument(res_type, values[0], schema, prop_name) return [self._get_tf_argument(res_type, item, schema, prop_name) for item in values] else: return self.resolve_values(values) def _transform_resources(self) -> List[tf.Resource]: tf_resources = [] for name, res in self.resources.items(): tf_name = camel_to_snake(name) ros_res_type = res.get("Type") resource_rule = self.get_resource_rule(ros_res_type) if not resource_rule: typer.secho( f"Resource type {ros_res_type!r} is not supported and will be ignored.", fg="yellow", ) continue tf_res_type = resource_rule.target_resource_type properties = res.get("Properties") resolved_props = self.resolve_values(properties, False) or {} tf_argument = self._get_tf_argument(ros_res_type, resolved_props, resource_rule.properties) if isinstance(tf_argument, tf.JsonType): tf_argument = tf_argument.value condition = res.get("Condition") count = res.get("Count") if count: tf_argument["count"] = self.resolve_values(count) if condition: tf_count = tf_argument.get("count") if tf_argument.get("count") is not None else 1 tf_argument["count"] = tf.LiteralType(f"local.{condition} ? {tf_count} : 0") depends_on = res.get("DependsOn") if depends_on: if isinstance(depends_on, str): depends_on = [depends_on] tf_depends_on = [] for depend in depends_on: depend_res_type = self.resources[depend]['Type'] depend_resource_rule = self.get_resource_rule(depend_res_type) if not depend_resource_rule: typer.secho( f"Resource type {depend_res_type!r} is not supported and will be ignored.", fg="yellow", ) continue tf_depend = f"{depend_resource_rule.target_resource_type}.{camel_to_snake(depend)}" tf_depends_on.append(tf.LiteralType(tf_depend)) tf_argument["depends_on"] = tf.JsonType(tf_depends_on) resource = tf.Resource(tf_name, tf_res_type, tf_argument) tf_resources.append(resource) return tf_resources def _transform_outputs(self) -> List[tf.Output]: tf_outputs = [] for name, value in self.outputs.items(): tf_name = camel_to_snake(name) tf_value = dict() resolved_value = self.resolve_values(value.get("Value")) condition = value.get("Condition") if condition: cont_value = f"local.{condition} ? {resolved_value} : {tf.NullType()}" tf_value["value"] = tf.LiteralType(cont_value) else: if isinstance(resolved_value, tf.CommentType): tf_value["value_comment"] = resolved_value tf_value["value"] = tf.NullType() else: tf_value["value"] = resolved_value desc = value.get("Description") if desc: if isinstance(desc, str) and "\n" not in desc: tf_value["description"] = tf.QuotedString(desc) else: desc = json.dumps(desc, indent=2, ensure_ascii=False) desc = textwrap.indent(desc, " ") tf_value["description"] = f"<<EOT\n{desc}\n EOT" output_block = tf.Output(tf_name, tf_value) tf_outputs.append(output_block) return tf_outputs