tools/resource.py (324 lines of code) (raw):
import json
import os
import time
from subprocess import run
from tempfile import NamedTemporaryFile
import requests
import jsonref
from alibabacloud_ros20190910 import models as ros_models
from alibabacloud_ros20190910.client import Client as RosClient
from Tea.exceptions import TeaException
from tools import exceptions
class RosResource:
def __init__(self, client: RosClient, resource_type: str):
self.resource_type = resource_type
self.client = client
self._rt_info = None
def fetch(self):
req = ros_models.GetResourceTypeRequest(self.resource_type)
resp = None
for i in range(4):
try:
resp = self.client.get_resource_type(req)
break
except TeaException as e:
if e.code != 'Throttling.User' or i == 3:
raise e
time.sleep(10)
if resp:
self._rt_info = resp.body
@property
def rt_info(self) -> ros_models.GetResourceTypeResponseBody:
if not self._rt_info:
self.fetch()
return self._rt_info
def properties(self):
return self.rt_info.properties
def attributes(self):
return self.rt_info.attributes
class TerraformResource:
ALICLOUD_SRC_URL = "https://raw.githubusercontent.com/aliyun/terraform-provider-alicloud/master/alicloud/resource_{}.go"
AST_FUNC_RETURN = {
"NodeType": "StarExpr",
"X": {
"NodeType": "SelectorExpr",
"X": {"NodeType": "Ident", "Name": "schema"},
"Sel": {"NodeType": "Ident", "Name": "Resource"},
},
}
AST_RESOURCE_TYPE = {
"NodeType": "SelectorExpr",
"X": {"NodeType": "Ident", "Name": "schema"},
"Sel": {"NodeType": "Ident", "Name": "Resource"},
}
AST_SCHEMA_TYPE = {
"NodeType": "StarExpr",
"X": {
"NodeType": "SelectorExpr",
"X": {"NodeType": "Ident", "Name": "schema"},
"Sel": {"NodeType": "Ident", "Name": "Schema"},
},
}
AST_D_SET_FUNC = {
"NodeType": "SelectorExpr",
"X": {"NodeType": "Ident", "Name": "d"},
"Sel": {"NodeType": "Ident", "Name": "Set"},
}
TYPE_MAPPING = {
"TypeString": "string",
"TypeBool": "boolean",
"TypeSet": "list",
"TypeList": "list",
"TypeInt": "integer",
"TypeFloat": "number",
"TypeMap": "map",
}
KEYS_USING_DEFAULT_TAGS_SCHEMA = ("tags", "volume_tags", "template_tags")
DEFAULT_TAGS_SCHEMA = {
"Handler": "tags_dict_to_list"
# 'Type': 'list',
# 'Schema': {
# '*': {
# "Type": "map",
# "Required": False,
# "Schema": {
# "Value": {
# "Type": "string",
# "Required": False,
# },
# "Key": {
# "Type": "string",
# "Required": True,
# }
# },
# }
# }
}
def __init__(self, resource_type: str, resource_filename: str = None):
self.resource_type = resource_type
self.resource_filename = resource_filename or resource_type
self._ast = None
def path_or_url(self):
alicloud_local = os.getenv("TERRAFORM_PROVIDER_ALICLOUD")
if alicloud_local:
path = os.path.join(
alicloud_local, "alicloud", f"resource_{self.resource_filename}.go"
)
return path, "path"
url = self.ALICLOUD_SRC_URL.format(self.resource_filename)
return url, "url"
def fetch(self):
path_or_url, t = self.path_or_url()
if t == "path":
with open(path_or_url) as f:
code = f.read().encode("utf-8")
else:
resp = requests.get(path_or_url)
if resp.status_code != 200:
raise exceptions.RosToolWarning(
message=f"Cannot fetch {path_or_url}. Reason: {resp.text}"
)
code = resp.text.encode("utf-8")
with NamedTemporaryFile() as f:
p = run("asty go2json", input=code, shell=True, stdout=f)
if p.returncode != 0:
raise exceptions.RosToolWarning(
message=f'Run `{p.args}` failed. Reason: {p.stderr.decode("utf-8")}'
)
f.seek(0)
self._ast = json.loads(f.read())
@property
def ast(self):
if not self._ast:
self.fetch()
return self._ast
def properties(self):
ast_resource = self._find_ast_resource(self.ast)
ast_schema = self._find_ast_schema(ast_resource)
properties = self._get_properties_schema(ast_schema)
return properties
def attributes(self):
decls = self.ast.get("Decls")
if not decls:
return
attributes = {}
for decl in decls:
if decl["NodeType"] != "FuncDecl":
continue
decl_type = decl["Type"]
if decl_type["NodeType"] != "FuncType":
continue
body = decl["Body"]
if body["NodeType"] != "BlockStmt":
continue
self._fetch_attributes(body["List"], attributes)
return attributes
@classmethod
def _fetch_attributes(cls, block, attributes):
if isinstance(block, dict):
x = block.get("X")
if x and x.get("Fun") == cls.AST_D_SET_FUNC:
args = x["Args"]
if args and args[0]["Kind"] == "STRING":
attr_name = args[0]["Value"].strip('"')
attributes[attr_name] = {}
else:
for val in block.values():
cls._fetch_attributes(val, attributes)
elif isinstance(block, list):
for val in block:
cls._fetch_attributes(val, attributes)
@classmethod
def _find_ast_resource(cls, ast):
decls = ast.get("Decls")
if not decls:
return
for decl in decls:
if decl["NodeType"] != "FuncDecl":
continue
decl_type = decl["Type"]
if decl_type["NodeType"] != "FuncType":
continue
if decl_type["Results"]["NodeType"] != "FieldList":
continue
for result in decl_type["Results"]["List"]:
if result["Type"] != cls.AST_FUNC_RETURN:
continue
for block in decl["Body"]["List"]:
if block["NodeType"] != "ReturnStmt":
continue
for result in block["Results"]:
x = result.get("X")
if x and x["Type"] == cls.AST_RESOURCE_TYPE:
return x
@classmethod
def _find_ast_schema(cls, ast_resource):
if not ast_resource:
return
for elt in ast_resource["Elts"]:
if elt["NodeType"] != "KeyValueExpr":
continue
elt_value = elt["Value"]
if (
elt_value["NodeType"] == "CompositeLit"
and elt_value["Type"]["Value"] == cls.AST_SCHEMA_TYPE
):
return elt_value
def _get_properties_schema(self, ast_schema, prop_path=""):
if not ast_schema:
return {}
properties_schema = {}
for elt in ast_schema["Elts"]:
prop_name = eval(elt["Key"]["Value"])
new_prop_path = f"{prop_path}.{prop_name}" if prop_path else prop_name
try:
prop = self._get_property_schema(elt["Value"], new_prop_path)
except ValueError as e:
if prop_name in self.KEYS_USING_DEFAULT_TAGS_SCHEMA:
prop = self.DEFAULT_TAGS_SCHEMA
else:
path_or_url, t = self.path_or_url()
raise ValueError(f"{e}. Path: {path_or_url}")
properties_schema[prop_name] = prop
return properties_schema
def _get_property_schema(self, ast_prop, path):
if ast_prop["NodeType"] == "CallExpr":
path_or_url, t = self.path_or_url()
raise ValueError(
f'Found function call in schema "{path}", not supported. Path: {path_or_url}'
)
prop = {"Required": True}
for elt in ast_prop["Elts"]:
elt_key_name = elt["Key"]["Name"]
elt_value = elt["Value"]
if elt_key_name == "Type":
prop["Type"] = self.TYPE_MAPPING[elt_value["Sel"]["Name"]]
elif elt_key_name == "Optional":
prop["Required"] = elt_value["Name"] == '"True"'
elif elt_key_name == "Elem":
node_type = elt_value["NodeType"]
if node_type == "UnaryExpr":
if elt_value["Op"] == "&":
x = elt_value["X"]
x_name = x["Type"]["Sel"]["Name"]
new_prop_path = f"{path}.*"
if x_name == "Schema":
sub_prop = self._get_property_schema(x, new_prop_path)
prop["Schema"] = {"*": sub_prop}
elif x_name == "Resource":
sub_props = self._get_properties_schema(
x["Elts"][0]["Value"], new_prop_path
)
prop["Schema"] = {
"*": {
"Required": False,
"Type": "map",
"Schema": sub_props,
}
}
return prop
class CloudFormationResource:
def __init__(self, client, resource_type: str):
self.client = client
self.resource_type = resource_type
self._rt_info = None
self._schema = None
def fetch(self):
resp = self.client.describe_type(
Type="RESOURCE",
TypeName=self.resource_type,
)
self._rt_info = resp
@property
def rt_info(self) -> dict:
if not self._rt_info:
self.fetch()
return self._rt_info
@property
def schema(self) -> dict:
if not self._schema:
s = self.rt_info["Schema"]
# remove resource-schema.json due to can not resolve
s = s.replace("resource-schema.json#", "#")
self._schema = jsonref.loads(s, proxies=False, lazy_load=False)
self._schema["type"] = "object"
self._handle_prop(self._schema)
return self._schema
def properties(self):
return self._get_props(read_only=False)
def attributes(self):
return self._get_props(read_only=True)
def _get_props(self, read_only=False):
properties = self.schema["Properties"]
read_only_properties = self.schema.get("ReadOnlyProperties")
if read_only_properties:
read_only_names = set(
n.replace("/properties/", "") for n in read_only_properties
)
else:
read_only_names = set()
if read_only:
props = {n: properties[n] for n in properties if n in read_only_names}
else:
props = {n: properties[n] for n in properties if n not in read_only_names}
return props
def _handle_props(self, props: dict, required_names: list = None):
for name, prop in props.items():
if required_names and name in required_names:
prop["Required"] = True
else:
prop["Required"] = False
self._handle_prop(prop)
def _handle_prop(self, prop: dict):
for k in list(prop):
if k == "required" and prop.get("type") == "object":
nk = ".RequiredList"
else:
nk = "{}{}".format(k[0].upper(), k[1:])
prop[nk] = prop.pop(k)
t = prop.get("Type")
if isinstance(t, list):
if "object" in t:
t = "object"
elif "array" in t:
t = "array"
else:
t = t[-1]
prop["Type"] = t
if t == "array":
prop["Type"] = "list"
item_schema = prop.get("Items")
self._handle_prop(item_schema)
prop["Schema"] = {"*": item_schema}
elif t == "object":
prop["Type"] = "map"
props = prop.get("Properties")
if props:
required_names = prop.pop(".RequiredList", None)
self._handle_props(props, required_names)
prop["Schema"] = props
elif not t and "oneof" in prop:
for each_prop in prop["oneof"]:
self._handle_prop(each_prop)