tools/generator.py (377 lines of code) (raw):

import os import shutil from typing import Union import yaml import boto3 from alibabacloud_tea_openapi import models as open_api_models from alibabacloud_ros20190910.client import Client from alibabacloud_credentials.client import Client as CredClient from tools import exceptions from tools.settings import ( TF_ALI_ROS_PROP_MAPPINGS, TF_ALI_RULES_DIR, CF_ROS_PROP_MAPPINGS, CF_RESOURCE_RULES_DIR, ROS_RESOURCE_RULES_DIR, TF_ALI_DEPRECATED_PROPERTIES ) from tools.resource import RosResource, TerraformResource, CloudFormationResource from tools.utils import snake_to_camel, camel_to_snake class BaseRuleGenerator: TYPE_MAPPING = { "list": "List", "map": "Map", } COMPATIBLE_TYPE_MAPPING = { "integer": {"number", "string"}, "number": {"string"}, "string": { "integer", "number", }, # ROS supports automatic conversion of string to integer. } def __init__( self, from_resource: Union[TerraformResource, CloudFormationResource, RosResource], to_resource: Union[TerraformResource, RosResource], ros2tf: bool = False ): self.from_resource = from_resource self.to_resource = to_resource self.ros2tf = ros2tf def generate(self): path = self._get_rule_path() rule = None if os.path.exists(path): with open(path, "r") as f: rule = yaml.safe_load(f) generated_rule = self._generate_rule() # reserve some rule sections if rule: for section in ("Properties", "Attributes"): pas = rule.get(section, {}) # props or attrs generated_pas = generated_rule.setdefault(section, {}) for pa_name, pa in pas.items(): try: if not pa.get("Ignore") and generated_pas.get(pa_name, {}).get( "Ignore" ): generated_pas[pa_name] = pa except AttributeError as e: print("-------") print(generated_pas) print(pa_name, generated_pas.get(pa_name, {})) print(e) raise e attr_id = rule.get("Attributes", {}).get("id", {}).get("To") if attr_id: generated_rule.setdefault("Attributes", {}).setdefault("id", {})[ "To" ] = attr_id with open(path, "w") as f: content = yaml.safe_dump(generated_rule, sort_keys=False) content = content.replace("'# todo'", "# todo") f.write(content) def _get_rule_path(self): raise NotImplementedError() def _generate_rule(self): rule = { "Version": "2020-06-01", "Type": "Resource", "ResourceType": { "From": self.from_resource.resource_type, "To": self.to_resource.resource_type, }, } props_rule = self._generate_properties_rule( self.from_resource.properties(), self.to_resource.properties() ) if props_rule: rule["Properties"] = props_rule attrs_rule = self._generate_attributes_rule( self.from_resource.attributes(), self.to_resource.attributes(), ) if attrs_rule: rule["Attributes"] = attrs_rule return rule def _generate_properties_rule( self, from_schema: dict, ros_schema: dict, from_prop_path="", ros_prop_path="" ): props_rule = {} for from_key in sorted(from_schema): from_prop_path_ = ( f"{from_prop_path}.{from_key}" if from_prop_path else from_key ) ros_key = self._get_ros_key(ros_schema, from_key, from_prop_path_) ros_prop_path_ = f"{ros_prop_path}.{ros_key}" if ros_prop_path else ros_key if ros_key not in ros_schema: prop_rule = {"Ignore": True} else: from_prop_schema = from_schema[from_key] if "Handler" in from_prop_schema: prop_rule = {"To": ros_key, "Handler": from_prop_schema["Handler"]} else: from_schema_type = from_prop_schema.get("Type") ros_schema_type = ros_schema[ros_key]["Type"] if not from_schema_type and "oneof" in from_prop_schema: print( f"[Warning] The Schema of property {from_prop_path_!r} has multiple types, " f"and should set rule manually. " ) prop_rule = {"Ignore": True} elif not self._is_type_compatible( from_schema_type, ros_schema_type ): print( f"[Warning] Schema type not equal. " f"{from_schema_type}({from_prop_path_}) != {ros_schema_type}({ros_prop_path_})" ) prop_rule = {"Ignore": True} else: prop_rule = {"To": ros_key} rule_type = self.TYPE_MAPPING.get(from_schema_type) if rule_type and "Schema" in from_prop_schema: prop_rule["Type"] = rule_type if rule_type == "List": from_sub_schema = from_prop_schema["Schema"]["*"] from_sub_prop_path = f"{from_prop_path_}.*" try: ros_sub_schema = ros_schema[ros_key]["Schema"]["*"] except KeyError: ros_sub_schema = { "Required": True, "Type": "string", } ros_sub_prop_path = f"{ros_prop_path_}.*" else: from_sub_schema = from_prop_schema["Schema"] from_sub_prop_path = from_prop_path_ try: ros_sub_schema = ros_schema[ros_key]["Schema"] except KeyError: ros_sub_schema = { "Required": True, "Type": "string", } ros_sub_prop_path = ros_prop_path_ from_sub_schema_type = from_sub_schema.get("Type") ros_sub_schema_type = ros_sub_schema.get("Type") if not from_sub_schema_type or not ros_sub_schema_type: continue if not self._is_type_compatible( from_sub_schema_type, ros_sub_schema_type ): print( f"[Warning] Schema type not equal. " f"{from_sub_prop_path}({from_sub_schema_type}) != {ros_sub_prop_path}({ros_sub_schema_type})" ) prop_rule = {"Ignore": True} else: from_sub_schema_s = from_sub_schema.get("Schema") ros_sub_schema_s = ros_sub_schema.get("Schema") if from_sub_schema_s and ros_sub_schema_s: sub_properties_rule = ( self._generate_properties_rule( from_sub_schema_s, ros_sub_schema_s, from_sub_prop_path, ros_sub_prop_path, ) ) prop_rule["Schema"] = sub_properties_rule props_rule[from_key] = prop_rule return props_rule def _get_ros_key(self, ros_schema: dict, from_key: str, from_prop_path: str) -> str: raise NotImplementedError() def _generate_attributes_rule(self, from_schema: dict, ros_schema: dict): attrs_rule = self._get_init_attrs_rule() for from_key in sorted(from_schema): ros_key = self._get_ros_key(ros_schema, from_key, from_key) if ros_key not in ros_schema: attr_rule = {"Ignore": True} else: attr_rule = {"To": ros_key} attrs_rule[from_key] = attr_rule return attrs_rule def _get_init_attrs_rule(self): return {} @classmethod def _is_type_compatible(cls, type1, type2): if type1 == type2: return True comp_types = cls.COMPATIBLE_TYPE_MAPPING.get(type1) if comp_types and type2 in comp_types: return True return False class TerraformRuleGenerator(BaseRuleGenerator): @classmethod def initialize( cls, from_resource_type: str, to_resource_type: str, from_resource_filename: str = None, ): if not from_resource_type.startswith("alicloud_"): raise exceptions.RosToolWarning( message="Terraform resource type only support alicloud resource which starts with alicloud_" ) path = shutil.which("asty") if not path: raise exceptions.RosToolWarning( message="Command `asty` not found. Please run `go install github.com/asty-org/asty`" ) tf_resource = TerraformResource(from_resource_type, from_resource_filename) ros_config = open_api_models.Config( credential=CredClient(), endpoint="ros.aliyuncs.com", ) ros_client = Client(ros_config) ros_resource = RosResource(ros_client, to_resource_type) return cls(tf_resource, ros_resource) def _get_rule_path(self): name = self.from_resource.resource_type.replace("alicloud_", "") filename = f"{name}.yml" path = os.path.join(TF_ALI_RULES_DIR, filename) return path def _get_ros_key(self, ros_schema: dict, from_key: str, from_prop_path: str): ros_key = snake_to_camel(from_key) if ros_key not in ros_schema: ros_key = TF_ALI_ROS_PROP_MAPPINGS.get( self.from_resource.resource_type, {} ).get(from_prop_path) if not ros_key: ros_key = TF_ALI_ROS_PROP_MAPPINGS.get("*", {}).get(from_prop_path) return ros_key def _get_init_attrs_rule(self): return {"id": {"To": "# todo"}} class CloudFormationRuleGenerator(BaseRuleGenerator): @classmethod def initialize( cls, from_resource_type: str, ros_resource_type: str, ): if not from_resource_type.startswith("AWS::"): raise exceptions.RosToolWarning( message="CloudFormation resource type only support CloudFormation resource which starts with AWS::" ) cf_client = boto3.client("cloudformation") from_resource = CloudFormationResource(cf_client, from_resource_type) ros_config = open_api_models.Config( credential=CredClient(), endpoint="ros.aliyuncs.com", ) ros_client = Client(ros_config) ros_resource = RosResource(ros_client, ros_resource_type) return cls(from_resource, ros_resource) def _get_rule_path(self): name = camel_to_snake(self.from_resource.resource_type.replace("::", "_")) filename = f"{name}.yml" path = os.path.join(CF_RESOURCE_RULES_DIR, filename) return path def _get_ros_key(self, ros_schema: dict, from_key: str, from_prop_path: str): ros_key = from_key if ros_key not in ros_schema: ros_key = CF_ROS_PROP_MAPPINGS.get( self.from_resource.resource_type, {} ).get(from_prop_path) if not ros_key: ros_key = CF_ROS_PROP_MAPPINGS.get("*", {}).get(from_prop_path) return ros_key class ROS2TerraformRuleGenerator(BaseRuleGenerator): @classmethod def initialize( cls, from_resource_type: str, to_resource_type: str ): tf_resource = TerraformResource(to_resource_type) ros_config = open_api_models.Config( credential=CredClient(), endpoint="ros.aliyuncs.com", ) ros_client = Client(ros_config) ros_resource = RosResource(ros_client, from_resource_type) return cls(ros_resource, tf_resource) def _get_rule_section(self, tf_rule_section, ros_res_section): rule_section = {} for key, value in tf_rule_section.items(): if value.get("Ignore"): continue schema_key = value.get("To") tf_deprecated_props = TF_ALI_DEPRECATED_PROPERTIES.get(self.to_resource.resource_type) if tf_deprecated_props and key in tf_deprecated_props: continue if not isinstance(schema_key, str): continue if not schema_key or schema_key not in ros_res_section: continue schema_value = {"To": key} if schema_key not in rule_section: rule_section[schema_key] = schema_value else: rule_section[f"{schema_key}$$0"] = schema_value handler = value.get("Handler") if handler: if handler == "tags_dict_to_list": handler = "handle_tags" schema_value["Handler"] = handler schema = value.get("Schema") if not schema: continue schema_type = value.get("Type") if not schema_type: continue schema_value["Type"] = schema_type if schema_type == "Map": try: ros_res_schema = ros_res_section[schema_key]["Schema"] except KeyError: continue elif schema_type == "List": try: ros_res_schema = ros_res_section[schema_key]["Schema"]["*"]["Schema"] except KeyError: continue else: continue new_schema = self._get_rule_section(schema, ros_res_schema) schema_value["Schema"] = new_schema extra_keys = set(ros_res_section.keys()) - set(rule_section.keys()) for ros_extra_key in extra_keys: rule_section[ros_extra_key] = {"Ignore": True} return rule_section def _generate_from_tf_rule(self, tf_rule): for section in ["Properties", "Attributes"]: ros_res_section = getattr(self.to_resource, section.lower(), {}) tf_rule_section = tf_rule.get(section, {}) or {} rule_section = self._get_rule_section(tf_rule_section, ros_res_section) tf_rule[section] = rule_section return tf_rule def generate(self): rule = { "Version": "2020-06-01", "Type": "Resource", "ResourceType": { "From": self.from_resource.resource_type, "To": self.to_resource.resource_type, }, } name = self.to_resource.resource_type.replace("alicloud_", "") tf_rule_path = os.path.join(TF_ALI_RULES_DIR, f"{name}.yml") path = self._get_rule_path() if os.path.exists(path): return if os.path.exists(tf_rule_path): with open(tf_rule_path, "r") as f: tf_rule = yaml.safe_load(f) for section in ["Properties", "Attributes"]: ros_res_section = getattr(self.from_resource, section.lower())() tf_rule_section = tf_rule.get(section, {}) or {} rule[section] = self._get_rule_section(tf_rule_section, ros_res_section) with open(path, "w") as f: content = yaml.safe_dump(rule, sort_keys=False) f.write(content) def _get_rule_path(self): name = camel_to_snake(self.from_resource.resource_type.replace("::", "_")) path = os.path.join(ROS_RESOURCE_RULES_DIR, f"{name}.yml") return path