packages/python-packages/protocol-stub-generator/protocol/protocol_models.py (1,215 lines of code) (raw):
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import os
import re
import sys
from autorest_vendor.autorest.codegen.models import (
RequestBuilder,
CodeModel,
build_schema,
Operation,
)
from ._token import Token
from ._token_kind import TokenKind
from typing import Any, Dict
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..'))
JSON_FIELDS = [
"Name",
"Version",
"VersionString",
"Navigation",
"Tokens",
"Diagnostics",
"PackageName",
]
PARAM_FIELDS = ["name", "type", "default", "optional", "indent"]
OP_FIELDS = ["operation", "parameters", "indent"]
R_TYPE = [
"dictionary",
"string",
"bool",
"int32",
"int64",
"float32",
"float64",
"binary",
]
class FormattingClass:
def add_whitespace(self, indent):
if indent:
self.add_token(Token(" " * (indent * 4)))
def add_space(self):
self.add_token(Token(" ", TokenKind.Whitespace))
def add_new_line(self, additional_line_count=0):
self.add_token(Token("", TokenKind.Newline))
for n in range(additional_line_count):
self.add_space()
self.add_token(Token("", TokenKind.Newline))
def add_punctuation(self, value, prefix_space=False, postfix_space=False):
if prefix_space:
self.add_space()
self.add_token(Token(value, TokenKind.Punctuation))
if postfix_space:
self.add_space()
def add_line_marker(self, text):
token = Token("", TokenKind.LineIdMarker)
token.set_definition_id(text)
self.add_token(token)
def add_text(self, id, text, nav):
token = Token(text, TokenKind.Text)
token.DefinitionId = id
token.NavigateToId = nav
self.add_token(token)
def add_comment(self, id, text, nav):
token = Token(text, TokenKind.Comment)
token.DefinitionId = id
token.NavigateToId = nav
self.add_token(token)
def add_typename(self, id, text, nav):
token = Token(text, TokenKind.TypeName)
token.DefinitionId = id
token.NavigateToId = nav
self.add_token(token)
def add_stringliteral(self, id, text, nav):
token = Token(text, TokenKind.StringLiteral)
token.DefinitionId = id
token.NavigateToId = nav
self.add_token(token)
def add_literal(self, id, text):
token = Token(text, TokenKind.Literal)
token.DefinitionId = id
self.add_token(token)
def add_keyword(self, id, keyword, nav):
token = Token(keyword, TokenKind.Keyword)
token.DefinitionId = id
token.NavigateToId = nav
self.add_token(token)
def add_navigation(self, navigation):
self.Navigation.append(navigation)
class ProtocolClientView(FormattingClass):
"""Entity class that holds LLC view for all namespaces within a package"""
def __init__(
self,
operation_groups,
pkg_name="",
endpoint="endpoint",
endpoint_type="string",
credential="Credential",
credential_type="AzureCredential",
):
self.Name = pkg_name
self.Language = "Protocol"
self.Tokens = []
self.Operations = []
self.Operation_Groups = operation_groups
self.Navigation = []
self.Diagnostics = []
self.endpoint_type = endpoint_type
self.endpoint = endpoint
self.credential = credential
self.credential_type = credential_type
self.namespace = "Azure." + pkg_name
@classmethod
def from_yaml(cls, yaml_data: Dict[str, Any]):
operation_groups = []
# Iterate through Operations in OperationGroups
for op_groups in range(0, len(yaml_data["operationGroups"])):
operation_group = ProtocolOperationGroupView.from_yaml(
yaml_data, op_groups, "Azure." + yaml_data["info"]["title"]
)
if operation_group.operation_group == "":
operation_group.operation_group = "<default>"
operation_groups.insert(0, operation_group)
else:
operation_groups.append(operation_group)
return cls(
operation_groups=operation_groups,
pkg_name=yaml_data["info"]["title"],
endpoint=yaml_data["globalParameters"][0]["language"]["default"]["name"],
endpoint_type=yaml_data["globalParameters"][0]["schema"]["type"],
)
def add_token(self, token):
self.Tokens.append(token)
def add_operation_group(self, operation_group):
self.Operation_Groups.append(operation_group)
def to_token(self):
# Create view
# Namespace
self.add_keyword(self.namespace, self.namespace, self.namespace)
self.add_space()
self.add_punctuation("{")
self.add_new_line(1)
# Name of client
self.add_whitespace(1)
self.add_keyword(
self.namespace + self.Name, self.Name, self.namespace + self.Name
)
self.add_punctuation("(")
self.add_stringliteral(None, self.endpoint_type, None)
self.add_space()
self.add_text(None, self.endpoint, None)
self.add_punctuation(",")
self.add_space()
self.add_stringliteral(None, self.credential_type, None)
self.add_space()
self.add_text(None, self.credential, None)
self.add_punctuation(")")
self.add_new_line(1)
# Create Overview
navigation = Navigation(None, None)
navigation.set_tag(NavigationTag(Kind.type_package))
overview = Navigation("Overview", "overview")
overview.set_tag(NavigationTag(Kind.type_package))
self.add_typename(
"overview",
"Overview ######################################################################",
"overview",
)
self.add_new_line()
for operation_group in self.Operation_Groups:
child_nav3 = Navigation(
operation_group.operation_group,
self.namespace + operation_group.operation_group + "overview",
)
child_nav3.set_tag(NavigationTag(Kind.type_class))
overview.add_child(child_nav3)
operation_group.to_token()
operation_tokens = operation_group.overview_tokens
for token in operation_tokens:
if token:
self.add_token(token)
for operation_view in operation_group.operations:
child_nav2 = Navigation(
operation_view.operation,
self.namespace
+ operation_group.operation_group
+ operation_view.operation
+ "overview",
)
child_nav2.set_tag(NavigationTag(Kind.type_method))
child_nav3.add_child(child_nav2)
self.add_new_line(1)
self.add_typename(
"details",
"Details ######################################################################",
"details",
)
self.add_new_line()
details = self.to_child_tokens()
self.add_new_line()
self.add_punctuation("}")
navigation.add_child(overview)
navigation.add_child(details)
self.add_navigation(navigation)
return self.Tokens
def to_child_tokens(self):
# Set Navigation
details = Navigation("Details", "details")
details.set_tag(NavigationTag(Kind.type_package))
self.add_new_line()
for operation_group_view in self.Operation_Groups:
# Add children
child_nav1 = Navigation(
operation_group_view.operation_group,
self.namespace + operation_group_view.operation_group,
)
child_nav1.set_tag(NavigationTag(Kind.type_class))
details.add_child(child_nav1)
op_group = operation_group_view.get_tokens()
for token in op_group:
self.add_token(token)
# Set up operations and add to token
for operation_view in operation_group_view.operations:
# Add operation comments
child_nav = Navigation(
operation_view.operation,
self.namespace
+ operation_group_view.operation_group
+ operation_view.operation,
)
child_nav.set_tag(NavigationTag(Kind.type_method))
child_nav1.add_child(child_nav)
return details
def to_json(self):
obj_dict = {}
self.to_token()
for key in JSON_FIELDS:
if key in self.__dict__:
obj_dict[key] = self.__dict__[key]
for i in range(0, len(obj_dict["Tokens"])):
# Break down token objects into dictionary
if obj_dict["Tokens"][i]:
obj_dict["Tokens"][i] = {
"Kind": obj_dict["Tokens"][i].Kind.value,
"Value": obj_dict["Tokens"][i].Value,
"NavigateToId": obj_dict["Tokens"][i].NavigateToId,
"DefinitionId": obj_dict["Tokens"][i].DefinitionId,
}
# Remove Null Values from Tokens
obj_dict["Tokens"][i] = {
key: value
for key, value in obj_dict["Tokens"][i].items()
if value is not None
}
obj_dict["Language"] = self.Language
return obj_dict
class ProtocolOperationGroupView(FormattingClass):
def __init__(self, operation_group_name, operations, namespace):
self.operation_group = operation_group_name
self.operations = operations
self.Tokens = []
self.overview_tokens = []
self.namespace = namespace
@classmethod
def from_yaml(cls, yaml_data: Dict[str, Any], op_group, name):
operations = []
for i in range(0, len(yaml_data["operationGroups"][op_group]["operations"])):
operations.append(
ProtocolOperationView.from_yaml(yaml_data, op_group, i, name)
)
return cls(
operation_group_name=yaml_data["operationGroups"][op_group]["language"][
"default"
]["name"],
operations=operations,
namespace=name,
)
def get_tokens(self):
return self.Tokens
def add_token(self, token):
self.Tokens.append(token)
# have a to_token to create the line for parameters
def to_token(self):
# Each operation will indent itself by 4
self.add_new_line()
if self.operation_group:
self.add_whitespace(1)
self.overview_tokens.append(Token(" " * 4, TokenKind.Whitespace))
# Operation Name token
self.add_text(None, "OperationGroup", None)
self.overview_tokens.append(Token("OperationGroup", TokenKind.Text))
self.add_space()
self.overview_tokens.append(Token(" ", TokenKind.Text))
self.add_keyword(
self.namespace + self.operation_group,
self.operation_group,
self.namespace + self.operation_group,
)
token = Token(self.operation_group, TokenKind.Keyword)
token.set_navigation_id(self.namespace + self.operation_group + "overview")
token.set_definition_id(self.namespace + self.operation_group + "overview")
self.overview_tokens.append(token)
self.add_new_line()
self.overview_tokens.append(Token("", TokenKind.Newline))
for operation in range(0, len(self.operations)):
if self.operations[operation]:
self.operations[operation].to_token()
if operation == 0:
self.add_whitespace(2)
self.overview_tokens.append(Token(" " * (4), TokenKind.Text))
self.add_punctuation("{")
self.add_new_line()
self.overview_tokens.append(Token("", TokenKind.Newline))
self.add_whitespace(2)
for i in self.operations[operation].overview_tokens:
self.overview_tokens.append(i)
for t in self.operations[operation].get_tokens():
self.add_token(t)
self.add_whitespace(2)
self.add_punctuation("}")
self.add_new_line(1)
self.overview_tokens.append(Token(" ", TokenKind.Whitespace))
self.overview_tokens.append(Token("", TokenKind.Newline))
else:
for operation in range(0, len(self.operations)):
if self.operations[operation]:
self.operations[operation].to_token()
for i in self.operations[operation].overview_tokens:
self.overview_tokens.append(i)
for t in self.operations[operation].get_tokens():
self.add_token(t)
def to_json(self):
obj_dict = {}
self.to_token()
for key in OP_FIELDS:
obj_dict[key] = self.__dict__[key]
return obj_dict
def get_description(yaml_data, op_group_num, op_num):
description = yaml_data["operationGroups"][op_group_num]["operations"][op_num][
"language"
]["default"].get("summary")
if description is None:
description = yaml_data["operationGroups"][op_group_num]["operations"][
op_num
]["language"]["default"]["description"]
return description
def get_models(
yaml_data,
op_group_num,
op_num,
namespace,
param,
code,
request_builder,
response_builder,
):
json_request = {}
json_response = {}
for j in range(
0,
len(
yaml_data["operationGroups"][op_group_num]["operations"][op_num][
"requests"
]
),
):
for i in range(
0,
len(
yaml_data["operationGroups"][op_group_num]["operations"][op_num][
"requests"
][j].get("signatureParameters", [])
),
):
param.append(
ProtocolParameterView.from_yaml(
yaml_data["operationGroups"][op_group_num]["operations"][
op_num
]["requests"][j],
i,
namespace,
)
)
if (
build_schema(
yaml_data=request_builder.parameters.json_body, code_model=code
).serialization_type
!= "IO"
):
json_request = build_schema(
yaml_data=request_builder.parameters.json_body, code_model=code
).get_json_template_representation()
for i in response_builder.responses:
if i.schema:
if isinstance(i.schema, dict):
if (
build_schema(
yaml_data=i.schema, code_model=code
).serialization_type
!= "IO"
):
json_response = build_schema(
yaml_data=i.schema, code_model=code
).get_json_template_representation()
else:
json_response = i.schema.get_json_template_representation(
code_model=code
)
for i in range(
0,
len(
yaml_data["operationGroups"][op_group_num]["operations"][op_num][
"signatureParameters"
]
),
):
param.append(
ProtocolParameterView.from_yaml(
yaml_data["operationGroups"][op_group_num]["operations"][op_num],
i,
namespace,
)
)
return json_request, json_response
def get_status_codes(yaml_data, op_group_num, op_num, status_codes):
for i in range(
0,
len(
yaml_data["operationGroups"][op_group_num]["operations"][op_num].get(
"responses"
)
),
):
status_codes.append(
yaml_data["operationGroups"][op_group_num]["operations"][op_num][
"responses"
][i]["protocol"]["http"]["statusCodes"]
)
if yaml_data["operationGroups"][op_group_num]["operations"][op_num].get(
"exceptions", []
):
status_codes.append("/")
for i in range(
0,
len(
yaml_data["operationGroups"][op_group_num]["operations"][op_num].get(
"exceptions", []
)
),
):
status_codes.append(
yaml_data["operationGroups"][op_group_num]["operations"][op_num][
"exceptions"
][i]["protocol"]["http"]["statusCodes"]
)
class ProtocolOperationView(FormattingClass):
def __init__(
self,
operation_group,
operation_name,
return_type,
parameters,
namespace,
json_request=None,
json_response=None,
status_codes=None,
description="",
paging=False,
lro=False,
yaml=None,
):
self.operation_group = operation_group
self.operation = operation_name
self.return_type = return_type
self.parameters = parameters # parameterview list
self.Tokens = []
self.overview_tokens = []
self.namespace = namespace
self.description = description
self.paging = paging
self.lro = lro
self.json_request = json_request
self.json_response = json_response
self.status_codes = status_codes
self.yaml = yaml
self.inner_model = []
@classmethod
def from_yaml(cls, yaml_data: Dict[str, Any], op_group_num, op_num, namespace):
param = []
pageable = None
lro = None
status_codes = []
code = CodeModel(
options={"show_models":True,"show_send_request":True,"builders_visibility":True},
)
request_builder = RequestBuilder.from_yaml(
yaml_data["operationGroups"][op_group_num]["operations"][op_num],
code_model=code,
)
response_builder = Operation.from_yaml(
yaml_data["operationGroups"][op_group_num]["operations"][op_num]
)
get_status_codes(yaml_data, op_group_num, op_num, status_codes)
if yaml_data["operationGroups"][op_group_num]["operations"][op_num].get(
"extensions"
):
pageable = yaml_data["operationGroups"][op_group_num]["operations"][op_num][
"extensions"
].get("x-ms-pageable")
lro = yaml_data["operationGroups"][op_group_num]["operations"][op_num][
"extensions"
].get("x-ms-long-running-operation")
paging_op = True if pageable else False
lro_op = True if lro else False
return_type = get_type(
yaml_data["operationGroups"][op_group_num]["operations"][op_num][
"responses"
][0].get("schema", []),
paging_op,
)
json_request, json_response = get_models(
yaml_data,
op_group_num,
op_num,
namespace,
param,
code,
request_builder,
response_builder,
)
description = get_description(yaml_data, op_group_num, op_num)
return cls(
operation_group=yaml_data["operationGroups"][op_group_num]["language"][
"default"
]["name"],
operation_name=yaml_data["operationGroups"][op_group_num]["operations"][
op_num
]["language"]["default"]["name"],
parameters=param,
return_type=return_type,
namespace=namespace,
description=description,
paging=paging_op,
lro=lro_op,
json_request=json_request,
json_response=json_response,
status_codes=status_codes,
yaml=yaml_data["operationGroups"][op_group_num]["operations"][op_num],
)
def get_tokens(self):
return self.Tokens
def add_token(self, token):
self.Tokens.append(token)
def add_first_line(self):
if self.paging and self.lro:
self.overview_tokens.append(Token("PagingLro", TokenKind.Text))
self.overview_tokens.append(Token("[", TokenKind.Text))
self.add_text(None, "PagingLro", None)
self.add_text(None, "[", None)
if self.paging:
self.overview_tokens.append(Token("Paging", TokenKind.Text))
self.overview_tokens.append(Token("[", TokenKind.Text))
self.add_text(None, "Paging", None)
self.add_text(None, "[", None)
if self.lro:
self.overview_tokens.append(Token("lro", TokenKind.Text))
self.overview_tokens.append(Token("[", TokenKind.Text))
self.add_text(None, "lro", None)
self.add_text(None, "[", None)
if self.return_type is None:
self.overview_tokens.append(Token("void", TokenKind.Text))
self.add_text(None, "void", None)
elif any(i in self.return_type for i in R_TYPE):
self.add_text(None, self.return_type, None)
self.overview_tokens.append(Token(self.return_type, TokenKind.Text))
else:
self.add_stringliteral(None, self.return_type, None)
self.overview_tokens.append(
Token(self.return_type, TokenKind.StringLiteral)
)
if self.paging or self.lro:
self.add_text(None, "]", None)
self.overview_tokens.append(Token("]", TokenKind.Text))
self.add_space()
if not self.operation_group:
self.operation_group = "<default>"
self.overview_tokens.append(Token(" ", TokenKind.Text))
token = Token(self.operation, TokenKind.Keyword)
token.set_definition_id(
self.namespace + self.operation_group + self.operation + "overview"
)
token.set_navigation_id(
self.namespace + self.operation_group + self.operation + "overview"
)
self.overview_tokens.append(token)
self.add_keyword(
self.namespace + self.operation_group + self.operation,
self.operation,
self.namespace + self.operation_group + self.operation,
)
self.add_space()
self.add_new_line()
self.add_description()
self.add_whitespace(3)
self.overview_tokens.append(Token("(", TokenKind.Text))
self.add_punctuation("(")
def add_description(self):
self.add_token(Token(kind=TokenKind.StartDocGroup))
self.add_whitespace(3)
self.add_typename(None, self.description, None)
self.add_new_line()
self.add_token(Token(kind=TokenKind.EndDocGroup))
def to_token(self):
# Remove None Param
self.parameters = [key for key in self.parameters if key.type]
# Create Overview:
# Each operation will indent itself by 4
self.add_whitespace(1)
self.overview_tokens.append(Token(" " * 4, TokenKind.Whitespace))
# Set up operation parameters
if len(self.parameters) == 0:
self.add_first_line()
self.add_new_line()
self.add_whitespace(3)
self.overview_tokens.append(Token(")", TokenKind.Text))
self.add_punctuation(")")
self.add_new_line()
self.add_token(Token(kind=TokenKind.StartDocGroup))
self.format_status_code()
self.add_token(Token(kind=TokenKind.EndDocGroup))
self.add_new_line(1)
for param_num in range(0, len(self.parameters)):
if self.parameters[param_num]:
self.parameters[param_num].to_token()
if param_num == 0:
self.add_first_line()
self.add_new_line()
# Add in parameter tokens
if self.parameters[param_num]:
self.add_whitespace(4)
for p in self.parameters[param_num].get_tokens():
self.add_token(p)
for o in self.parameters[param_num].overview_tokens:
self.overview_tokens.append(o)
# Add in comma before the next parameter
if param_num + 1 in range(0, len(self.parameters)):
self.add_punctuation(",")
self.overview_tokens.append(Token(", ", TokenKind.Text))
# Create a new line for the next operation
else:
self.add_new_line()
self.add_whitespace(3)
self.overview_tokens.append(Token(")", TokenKind.Text))
self.add_punctuation(")")
self.add_new_line(1)
self.add_token(Token(kind=TokenKind.StartDocGroup))
self.format_status_code()
# If the request or response contain more than one entry, check the parameters of the operation and see if one of them is an object, if it is, that is your overall model request name
# if type is not a normal type .. aka an object name ^
if self.json_request:
self.add_whitespace(3)
self.add_typename(None, "Request", None)
self.add_new_line(1)
object_name = None
new_req = {}
if not any(j in self.parameters[0].type for j in R_TYPE):
object_name = self.parameters[0].type
if object_name:
object_name = object_name.replace("[]", "")
new_req[object_name] = self.json_request
self.json_request = new_req
self.request_builder(self.json_request, self.yaml, not_first=False)
self.add_new_line()
self.add_whitespace(4)
for m in self.inner_model:
if m:
if m.Value == "str":
m.Value = "string"
self.Tokens.append(m)
self.add_new_line(1)
# If the response is just a string or simple list, ignore writing the response
if isinstance(self.json_response, str) or (
isinstance(self.json_response, list)
and isinstance(self.json_response[0], str)
):
self.json_response = None
if self.json_response:
self.inner_model = []
self.add_whitespace(3)
self.add_typename(None, "Response", None)
self.add_new_line(1)
new_req = {}
if not any(j in self.return_type for j in R_TYPE):
new_req[self.return_type.replace("[]", "")] = self.json_response
self.json_response = new_req
self.request_builder(
self.json_response, self.yaml, not_first=False
)
self.add_new_line()
self.add_whitespace(4)
for i in self.inner_model:
if i:
if i.Value == "str":
i.Value = "string"
self.Tokens.append(i)
self.add_new_line(1)
self.add_token(Token(kind=TokenKind.EndDocGroup))
def format_status_code(self):
if self.status_codes:
self.add_whitespace(3)
self.add_typename(None, "Status Codes", None)
self.add_space()
for i in self.status_codes:
if isinstance(i, list):
for j in i:
self.add_text(None, j, None)
self.add_text(None, " ", None)
else:
self.add_text(None, i, None)
self.add_text(None, " ", None)
self.add_new_line(1)
def to_json(self):
obj_dict = {}
self.to_token()
for key in OP_FIELDS:
obj_dict[key] = self.__dict__[key]
return obj_dict
def request_builder(
self, json_request, yaml, not_first, indent=4, name="", inner_model=[], no_list=True
):
if inner_model and not self.inner_model: self.inner_model = inner_model
no_list = self.list_format(json_request, yaml, indent, name, inner_model)
self.dict_format(json_request, yaml, not_first, indent, inner_model, no_list,name)
def dict_format(self, json_request, yaml, not_first, indent, inner_model, no_list,name):
if isinstance(json_request, dict):
for i in json_request:
if indent == 4:
self.add_whitespace(indent)
if not_first:
self.add_new_line()
self.add_whitespace(indent)
self.add_new_line()
self.add_whitespace(indent)
if not inner_model:
self.add_comment(None, "model " + i + " {", None)
self.add_new_line()
not_first = True
name = i
inner_model.clear()
else:
inner_model.append(Token(" ", TokenKind.Newline))
inner_model.append(Token(" " * (indent * 4), TokenKind.Whitespace))
inner_model.append(Token("model " + i + " {", TokenKind.Comment))
inner_model.append(Token(" ", TokenKind.Newline))
if indent > 4 and (
isinstance(json_request[i], list) or isinstance(json_request[i], dict)
):
if i == "str":
if inner_model:
inner_model.append(Token(" ", TokenKind.Newline))
inner_model.append(
Token(" " * (indent * 4), TokenKind.Whitespace)
)
m_type, key = get_map_type(yaml, name)
m_type = format_map_type(m_type)
inner_model.append(
Token(
key + ": Map<string, " + m_type + ">;",
TokenKind.Comment,
)
)
else:
self.add_new_line()
self.add_whitespace(indent)
m_type, key = get_map_type(yaml, name)
m_type = format_map_type(m_type)
self.add_comment(
None, key + ": Map<string, " + m_type + ">;", None
)
self.add_new_line()
self.add_whitespace(4)
self.add_comment(None, "};", None)
# START COLLECTING INNER MODEL DATA
if not m_type == "{"+"}":
indent = 4
if "[]" in m_type:
m_type = m_type[0 : len(m_type) - 2]
inner_model.append(Token(" ", TokenKind.Newline))
inner_model.append(
Token(" " * (indent * 4), TokenKind.Whitespace)
)
inner_model.append(
Token(
"model " + m_type + " {",
TokenKind.Comment,
)
)
else:
if inner_model:
inner_model.append(Token(" ", TokenKind.Newline))
inner_model.append(
Token(" " * (indent * 4), TokenKind.Whitespace)
)
if isinstance(json_request[i], list) and not isinstance(
json_request[i][0], dict
):
inner_model.append(Token(i, TokenKind.Comment))
else:
inner_model.append(
Token(i + ": {", TokenKind.Comment)
)
name = i
else:
self.add_new_line()
if not (isinstance(json_request[i], list) and not isinstance(
json_request[i][0], dict)
):
self.add_whitespace(indent)
self.add_comment(None, i + ": {", None)
name = i
inner_model = []
if isinstance(json_request[i], str):
in_model = False
if indent == 4:
in_model = True
indent = indent + 1
name = i
if inner_model:
inner_model.append(Token(" ", TokenKind.Newline))
inner_model.append(Token(" " * (indent * 4), TokenKind.Whitespace))
else:
self.add_new_line()
self.add_whitespace(indent)
index = json_request[i].find("(optional)")
param = json_request[i].split()
if i == "str":
param[0] = format_param_type(param[0])
m_type, key = get_map_type(yaml, name)
m_type = format_map_type(m_type)
if not key: key = self.parameters[0].name
if inner_model:
optional = ""
if index != -1:
optional = "?"
inner_model.append(
Token(
key + optional + " : Map<string, " + param[0] + ">;",
TokenKind.Comment,
)
)
else:
optional = ""
if index != -1:
optional = "?"
self.add_comment(
None, key + optional + " : Map<string, " + param[0] + ">;", None
)
if in_model:
self.add_new_line()
self.add_whitespace(4)
self.add_comment(None, "};", None)
in_model = False
indent = 4
else:
if len(param) >= 2:
param[0] = format_param_type(param[0])
optional = ""
if index != -1:
optional = "?"
json_request[i] = i + optional + " : " + param[0] + ";"
if inner_model:
inner_model.append(
Token(json_request[i], TokenKind.Comment)
)
else:
self.add_comment(None, json_request[i], None)
if in_model:
self.add_new_line()
self.add_whitespace(4)
self.add_comment(None, "};", None)
in_model = False
indent = 4
else:
json_request[i] = format_param_type(json_request[i])
if inner_model:
inner_model.append(
Token(
i + ":" + json_request[i] + ";", TokenKind.Comment
)
)
else:
self.add_comment(
None, i + ": " + json_request[i] + ";", None
)
if in_model:
self.add_new_line()
self.add_whitespace(4)
self.add_comment(None, "};", None)
in_model = False
indent = 4
else:
self.request_builder(
json_request[i],
yaml,
indent=indent + 1,
not_first=True,
name=name,
inner_model=inner_model,
no_list=no_list,
)
inner_model = self.format_punctuation(json_request, indent, inner_model, i)
def format_punctuation(self, json_request, indent, inner_model, i):
if i == "str":
return inner_model
elif inner_model and isinstance(json_request[i], list):
if isinstance(json_request[i], list) and len(json_request[i])>1:
inner_model.append(Token(" ", TokenKind.Newline))
inner_model.append(
Token(" " * (indent * 4), TokenKind.Whitespace)
)
inner_model.append(Token("}[];", TokenKind.Comment))
elif isinstance(json_request[i],list) and isinstance(json_request[i][0],dict):
inner_model.append(Token(" ", TokenKind.Newline))
inner_model.append(
Token(" " * (indent * 4), TokenKind.Whitespace)
)
inner_model.append(Token("}[];", TokenKind.Comment))
elif len(json_request[i])>1:
inner_model.append(Token(" ", TokenKind.Newline))
inner_model.append(
Token(" " * (indent * 4), TokenKind.Whitespace)
)
inner_model.append(Token("};", TokenKind.Comment))
elif isinstance(json_request[i], list) and indent > 4:
if isinstance(json_request[i][0], dict) :
self.add_new_line()
self.add_whitespace(indent)
self.add_comment(None, "}[];", None)
if len(json_request[i])==1:
pass
else:
self.add_new_line()
self.add_whitespace(indent)
self.add_comment(None, "}[];", None)
else:
if indent == 5 and inner_model:
inner_model.append(Token(" ", TokenKind.Newline))
inner_model.append(
Token(" " * (indent * 4), TokenKind.Whitespace)
)
inner_model.append(Token("};", TokenKind.Comment))
if self.inner_model != inner_model:
self.inner_model +=inner_model
else: self.inner_model = inner_model
inner_model = []
elif inner_model:
inner_model.append(Token(" ", TokenKind.Newline))
inner_model.append(
Token(" " * (indent * 4), TokenKind.Whitespace)
)
inner_model.append(Token("};", TokenKind.Comment))
else:
self.add_new_line()
self.add_whitespace(indent)
self.add_comment(None, "};", None)
return inner_model
def list_format(self, json_request, yaml, indent, name, inner_model):
no_list = True
if isinstance(json_request, list):
for i in range(0, len(json_request)):
if isinstance(json_request[i], str):
index = json_request[i].find("(optional)")
param = json_request[i].split()
if len(param) >= 2:
param[0] = format_param_type(param[0])
optional = ""
if index != -1:
optional = "?"
json_request[i] = optional + " : " + param[0] + "[];"
if inner_model:
inner_model.append(Token(json_request[i], TokenKind.Comment))
inner_model.append(Token(" ", TokenKind.Newline))
else:
json_request[i] = format_param_type(json_request[i])
if name:
self.add_whitespace(indent-1)
self.add_comment(None, name + json_request[i], None)
else:
self.add_comment(None, json_request[i], None)
self.add_new_line()
else:
no_list = False
self.request_builder(
json_request[i],
yaml,
indent=indent + 1,
not_first=True,
inner_model=inner_model,
no_list=no_list,
)
return no_list
def format_param_type(p_type):
if p_type == "str":
p_type = "string"
if p_type == "any-object":
p_type = "{" + "}"
return p_type
def format_map_type(m_type):
if "dictionary" in m_type:
m_type = m_type.split()
m_type = m_type[1][: len(m_type[1]) - 1]
if m_type == "any-object":
m_type = "{" + "}"
return m_type
def get_map_type(yaml, name=""):
# Find yaml type
key = ""
m_type = ""
if name:
if yaml["requests"][0]["parameters"]:
for i in yaml["requests"][0]["parameters"]:
if i["schema"].get("properties", []):
for j in i["schema"]["properties"]:
if j["serializedName"] == name:
m_type = get_type(j["schema"]["elementType"])
key = j["schema"]["language"]["default"]["name"]
for j in i["schema"]["properties"][0]["schema"].get(
"properties", []
):
if j["serializedName"] == name:
m_type = get_type(j["schema"]["elementType"])
key = j["schema"]["language"]["default"]["name"]
for j in i["schema"]["properties"][0]["schema"].get(
"elementType", []
):
for k in i["schema"]["properties"][0]["schema"][
"elementType"
].get("properties", []):
if k["serializedName"] == name:
m_type = get_type(k["schema"]["elementType"])
key = k["schema"]["language"]["default"]["name"]
if i["schema"]["properties"][0].get("serializedName") == name:
m_type = get_type(
i["schema"]["properties"][0]["schema"]["elementType"]
)
key = i["schema"]["properties"][0]["schema"]["language"][
"default"
]["name"]
if i["schema"].get("elementType", []):
if i["schema"]["elementType"].get("properties", []):
for j in i["schema"]["elementType"].get("properties", []):
if j["serializedName"] == name:
m_type = get_type(j["schema"]["elementType"])
key = j["schema"]["language"]["default"]["name"]
for j in i["schema"]["elementType"]["properties"][0][
"schema"
].get("elementType", []):
for k in i["schema"]["elementType"]["properties"][0][
"schema"
]["elementType"].get("properties", []):
if k["serializedName"] == name:
m_type = get_type(k["schema"]["elementType"])
key = k["schema"]["language"]["default"]["name"]
if yaml["responses"][0].get("schema"):
for i in yaml["responses"][0]["schema"].get("properties", []):
if i["schema"].get("properties", []):
for j in i["schema"]["properties"][0]["schema"].get(
"properties", []
):
if j["serializedName"] == name:
m_type = get_type(j["schema"]["elementType"])
key = j["schema"]["language"]["default"]["name"]
for j in i["schema"]["properties"][0]["schema"].get(
"elementType", []
):
for k in i["schema"]["properties"][0]["schema"][
"elementType"
].get("properties", []):
if k["serializedName"] == name:
m_type = get_type(k["schema"]["elementType"])
key = k["schema"]["language"]["default"]["name"]
if i["serializedName"] == name:
m_type = get_type(i["schema"]["elementType"])
key = i["schema"]["language"]["default"]["name"]
if i["schema"].get("elementType", []):
for j in i["schema"]["elementType"].get("properties", []):
if j["serializedName"] == name:
m_type = get_type(j["schema"]["elementType"])
key = j["schema"]["language"]["default"]["name"]
return m_type, key
class ProtocolParameterView(FormattingClass):
def __init__(
self,
operation,
param_name,
param_type,
namespace,
json_request=None,
default=None,
required=False,
):
self.operation = operation
self.name = param_name
self.type = param_type
self.default = default
self.required = required
self.Tokens = []
self.overview_tokens = []
self.json_request = json_request
self.namespace = namespace
@classmethod
def from_yaml(cls, yaml_data: Dict[str, Any], i, name):
required = True
default = None
json_request = {}
if yaml_data.get("signatureParameters"):
default = yaml_data["signatureParameters"][i]["schema"].get("defaultValue")
param_name = yaml_data["signatureParameters"][i]["language"]["default"][
"name"
]
param_type = get_type(yaml_data["signatureParameters"][i]["schema"])
if yaml_data["signatureParameters"][i].get("required"):
required = yaml_data["signatureParameters"][i]["required"]
else:
required = False
else:
param_type = None
param_name = None
return cls(
operation=yaml_data["language"]["default"]["name"],
param_type=param_type,
param_name=param_name,
required=required,
namespace=name,
default=default,
json_request=json_request,
)
def add_token(self, token):
self.Tokens.append(token)
def get_tokens(self):
return self.Tokens
def to_token(self):
if self.type is not None:
# Create parameter type token
self.add_stringliteral(None, self.type, None)
self.overview_tokens.append(Token(self.type, TokenKind.StringLiteral))
# If parameter is optional, token for ? created
if not self.required:
self.add_stringliteral(None, "?", None)
self.overview_tokens.append(Token("?", TokenKind.StringLiteral))
self.add_space()
self.overview_tokens.append(Token(" ", TokenKind.Text))
# Create parameter name token
self.add_text(
self.namespace + self.operation + self.type + self.name + "details",
self.name,
None,
)
token = Token(self.name, TokenKind.Text)
token.set_definition_id(
self.namespace + self.operation + self.type + self.name + "overview"
)
self.overview_tokens.append(token)
# Check if parameter has a default value or not
if self.default is not None:
self.add_space()
self.overview_tokens.append(Token(" ", TokenKind.Text))
self.add_text(None, "=", None)
self.overview_tokens.append(Token("=", TokenKind.Text))
self.add_space()
self.overview_tokens.append(Token(" ", TokenKind.Text))
if self.type == "string":
self.add_text(None, "'" + str(self.default) + "'", None)
self.overview_tokens.append(
Token("'" + str(self.default) + "'", TokenKind.Text)
)
else:
self.add_text(None, str(self.default), None)
self.overview_tokens.append(
Token(str(self.default), TokenKind.Text)
)
def to_json(self):
obj_dict = {}
self.to_token()
for key in PARAM_FIELDS:
obj_dict[key] = self.__dict__[key]
return obj_dict
class Kind:
type_class = "class"
type_enum = "enum"
type_method = "method"
type_module = "namespace"
type_package = "assembly"
class NavigationTag:
def __init__(self, kind):
self.TypeKind = kind
class Navigation:
"""Navigation model to be added into tokens files. List of Navigation object represents the tree panel in tool"""
def __init__(self, text, nav_id):
self.Text = text
self.NavigationId = nav_id
self.ChildItems = []
self.Tags = None
def set_tag(self, tag):
self.Tags = tag
def add_child(self, child):
self.ChildItems.append(child)
def get_type(data, page=False):
# Get type
try:
return_type = data["type"]
if return_type == "choice":
return_type = data["choiceType"]["type"]
if return_type == "dictionary":
value = data["elementType"]["type"]
if value == "object" or value == "array" or value == "dictionary":
value = get_type(data["elementType"])
return_type += "[string, " + value + "]"
if return_type == "object":
return_type = data["language"]["default"]["name"]
if page:
for i in data['properties']:
if i.get('serializedName') == 'value':
return_type = get_type(i["schema"], True)
if return_type == "array":
if (
data["elementType"]["type"] != "object"
and data["elementType"]["type"] != "choice"
):
return_type = data["elementType"]["type"] + "[]"
elif not page:
return_type = data["elementType"]["language"]["default"]["name"] + "[]"
else:
return_type = data["elementType"]["language"]["default"]["name"]
if "number" in return_type:
if data["precision"] == 32:
return_type = re.sub("number", "float32", return_type)
if data["precision"] == 64:
return_type = re.sub("number", "float64", return_type)
if "integer" in return_type:
if data["precision"] == 32:
return_type = re.sub("integer", "int32", return_type)
if data["precision"] == 64:
return_type = re.sub("integer", "int64", return_type)
if return_type == "boolean":
return_type = "bool"
else:
return_type = return_type
except:
return_type = None
return return_type