rostran/core/rule_manager.py (290 lines of code) (raw):

import os from typing import Dict import typer from ruamel.yaml import YAML from .exceptions import ( InvalidRuleSchema, RuleVersionNotSupport, RuleTypeNotSupport, RuleAlreadyExist, ) from rostran.core.settings import RULES_DIR import rostran.handlers.resource as resource_handler_module yaml = YAML() class RuleClassifier: TerraformAliCloud = "terraform/alicloud" TerraformAWS = "terraform/aws" CloudFormation = "cloudformation" ROS = "ros" class RuleManager: def __init__( self, rule_classifier: str, resource_rules: Dict[str, "ResourceRule"] = None, pseudo_parameters_rule: "PseudoParametersRule" = None, function_rule: "FunctionRule" = None, meta_data_rule: "MetaDataRule" = None, association_property_rule: "AssociationPropertyRule" = None, ): self.rule_classifier = rule_classifier self.resource_rules = resource_rules or {} self.function_rule = function_rule self.pseudo_parameters_rule = pseudo_parameters_rule self.meta_data_rule = meta_data_rule self.association_property_rule = association_property_rule @classmethod def initialize(cls, rule_classifier): rule_manager = cls(rule_classifier) rule_manager.load() return rule_manager def load(self): rules_dir = os.path.join(RULES_DIR, self.rule_classifier) if not os.path.exists(rules_dir): return for root, dirs, files in os.walk(rules_dir): for filename in files: if not filename.endswith(".yml"): continue filepath = os.path.join(root, filename) if not os.path.isfile(filepath): continue rule = Rule.initialize(filepath) if rule.type == Rule.RESOURCE: if rule.rule_id in self.resource_rules: raise RuleAlreadyExist(id=rule.rule_id, path=filepath) self.resource_rules[rule.rule_id] = rule elif rule.type == Rule.PSEUDO_PARAMETERS: if self.pseudo_parameters_rule: raise RuleAlreadyExist(id=rule.rule_id, path=filepath) self.pseudo_parameters_rule = rule elif rule.type == Rule.FUNCTION: if self.function_rule: raise RuleAlreadyExist(id=rule.rule_id, path=filepath) self.function_rule = rule elif rule.type == Rule.META_DATA: if self.meta_data_rule: raise RuleAlreadyExist(id=rule.rule_id, path=filepath) self.meta_data_rule = rule elif rule.type == Rule.ASSOCIATION_PROPERTY: if self.association_property_rule: raise RuleAlreadyExist(id=rule.rule_id, path=filepath) self.association_property_rule = rule def show(self, markdown=False, with_link=False): typer.secho(f"# Rules for {self.rule_classifier}", fg=typer.colors.GREEN) newline = False def sorted_echo(d, getter, from_link_func=None, to_link_func=None): if markdown: typer.echo("| From | To |") typer.echo("| ---- | ---- |") for from_ in sorted(d): to = getter(d[from_]) if not to: continue if markdown: if with_link: if from_link_func: from_link = from_link_func(from_) if from_link: from_ = f"[{from_}]({from_link})" if to_link_func: to_link = to_link_func(to) if to_link: to = f"[{to}]({to_link})" typer.echo(f"| {from_} | {to} |") else: typer.echo(f"{from_} -> {to}") def resource_from_link_func(rt: str): if self.rule_classifier == RuleClassifier.TerraformAliCloud: rt = rt.replace("alicloud_", "") return f"https://registry.terraform.io/providers/aliyun/alicloud/latest/docs/resources/{rt}" elif self.rule_classifier == RuleClassifier.CloudFormation: rt = rt.replace("AWS::", "").replace("::", "-").lower() return f"https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-{rt}" else: return None def resource_to_link_func(rt: str): rt = rt.replace("::", "-").lower() return f"https://www.alibabacloud.com/help/ros/developer-reference/{rt}" if self.resource_rules: typer.secho("## Resources", fg=typer.colors.GREEN) newline = True sorted_echo( self.resource_rules, getter=lambda x: x.target_resource_type, from_link_func=resource_from_link_func, to_link_func=resource_to_link_func, ) if self.pseudo_parameters_rule: if newline: typer.echo("") typer.secho("## Pseudo Parameters", fg=typer.colors.GREEN) newline = True sorted_echo( self.pseudo_parameters_rule.pseudo_parameters, getter=lambda x: x.get("To"), ) if self.function_rule: if newline: typer.echo("") typer.secho("## Function", fg=typer.colors.GREEN) newline = True sorted_echo(self.function_rule.function, getter=lambda d: d.get("To")) if self.association_property_rule: if newline: typer.echo("") typer.secho("## Association Property", fg=typer.colors.GREEN) newline = True sorted_echo( self.association_property_rule.association_property, getter=lambda x: x.get("To"), ) if self.meta_data_rule: if newline: typer.echo("") typer.secho("## Metadata", fg=typer.colors.GREEN) sorted_echo(self.meta_data_rule.meta_data, getter=lambda x: x.get("To")) class Rule: TYPES = (RESOURCE, PSEUDO_PARAMETERS, FUNCTION, META_DATA, ASSOCIATION_PROPERTY) = ( "Resource", "PseudoParameters", "Function", "Metadata", "AssociationProperty", ) _PROPERTIES = ( VERSION, TYPE, RESOURCE_TYPE, PROPERTIES, ATTRIBUTES, HANDLER, ) = ( "Version", "Type", "ResourceType", "Properties", "Attributes", "Handler", ) SUPPORTED_VERSIONS = ("2020-06-01",) def __init__(self, version, type, rule_id=None, *args, **kwargs): self.version = version self.type = type self.rule_id = rule_id @classmethod def initialize(cls, path: str): with open(path) as f: data = yaml.load(f) if not isinstance(data, dict): raise InvalidRuleSchema(path=path, reason="rule data type should be dict") version = str(data.get(cls.VERSION)) if version not in cls.SUPPORTED_VERSIONS: raise RuleVersionNotSupport(path=path, version=version) type_ = data.get(cls.TYPE) if type_ not in cls.TYPES: raise RuleTypeNotSupport(path=path, type=type_) if type_ == cls.RESOURCE: return ResourceRule.initialize_from_info(path, data, version, type_) elif type_ == cls.PSEUDO_PARAMETERS: return PseudoParametersRule.initialize_from_info(path, data, version, type_) elif type_ == cls.FUNCTION: return FunctionRule.initialize_from_info(path, data, version, type_) elif type_ == cls.META_DATA: return MetaDataRule.initialize_from_info(path, data, version, type_) elif type_ == cls.ASSOCIATION_PROPERTY: return AssociationPropertyRule.initialize_from_info( path, data, version, type_ ) class ResourceRule(Rule): def __init__( self, version, type, rule_id, properties, attributes, target_resource_type=None, handler=None, ): super().__init__(version, type, rule_id) self.properties = properties self.attributes = attributes self.target_resource_type = target_resource_type self.handler = handler @classmethod def initialize_from_info(cls, path, data, version, type): resource_type = data.get(cls.RESOURCE_TYPE, {}) if not isinstance(resource_type, dict): InvalidRuleSchema(path=path, reason=f"{cls.RESOURCE} type should be dict") rule_id = resource_type["From"] target_resource_type = resource_type.get("To") properties = data.get(cls.PROPERTIES, {}) if not isinstance(properties, dict): InvalidRuleSchema(path=path, reason=f"{cls.PROPERTIES} type should be dict") attributes = data.get(cls.ATTRIBUTES, {}) if not isinstance(attributes, dict): InvalidRuleSchema(path=path, reason=f"{cls.ATTRIBUTES} type should be dict") handler_name = data.get(cls.HANDLER) handler_func = ( getattr(resource_handler_module, handler_name) if handler_name else None ) return cls( version, type, rule_id, properties, attributes, target_resource_type, handler_func, ) class PseudoParametersRule(Rule): def __init__(self, version, type, rule_id, pseudo_parameters): super().__init__(version, type, rule_id) self.pseudo_parameters = pseudo_parameters @classmethod def initialize_from_info(cls, path, data, version, type): pseudo_parameters = data.get(cls.PSEUDO_PARAMETERS, {}) if not isinstance(pseudo_parameters, dict): InvalidRuleSchema( path=path, reason=f"{cls.PSEUDO_PARAMETERS} type should be dict" ) rule_id = cls.PSEUDO_PARAMETERS return cls(version, type, rule_id, pseudo_parameters) class FunctionRule(Rule): def __init__(self, version, type, rule_id, function): super().__init__(version, type, rule_id) self.function = function self.ignored = self.ignored_function() @classmethod def initialize_from_info(cls, path, data, version, type): function = data.get(cls.FUNCTION, {}) if not isinstance(function, dict): InvalidRuleSchema(path=path, reason=f"{cls.FUNCTION} type should be dict") rule_id = cls.FUNCTION return cls(version, type, rule_id, function) def ignored_function(self): return [aws for aws, ros in self.function.items() if ros.get("Ignore")] class MetaDataRule(Rule): def __init__(self, version, type, rule_id, meta_data): super().__init__(version, type, rule_id) self.meta_data = meta_data @classmethod def initialize_from_info(cls, path, data, version, type): meta_data = data.get(cls.META_DATA, {}) if not isinstance(meta_data, dict): InvalidRuleSchema(path=path, reason=f"{cls.META_DATA} type should be dict") rule_id = cls.META_DATA return cls(version, type, rule_id, meta_data) class AssociationPropertyRule(Rule): def __init__(self, version, type, rule_id, association_property): super().__init__(version, type, rule_id) self.association_property = association_property @classmethod def initialize_from_info(cls, path, data, version, type): association_property = data.get(cls.ASSOCIATION_PROPERTY, {}) if not isinstance(association_property, dict): InvalidRuleSchema( path=path, reason=f"{cls.ASSOCIATION_PROPERTY} type should be dict" ) rule_id = cls.ASSOCIATION_PROPERTY return cls(version, type, rule_id, association_property)