mrs_plugin/lib/sdk.py (1,299 lines of code) (raw):
# Copyright (c) 2022, 2025, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0,
# as published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms, as
# designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an additional
# permission to link the program and your derivative works with the
# separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
from typing import Literal, Optional
from mrs_plugin import lib
from pathlib import Path
import os
import re
from string import Template
import json
from base64 import b64decode
import mysqlsh
SDK_PYTHON_DATACLASS_TEMPLATE = '''@dataclass(init=False, repr=True)
class I{name}({mixins}MrsDocument[I{name}Data]
):
# For data attributes, `None` means "NULL" and
# `UndefinedField` means "not set or undefined"
{join_field_block}
def __init__( # type: ignore[override]
self, schema: MrsBaseSchema, data: I{name}Data
) -> None:
super().__init__(
schema,
data,
fields_map={field_profile},
obj_endpoint="{obj_endpoint}",
)
@classmethod
def get_primary_key_name(cls) -> Optional[str]:
return {primary_key_name}
'''
SDK_PYTHON_NON_NATIVE_TYPES = ("Date", "DateTime", "Time", "Year", "Vector")
class LanguageNotSupportedError(Exception):
def __init__(self, sdk_language):
self.message = f"The SDK language {sdk_language} is not supported yet."
def get_base_classes(sdk_language, prepare_for_runtime=False):
if sdk_language == "TypeScript":
file_name = "MrsBaseClasses.ts"
elif sdk_language == "Python":
file_name = "mrs_base_classes.py"
else:
raise LanguageNotSupportedError(sdk_language)
path = os.path.abspath(__file__)
code = Path(os.path.dirname(path), "..", "sdk", sdk_language.lower(),
file_name).read_text()
if prepare_for_runtime is True:
# Remove exports as everything will be in a single file
code = code.replace("export ", "")
# Remove the part that does not belong in the runtime SDK
delimiter = language_comment_delimiter(sdk_language)
code = re.sub(f"^\\s*?{delimiter} --- MySQL Shell for VS Code Extension Remove --- Begin.*?" +
f"^\\s*?{delimiter} --- MySQL Shell for VS Code Extension Remove --- End\n",
"", code, flags=re.DOTALL | re.MULTILINE)
code = remove_js_whitespace_and_comments(code)
return code
def generate_service_sdk(service, sdk_language, session, prepare_for_runtime=False, service_url=None):
# If no services is given, return only the MRS runtime management TypeScript Code
# that allows the user to manage specific MRS runtime settings
if service is None:
if prepare_for_runtime is True and sdk_language == "TypeScript":
return remove_js_whitespace_and_comments(get_mrs_runtime_management_code(session=session))
return ""
if sdk_language == "TypeScript":
file_name = "MrsServiceTemplate.ts.template"
elif sdk_language == "Python":
file_name = "mrs_service_template.py.template"
else:
raise LanguageNotSupportedError(sdk_language)
path = os.path.abspath(__file__)
template = Path(os.path.dirname(path), "..", "sdk", sdk_language.lower(),
file_name).read_text()
def binary_formatter(base64str: str):
if base64str.startswith("type254:"):
return b64decode(base64str.split(":")[-1])
# booleans are encoded as BIT(1)
if base64str.startswith("type16:"):
return bool.from_bytes(b64decode(base64str.split(":")[-1]))
return base64str
try:
sdk_data = lib.services.get_service_sdk_data(
session, service.get("id"), binary_formatter=binary_formatter
)
except mysqlsh.DBError as e:
if e.code != 1370:
raise e
# we should make a clear distinction for the case where we cannot retrieve
# sdk data because the user is missing "EXECUTE" permissions (BUG#)
sdk_data = None
# Process Template String
code = substitute_service_in_template(
service=service,
template=template,
sdk_language=sdk_language,
session=session,
service_url=service_url,
# service_data should be None if it couldn't be retrieved beforehand (BUG#)
# otherwise, if there are no services, it should be an empty dictionary
service_data=None if sdk_data is None else sdk_data.get("service_res", {}),
)
code = substitute_imports_in_template(
template=code.get("template"), enabled_crud_ops=code.get("enabled_crud_ops"),
required_datatypes=code.get("required_datatypes"), sdk_language=sdk_language,
requires_auth=code.get("requires_auth", False)
)
template = code.get("template")
delimiter = language_comment_delimiter(sdk_language)
if prepare_for_runtime is True:
# Remove imports as everything will be in a single file
template = re.sub('import.*?;', '', template,
flags=re.DOTALL | re.MULTILINE)
# Remove exports as everything will be in a single file
template = template.replace("export ", "")
# Remove the part that does not belong in the runtime SDK
template = re.sub(f"^[^\\S\r\n]*?{delimiter} --- MySQL Shell for VS Code Extension Remove --- Begin.*?" +
f"^\\s*?{delimiter} --- MySQL Shell for VS Code Extension Remove --- End\n",
"", template, flags=re.DOTALL | re.MULTILINE)
# Add MRS management code
template += get_mrs_runtime_management_code(session=session,
service_url=service_url)
template = remove_js_whitespace_and_comments(template)
else:
# Remove the part that does not belong in the generated SDK
template = re.sub(f"^[^\\S\r\n]*?{delimiter} --- MySQL Shell for VS Code Extension Only --- Begin.*?" +
f"^\\s*?{delimiter} --- MySQL Shell for VS Code Extension Only --- End\n",
"", template, flags=re.DOTALL | re.MULTILINE)
return template
def substitute_imports_in_template(
template,
sdk_language,
enabled_crud_ops: set[str] = set(),
required_datatypes: set[str] = set(),
requires_auth: bool = False,
):
delimiter = language_comment_delimiter(sdk_language)
import_loops = re.finditer(
f"^[^\\S\r\n]*?{delimiter} --- importLoopStart\n\\s*(^[\\S\\s]*?)^\\s*?{delimiter} --- importLoopEnd\n",
template,
flags=re.DOTALL | re.MULTILINE,
)
ops = [
"Create",
"Read",
"Update",
"Delete",
"ProcedureCall",
"ReadUnique",
"FunctionCall",
"Authenticate",
"TaskRun",
]
enabled_ops = enabled_crud_ops if enabled_crud_ops else set()
task_ops = {"FunctionTaskRun", "ProcedureTaskRun"}
if len(enabled_ops) - len(enabled_ops.difference(task_ops)) > 0:
enabled_ops = enabled_ops.difference(task_ops)
enabled_ops.add("TaskRun")
if requires_auth:
enabled_ops.add("Authenticate")
for loop in import_loops:
import_template = loop.group(1)
# if there are no required datatypes, we should remove the template tags
datatypes_block = ""
# otherwise we should replace them with the corresponding import block
if len(required_datatypes) > 0:
# tab size to be converted to spaces
indent = " " * 4
# each datatype should be comma separated and belongs in a new line
separator = f",\n{indent}"
# The first and last datatypes should also follow the same rules and they should be sorted alphabetically,
# mostly for testing purposes, but it is always good to be deterministic.
datatypes_block = f"{indent}{separator.join(sorted(required_datatypes))},\n"
import_template = re.sub(
f"^[^\\S\r\n]*?{delimiter} --- importRequiredDatatypesOnlyStart.*?"
+ f"^\\s*?{delimiter} --- importRequiredDatatypesOnlyEnd\n",
datatypes_block,
import_template,
flags=re.DOTALL | re.MULTILINE,
)
for op in ops:
# Find all "import{crud_op}OnlyStart / End" blocks
op_loops = re.finditer(
f"^[^\\S\r\n]*?{delimiter} --- import{op}OnlyStart\n\\s*(^[\\S\\s]*?)^\\s*?{delimiter} --- import{op}OnlyEnd\n",
import_template,
flags=re.DOTALL | re.MULTILINE,
)
for op_loop in op_loops:
# If the `CRUD + Auth` operation is enabled for any DB Object, keep the identified code block
if op in enabled_ops:
import_template = import_template.replace(
op_loop.group(), op_loop.group(1)
)
else:
# Delete the identified code block otherwise
# This cannot happen, because the block is gone for the next db object
import_template = import_template.replace(op_loop.group(), "")
template = template.replace(loop.group(), import_template)
return {
"template": template,
"enabled_crud_ops": enabled_crud_ops,
"required_datatypes": required_datatypes,
}
def substitute_service_in_template(service, template, sdk_language, session, service_url, service_data):
# Currently, we only generate the SDK for a single service, but this might change in the future.
existing_identifiers = []
code = substitute_schemas_in_template(
service=service,
template=template,
sdk_language=sdk_language,
session=session,
service_url=service_url,
schemas=None if service_data is None else service_data.get("db_schemas", []),
)
template = code.get("template")
requires_auth = code.get("requires_auth")
# if no db object requires auth and there are no authentication apps enabled for the service,
# the SDK authentication command should not be generated
delimiter = language_comment_delimiter(sdk_language)
auth_loops = re.finditer(
pattern=(
"^[^\\S\r\n]*?"
f"{delimiter} --- serviceAuthenticateStart"
"\n\\s*(^[\\S\\s]*?)^\\s*?"
f"{delimiter} --- serviceAuthenticateEnd\n"
),
string=template,
flags=re.DOTALL | re.MULTILINE
)
for auth_loop in auth_loops:
if requires_auth:
template = template.replace(auth_loop.group(), auth_loop.group(1))
else:
template = template.replace(auth_loop.group(), "")
service_id = service.get("id")
mapping = {
"service_name": generate_identifier(
value=service.get("url_context_root"),
sdk_language=sdk_language,
existing_identifiers=existing_identifiers,
),
"service_class_name": generate_identifier(
value=service.get("url_context_root"),
primitive="class",
existing_identifiers=existing_identifiers,
),
"service_url": service_url,
"service_auth_path": service.get("auth_path"),
"service_id": lib.core.convert_id_to_string(service_id),
}
template = Template(template).substitute(**mapping)
return {"template": template, "enabled_crud_ops": code.get("enabled_crud_ops"), "required_datatypes": code.get("required_datatypes"), "requires_auth": requires_auth}
def substitute_schemas_in_template(
service, template, sdk_language, session, service_url, schemas
):
delimiter = language_comment_delimiter(sdk_language)
schema_loops = re.finditer(
f"^[^\\S\r\n]*?{delimiter} --- schemaLoopStart\n\\s*(^[\\S\\s]*?)^\\s*?{delimiter} --- schemaLoopEnd\n",
template,
flags=re.DOTALL | re.MULTILINE,
)
# BUG#37926204 Get database schemas if they have not been retrieved beforehand
db_schemas = lib.schemas.query_schemas(session, service_id=service.get("id")) if schemas is None else schemas
enabled_crud_ops = set()
required_datatypes = set()
requires_auth = False
for loop in schema_loops:
# for each schema loop, we should restart tracking the identifiers
existing_identifiers = []
schema_template = loop.group(1)
filled_temp = ""
for schema in db_schemas:
# TODO: Implement support for MRS Scripts
if schema.get("schema_type") == "SCRIPT_MODULE":
continue
if requires_auth is False:
requires_auth |= schema.get("requires_auth") == 1
delimiter = language_comment_delimiter(sdk_language)
if f"{delimiter} --- objectLoopStart" in schema_template:
# Fill inner Object loops
code = substitute_objects_in_template(
service=service,
schema=schema,
template=schema_template,
sdk_language=sdk_language,
session=session,
service_url=service_url,
db_objs=None if schemas is None else schema.get("db_objects", [])
)
schema_template_with_obj_filled = code.get("template")
enabled_crud_ops.update(code.get("enabled_crud_ops"))
required_datatypes.update(code.get("required_datatypes"))
if requires_auth is False:
requires_auth |= code.get("requires_auth")
else:
schema_template_with_obj_filled = schema_template
mapping = {
"schema_name": generate_identifier(
value=schema.get("request_path"),
sdk_language=sdk_language,
existing_identifiers=existing_identifiers,
),
"schema_class_name": generate_identifier(
value=f"{service.get("url_context_root")}{schema.get("request_path")}",
primitive="class",
existing_identifiers=existing_identifiers,
),
"schema_request_path": schema.get("request_path"),
"schema_id": lib.core.convert_id_to_string(schema.get("id")),
}
filled_temp += Template(schema_template_with_obj_filled).substitute(
**mapping
)
template = template.replace(loop.group(), filled_temp)
return {
"template": template,
"enabled_crud_ops": enabled_crud_ops,
"required_datatypes": required_datatypes,
"requires_auth": requires_auth,
}
def get_mrs_object_sdk_language_options(sdk_options, sdk_language):
if not sdk_options or not sdk_options.get("language_options"):
return {}
# Find the matching language_options
for opt in sdk_options.get("language_options"):
if opt.get("language") == sdk_language:
return opt
return {}
def language_comment_delimiter(sdk_language):
if sdk_language == "TypeScript":
return "//"
if sdk_language == "Python":
return "#"
def generate_identifier(
value: str,
primitive: Literal["variable", "class"] = "variable",
sdk_language: Literal["TypeScript", "Python"] = "TypeScript",
# mutable objects for default values are re-used on subsequent function calls, let's leverage that
existing_identifiers: list[str] = [],
allowed_special_characters: Optional[set[str]] = None
) -> str:
if primitive == "class":
identifier = f"{lib.core.convert_path_to_pascal_case(value, allowed_special_characters)}"
elif sdk_language == "TypeScript":
identifier = f"{lib.core.convert_path_to_camel_case(value, allowed_special_characters)}"
elif sdk_language == "Python":
identifier = f"{lib.core.convert_to_snake_case(lib.core.convert_path_to_camel_case(value, allowed_special_characters))}"
else:
identifier = value
identifier = f"_{identifier}" if identifier[0].isdigit() else identifier
total_duplicates = existing_identifiers.count(identifier)
# we want to track the identifier with a potential prefix, but without the suffix
existing_identifiers.append(identifier)
# the total number of duplicates determines the suffix
identifier = (
identifier if total_duplicates == 0 else f"{identifier}{total_duplicates}"
)
return identifier
def substitute_objects_in_template(
service, schema, template, sdk_language, session, service_url, db_objs
):
delimiter = language_comment_delimiter(sdk_language)
object_loops = re.finditer(
f"^[^\\S\r\n]*?{delimiter} --- objectLoopStart\n\\s*(^[\\S\\s]*?)^\\s*?{delimiter} --- objectLoopEnd\n",
template,
flags=re.DOTALL | re.MULTILINE,
)
# BUG#37926204 Get database objects if they have not been retrieved beforehand
db_objects = lib.db_objects.query_db_objects(session, schema_id=schema.get("id")) if db_objs is None else db_objs
crud_ops = [
"Create",
"Read",
"Update",
"Delete",
"DeleteUnique",
"ProcedureCall",
"ReadUnique",
"FunctionCall",
"FunctionTaskRun",
"ProcedureTaskRun"
]
enabled_crud_ops = set()
required_datatypes = set()
requires_auth = False
schema_request_path = f"{service.get("url_context_root")}{schema.get("request_path")}"
schema_class_name = generate_identifier(
value=schema_request_path,
primitive="class",
existing_identifiers=[],
)
for loop in object_loops:
# for each object loop, we need to restart tracking the identifiers
existing_identifiers = []
filled_temp = ""
for db_obj in db_objects:
name = generate_identifier(
value=db_obj.get("request_path"),
sdk_language=sdk_language,
existing_identifiers=existing_identifiers,
)
class_name = ""
obj_class_name = generate_identifier(
value=f"{schema_request_path}{db_obj.get("request_path")}",
primitive="class",
existing_identifiers=existing_identifiers,
)
obj_interfaces = ""
obj_meta_interface = "I" + obj_class_name + "ResultSet"
obj_param_interface = ""
getters_setters = ""
obj_pk_list = []
obj_quoted_pk_list = []
obj_string_pk_list = []
obj_string_args_where_pk_list = []
obj_unique_list = []
obj_meta_interfaces = []
db_object_crud_ops = ""
# BUG#37926204 Get SDK objects if they have not been retrieved beforehand
objects = lib.db_objects.get_objects(
session, db_object_id=db_obj.get("id")) if db_obj.get("objects") is None else db_obj.get("objects")
# Loop over all objects and build interfaces
for obj in objects:
if requires_auth is False:
requires_auth |= db_obj.get("requires_auth") == 1
# BUG#37926204 Get object fields if they have not been retrieved beforehand
fields = lib.db_objects.get_object_fields_with_references(
session=session, object_id=obj.get("id")) if obj.get("fields") is None else obj.get("fields")
for field in fields:
if field.get("lev") == 1:
# Build Primary Key lists (only if "UPDATE" is allowed)
if field_is_pk(field) and "UPDATE" in db_obj.get("crud_operations", []):
obj_pk_list.append(field.get("name"))
obj_quoted_pk_list.append(f'"{field.get("name")}"')
obj_string_pk_list.append(
f'String({name}.{field.get("name")})')
obj_string_args_where_pk_list.append(
f'String(args.where.{field.get("name")})')
# Build Unique list
if field_is_unique(field):
obj_unique_list.append(field.get("name"))
# Build list containing the required types to import from MrsBaseClasses.ts
db_column_info = field.get("db_column")
if db_column_info:
db_datatype = db_column_info.get("datatype")
db_not_null = db_column_info.get("not_null")
# In TypeScript, there is no native type declaration for SomeType | null, so we add
# our own.
# In Python, we have Optional[SomeType] that does the trick, so there there is no
# need to add one.
if db_not_null is False and sdk_language == "TypeScript":
required_datatypes.add("MaybeNull")
client_datatype = get_enhanced_datatype_mapping(db_datatype, sdk_language)
if not datatype_is_primitive(client_datatype, sdk_language):
required_datatypes.add(client_datatype)
if (
sdk_language == "Python"
and client_datatype.startswith(
SDK_PYTHON_NON_NATIVE_TYPES
)
):
required_datatypes.add(
client_datatype.replace("Field", "")
)
# Get sdk_language specific options
sdk_lang_options = get_mrs_object_sdk_language_options(
obj.get("sdk_options"), sdk_language)
# Either take the custom interface_name or the default class_name
class_name = sdk_lang_options.get(
"class_name", obj_class_name)
# For database objects other than PROCEDUREs and FUNCTIONS, if there are unique fields,
# the corresponding SDK commands should be enabled.
if not object_is_routine(db_obj):
# READ is always enabled
db_object_crud_ops = db_obj.get("crud_operations", "READ")
# If db_objects results from calling lib.db_objects.get_objects, the value of the "crud_operations"
# key is already a list, if it results from calling lib.services.get_service_sdk_data, the value
# is a comma-separated string (BUG#37926204)
db_object_crud_ops = db_object_crud_ops if isinstance(db_object_crud_ops, list) else db_object_crud_ops.split(",")
# If this DB Object has unique columns (PK or UNIQUE) allow ReadUnique
if len(obj_unique_list) > 0 and "READUNIQUE" not in db_object_crud_ops:
db_object_crud_ops.append("READUNIQUE")
if len(obj_unique_list) > 0 and "DELETE" in db_object_crud_ops and "DELETEUNIQUE" not in db_object_crud_ops:
db_object_crud_ops.append("DELETEUNIQUE")
# If the database object is a FUNCTION a PROCEDURE or a SCRIPT, CRUD operations should not be enabled
elif object_is_routine(db_obj, of_type={"FUNCTION", "SCRIPT"}):
options = db_obj.get("options")
if options is not None and options.get("mysqlTask") is not None:
db_object_crud_ops = ["FUNCTIONTASKRUN"]
else:
db_object_crud_ops = ["FUNCTIONCALL"]
else:
required_datatypes.add("IMrsProcedureResult")
options = db_obj.get("options")
if options is not None and options.get("mysqlTask") is not None:
db_object_crud_ops = ["PROCEDURETASKRUN"]
else:
db_object_crud_ops = ["PROCEDURECALL"]
obj_interfaces_def, required_obj_datatypes = generate_interfaces(
db_obj,
obj,
fields,
class_name,
sdk_language,
db_object_crud_ops,
obj_endpoint=f"{service_url}{schema.get('request_path')}{db_obj.get("request_path")}",
)
required_datatypes.update(required_obj_datatypes)
# Do not add obj_interfaces for FUNCTION results
if obj.get("kind") == "PARAMETERS" or not object_is_routine(db_obj, of_type={"FUNCTION"}):
obj_interfaces += obj_interfaces_def
if obj.get("kind") == "PARAMETERS" and object_is_routine(db_obj):
obj_param_interface = f"{class_name}Params"
if obj.get("kind") != "PARAMETERS" and object_is_routine(db_obj):
obj_meta_interfaces.append(class_name)
# If the db object is a function, get the return datatype
obj_function_result_datatype = None
if object_is_routine(db_obj, of_type={"FUNCTION"}):
obj_function_result_datatype = "unknown"
if len(objects) > 1:
# BUG#37926204 Get object fields if they have not been retrieved beforehand
fields = lib.db_objects.get_object_fields_with_references(
session=session, object_id=objects[1].get("id")) if objects[1].get("fields") is None else objects[1].get("fields")
if len(fields) > 0:
db_column_info = field.get("db_column")
if db_column_info:
obj_function_result_datatype = get_datatype_mapping(
db_datatype=db_column_info.get("datatype"),
sdk_language=sdk_language)
# If there are no typed result sets for a Procedure, all the result sets will be generic instances of JsonObject
obj_procedure_result_set_datatype = None
if object_is_routine(db_obj, of_type={"PROCEDURE"}) and len(objects) == 1:
required_datatypes.add("JsonObject")
if sdk_language != "Python":
obj_interfaces += generate_union(
obj_meta_interface, ["JsonObject"], sdk_language
)
else:
obj_interfaces += generate_union(
obj_meta_interface,
["MrsProcedureResultSet[str, JsonObject, JsonObject]"],
sdk_language,
)
elif object_is_routine(db_obj, of_type={"PROCEDURE"}):
if sdk_language != "Python":
# TypeScript tagged unions inherit from JsonObject
required_datatypes.add("JsonObject")
interface_list = [f"ITagged{name}" for name in obj_meta_interfaces]
else:
interface_list = [
f"MrsProcedureResultSet[I{name}Type, I{name}, ITagged{name}]"
for name in obj_meta_interfaces
]
obj_procedure_result_set_datatype = (
"{"
+ ",".join(
[f'"{name}": I{name}' for name in obj_meta_interfaces]
)
+ "}"
)
obj_interfaces += generate_union(obj_meta_interface, interface_list, sdk_language)
# Define the mappings
mapping = {
"obj_id": lib.core.convert_id_to_string(db_obj.get("id")),
"obj_name": name,
"obj_class_name": class_name,
"obj_param_interface": obj_param_interface, # empty if not FUNCTION/PROCEDURE
"obj_meta_interface": obj_meta_interface,
"obj_request_path": db_obj.get("request_path"),
"schema_class_name": schema_class_name,
"schema_request_path": schema.get("request_path"),
"obj_full_request_path": service.get("url_context_root")
+ schema.get("request_path")
+ db_obj.get("request_path"),
"obj_type": db_obj.get("object_type"),
"obj_interfaces": obj_interfaces,
"obj_getters_setters": getters_setters,
"obj_pk_list": ", ".join(obj_pk_list),
"obj_quoted_pk_list": ", ".join(obj_quoted_pk_list),
"obj_string_pk_list": ", ".join(obj_string_pk_list),
"obj_string_args_where_pk_list": ", ".join(
obj_string_args_where_pk_list
),
"obj_function_result_datatype": obj_function_result_datatype,
"obj_procedure_result_set_datatype": obj_procedure_result_set_datatype,
}
# Loop over all CRUD operations and filter the sections that are not applicable for the specific object
obj_template = loop.group(1)
for crud_op in crud_ops:
# Find all crud{crud_op}OnlyStart / End control blocks
delimiter = language_comment_delimiter(sdk_language)
crud_op_loops = re.finditer(
f"^[^\\S\r\n]*?{delimiter} --- crud{crud_op}OnlyStart\n\\s*(^[\\S\\s]*?)^\\s*?{delimiter} --- crud{crud_op}OnlyEnd\n",
obj_template, flags=re.DOTALL | re.MULTILINE)
for crud_loop in crud_op_loops:
# If the CRUD operation is enabled for this DB Object, keep the identified code block
# if crud_op is "Update" and there is no primary key, update commands should not be available
if crud_op.upper() in db_object_crud_ops and (crud_op != "Update" or len(obj_pk_list) > 0):
enabled_crud_ops.add(crud_op)
obj_template = obj_template.replace(
crud_loop.group(), crud_loop.group(1))
else:
# Delete the identified code block otherwise
obj_template = obj_template.replace(
crud_loop.group(), "")
# Perform the substitution
filled_temp += Template(obj_template).substitute(**mapping)
template = template.replace(loop.group(), filled_temp)
return {"template": template, "enabled_crud_ops": enabled_crud_ops, "required_datatypes": required_datatypes, "requires_auth": requires_auth}
def get_datatype_mapping(db_datatype, sdk_language):
if db_datatype is None:
db_datatype = "text"
db_datatype = db_datatype.lower()
if sdk_language == "TypeScript":
if db_datatype.startswith(("tinyint(1)", "bit(1)")):
return "boolean"
if db_datatype.startswith(("tinyint", "smallint", "mediumint", "int", "bigint", "decimal", "numeric",
"float", "double")):
return "number"
if db_datatype.startswith("json"):
return "JsonValue"
if db_datatype.startswith("geometrycollection"):
return "GeometryCollection"
if db_datatype.startswith("geometry"):
return "Geometry"
if db_datatype.startswith("point"):
return "Point"
if db_datatype.startswith("multipoint"):
return "MultiPoint"
if db_datatype.startswith("linestring"):
return "LineString"
if db_datatype.startswith("multilinestring"):
return "MultiLineString"
if db_datatype.startswith("polygon"):
return "Polygon"
if db_datatype.startswith("multipolygon"):
return "MultiPolygon"
return "string"
if sdk_language == "Python":
if db_datatype.startswith(("tinyint(1)", "bit(1)")):
return "bool"
if db_datatype.startswith(("tinyint", "smallint", "mediumint", "int", "bigint")):
return "int"
if db_datatype.startswith(("decimal", "numeric", "float", "double")):
return "float"
if db_datatype.startswith("json"):
return "JsonValue"
if db_datatype.startswith("geometry"):
return "Geometry"
if db_datatype.startswith("point"):
return "Point"
if db_datatype.startswith("linestring"):
return "LineString"
if db_datatype.startswith("polygon"):
return "Polygon"
if db_datatype.startswith("multipoint"):
return "MultiPoint"
if db_datatype.startswith("multilinestring"):
return "MultiLineString"
if db_datatype.startswith("multipolygon"):
return "MultiPolygon"
if db_datatype.startswith("geomcollection"):
return "GeometryCollection"
if db_datatype.startswith(("datetime", "timestamp")):
return "DateTime"
if db_datatype.startswith("date"):
return "Date"
if db_datatype.startswith("time"):
return "Time"
if db_datatype.startswith("year"):
return "Year"
if db_datatype.startswith("vector"):
return "Vector"
return "str"
return "unknown"
def get_enhanced_datatype_mapping(db_datatype, sdk_language):
enhanced_map = {
"Python": {
"bool": "BoolField",
"int": "IntField",
"float": "FloatField",
"str": "StringField",
"Date": "DateField",
"DateTime": "DateTimeField",
"Time": "TimeField",
"Year": "YearField",
"Vector": "VectorField",
}
}
if sdk_language == "TypeScript":
# In TypeScript, the fields of type ${DatabaseObject} are the same as type ${DatabaseObject}Params
return get_datatype_mapping(db_datatype, sdk_language)
if sdk_language == "Python":
# If `py_datatype` not in the list of types to be enhanced,
# then we assume it shouldn't be enhanced and returned it as it is.
py_datatype = get_datatype_mapping(db_datatype, sdk_language)
return enhanced_map[sdk_language].get(py_datatype, py_datatype)
return "unknown"
def maybe_null(client_datatype, sdk_language):
if sdk_language == "TypeScript":
return f"MaybeNull<{client_datatype}>"
if sdk_language == "Python":
return f"Optional[{client_datatype}]"
return "unknown"
def get_procedure_datatype_mapping(sp_datatype, sdk_language):
if sdk_language == "TypeScript":
if sp_datatype == "BOOLEAN":
return "boolean"
if sp_datatype in ("NUMBER", "INT"):
return "number"
if sp_datatype == "JSON":
return "object"
return "string"
return "unknown"
def get_interface_datatype(
field,
sdk_language,
class_name="",
reference_class_name_postfix="",
enhanced_fields=False,
nullable=True,
):
db_column_info = field.get("db_column")
if db_column_info:
db_datatype = db_column_info.get("datatype")
db_not_null = db_column_info.get("not_null")
# Todo: Handle SDK Options
if not enhanced_fields:
client_datatype = get_datatype_mapping(db_datatype, sdk_language)
else:
client_datatype = get_enhanced_datatype_mapping(db_datatype, sdk_language)
if not nullable or db_not_null is True:
return client_datatype
return maybe_null(client_datatype, sdk_language)
class_name_postfix = generate_identifier(value=field.get("name"), primitive="class", existing_identifiers=[])
return f"I{class_name}{reference_class_name_postfix}{class_name_postfix}"
def datatype_is_primitive(client_datatype, sdk_language):
# for now, consider only the data types we actually are able to map into
if client_datatype is None:
return False
if sdk_language == "TypeScript":
if client_datatype.startswith(("boolean", "number", "string")):
return True
return False
if sdk_language == "Python":
if client_datatype.startswith(
("bool", "float", "int", "str", "list", "tuple", "dict")
):
return True
return False
return False
def field_is_pk(field):
if field.get("lev") == 1:
db_column_info = field.get("db_column")
if db_column_info and db_column_info.get("is_primary"):
return True
return False
def field_is_unique(field):
if field.get("lev") == 1:
db_column_info = field.get("db_column")
if db_column_info and (db_column_info.get("is_primary") or db_column_info.get("is_unique")):
return True
return False
def field_is_nullable(field):
if field.get("lev") != 1:
return False
db_column_info = field.get("db_column")
if db_column_info is None:
return False
if db_column_info.get("not_null"):
return False
return True
def field_has_row_ownership(field, obj):
field_id = field.get("id")
row_ownership_field_id = obj.get("row_ownership_field_id")
return row_ownership_field_id is not None and field_id == row_ownership_field_id
def field_is_required(field, obj):
if field.get("lev") != 1:
return False
db_column_info = field.get("db_column")
if db_column_info is None:
return False
# if a field can be NULL it is, by definition, optional
not_null = db_column_info.get("not_null")
# if a field is a primary key with AUTO_INCREMENt, it is optional
id_generation = db_column_info.get("id_generation")
# if a field has a default column value, it is optional
column_default = db_column_info.get("column_default")
# if a field is maps to row ownership column, it should also be optional
has_row_ownership = field_has_row_ownership(field, obj)
if not_null and id_generation is None and column_default is None and not has_row_ownership:
return True
return False
def field_can_be_cursor(field):
if field.get("lev") != 1:
return False
db_column_info = field.get("db_column")
if db_column_info is None:
return False
# the column can be used as a cursor if it is a unique and sequential
# e.g. if it is a timestamp or has an AUTO_INCREMENT constraint
id_generation = db_column_info.get("id_generation")
datatype = db_column_info.get("datatype")
if ((id_generation is not None and id_generation.startswith("auto_inc")) or
(datatype is not None and datatype.startswith("timestamp"))):
return True
return False
def field_is_sortable(field):
return field.get("allow_sorting", False)
def get_field_by_id(fields, identifier):
for field in fields:
if field.get("id") == identifier:
return field
return None
def get_reduced_field_interface_datatype(field, fields, sdk_language, class_name):
if field.get("represents_reference_id"):
obj_ref = field.get("object_reference")
# Check if the field should be reduced to the value of another field
ref_field_id = obj_ref.get("reduce_to_value_of_field_id")
if obj_ref and ref_field_id:
# Convert id to binary
ref_field_id = lib.core.id_to_binary(
ref_field_id, "reduce_to_value_of_field_id")
# Lookup the field to reduce to
ref_field = get_field_by_id(fields, ref_field_id)
if ref_field:
datatype = get_interface_datatype(
ref_field, sdk_language, class_name)
# If the reference mapping is "to_many", use an array
ref_mapping = obj_ref.get("reference_mapping")
is_array = "[]" if ref_mapping and ref_mapping.get(
"to_many") == True else ""
return datatype + is_array
return None
def generate_type_declaration(
name,
parents=[],
fields={},
sdk_language="TypeScript",
ignore_base_types=False,
non_mandatory_fields: set[str] = set(), # Users may or not specify them
requires_placeholder=False,
is_unpacked=False,
readonly_fields: set[str] = set(),
):
if len(fields) == 0:
if not requires_placeholder:
return ""
return generate_type_declaration_placeholder(name, sdk_language, is_unpacked)
if sdk_language == "TypeScript":
field_block = [
generate_type_declaration_field(
name,
value,
sdk_language,
non_mandatory=(name in non_mandatory_fields),
allowed_special_characters={"(", ")"},
readonly=name in readonly_fields,
)
for name, value in fields.items()
]
if len(parents) > 0:
# To avoid issues with optional fields, we always use type intersection to represent inheritance.
intersection_block = f" & {' & '.join(parents)}"
return (
f"export type I{name} = {{\n"
+ "".join(field_block)
+ f"}}{intersection_block};\n\n"
)
# To follow the internal convention, we use interfaces for every other case.
return f"export interface I{name} {{\n" + "".join(field_block) + "}\n\n"
if sdk_language == "Python":
field_block = [
generate_type_declaration_field(
name,
value,
sdk_language,
non_mandatory=(
name in non_mandatory_fields
and len(non_mandatory_fields) != len(fields)
),
)
for name, value in fields.items()
]
ordered_parents = (
[*parents] if ignore_base_types is True else ["TypedDict", *parents]
)
if len(non_mandatory_fields) == len(fields):
inheritance_block = f"({', '.join(ordered_parents)}, total=False)"
else:
inheritance_block = f"({', '.join(ordered_parents)})"
return f"class I{name}{inheritance_block}:\n" + "".join(field_block) + "\n\n"
def generate_type_declaration_field(
name, value, sdk_language, non_mandatory=False, allowed_special_characters=None, readonly=False
):
name = generate_identifier(
value=name,
sdk_language=sdk_language,
existing_identifiers=[],
allowed_special_characters=allowed_special_characters,
)
indent = " " * 4
if sdk_language == "TypeScript":
field_name_part = f"{indent}{"readonly " if readonly else ""}{name}?" if non_mandatory else f"{indent}{"readonly " if readonly else ""}{name}"
if isinstance(value, list):
return f"{field_name_part}: {value[0]}[],\n"
return f"{field_name_part}: {value},\n"
if sdk_language == "Python":
if isinstance(value, list):
hint = f"NotRequired[list[{value[0]}]]" if non_mandatory is True else f"list[{value[0]}]"
else:
hint = f"NotRequired[{value}]" if non_mandatory is True else f"{value}"
return f'{indent}{name}: {hint}\n'
def generate_data_class(
name,
fields,
sdk_language,
db_object_crud_ops: list[str],
obj_endpoint: Optional[str] = None,
primary_key_fields: set[str] = set(),
):
if sdk_language == "TypeScript":
if len(primary_key_fields) > 0:
if "UPDATE" in db_object_crud_ops:
fields.update({ "update()": f"Promise<I{name}>" })
if "DELETE" in db_object_crud_ops:
fields.update({ "delete()": f"Promise<void>" })
return generate_type_declaration(
name=name,
fields=fields,
sdk_language=sdk_language,
non_mandatory_fields=set(fields).difference({"update()", "delete()"}),
readonly_fields=primary_key_fields
)
if sdk_language == "Python":
field_type_block = [
(
generate_type_declaration_field(name, value, sdk_language).rstrip()
+ " | UndefinedDataClassField\n"
)
for name, value in fields.items()
]
field_profile = []
for field_name, type_hint in [
(
generate_identifier(value=field, sdk_language=sdk_language, existing_identifiers=[]),
value[0] if isinstance(value, list) else value,
)
for field, value in fields.items()
]:
field_profile.append(f'"{field_name}": {type_hint}')
join_field_profile = (
"{\n" + f"{" "*16}" + f",\n{" "*16}".join(field_profile).rstrip() + f"\n{" "*12}" + "}"
)
mixins = []
if len(primary_key_fields) > 0:
if "UPDATE" in db_object_crud_ops:
mixins.append(f'\n\t_MrsDocumentUpdateMixin["I{name}Data", "I{name}", "I{name}Details"],')
if "DELETE" in db_object_crud_ops:
mixins.append(f'\n\t_MrsDocumentDeleteMixin["I{name}Data", "I{name}Filterable"],')
if mixins:
mixins.append("\n\t")
return SDK_PYTHON_DATACLASS_TEMPLATE.format(
name=name,
join_field_block="".join(field_type_block).rstrip(),
obj_endpoint=obj_endpoint,
field_profile=join_field_profile,
primary_key_name=(
None if len(primary_key_fields) == 0 else f'"{",".join(primary_key_fields)}"'
),
mixins="".join(mixins),
)
def generate_field_enum(name, fields=None, sdk_language="TypeScript"):
if sdk_language == "TypeScript":
# In TypeScript the field enum can be obtained using keyof on the original object type declaration, so there is
# no need for an additional type declaration.
return ""
if sdk_language == "Python":
if not fields or len(fields) == 0:
# To avoid conditional logic in the template, we can generate a placeholder declaration regardless.
return generate_type_declaration_placeholder(f"{name}Field", sdk_language)
fields_in_case = [lib.core.convert_to_snake_case(field) for field in fields]
return generate_enum(f"{name}Field", fields_in_case, sdk_language)
def generate_enum(name, values, sdk_language):
enum_def = generate_literal_type(values, sdk_language)
if sdk_language == "TypeScript":
return f"export type I{name} = {enum_def};\n\n"
if sdk_language == "Python":
return f"I{name}: TypeAlias = {enum_def}\n\n\n"
def generate_type_declaration_placeholder(name, sdk_language, is_unpacked=False):
if sdk_language == "TypeScript":
return f"type I{name} = never;\n\n"
if sdk_language == "Python":
if not is_unpacked:
return f"I{name}: TypeAlias = None\n\n\n"
# Using an empty TypedDict helps to avoid issues with Unpack
return f"class I{name}(TypedDict):\n pass\n\n\n"
def generate_literal_type(values, sdk_language):
if sdk_language == "TypeScript":
items = [f'"{value}"' for value in values]
return f"{' | '.join(items)}"
if sdk_language == "Python":
items = [f'{" " * 4}"{value}"' for value in values]
s = ',\n'.join(items)
return (f"Literal[\n" +
f"{s},\n" +
"]")
def generate_selectable(name, fields, sdk_language):
if sdk_language == "TypeScript":
return ""
if sdk_language == "Python":
return generate_type_declaration(name=f"{name}Selectable", fields={ field: "bool" for field in fields },
sdk_language=sdk_language, non_mandatory_fields=set(fields))
def generate_sortable(
name: str,
fields: dict[str, str] = {},
sdk_language: Literal["TypeScript", "Python"] = "TypeScript",
):
if sdk_language == "TypeScript":
return ""
if sdk_language == "Python":
return generate_type_declaration(
name=f"{name}Sortable",
fields={field: "Order" for field in fields},
sdk_language=sdk_language,
non_mandatory_fields=set(fields),
requires_placeholder=True,
)
def generate_union(name, types, sdk_language):
if sdk_language == "TypeScript":
return f"export type {name} = {' | '.join(types)};\n\n"
if sdk_language == "Python":
return f"{name}: TypeAlias = {' | '.join(types)}\n\n\n"
def generate_sequence_constant(name, values, sdk_language):
if sdk_language == "TypeScript":
return f"const {name} = {json.dumps(values)} as const;\n"
elif sdk_language == "Python":
return f"{name}: Sequence = {json.dumps(values)}\n\n"
def object_is_routine(db_obj, of_type: set[str] = {"PROCEDURE", "FUNCTION", "SCRIPT"}):
return db_obj.get("object_type") in of_type
def generate_interfaces(
db_obj,
obj,
fields,
class_name,
sdk_language,
db_object_crud_ops: list[str],
obj_endpoint: Optional[str] = None,
):
obj_interfaces: list[str] = []
interface_fields = {}
param_interface_fields = {}
out_params_interface_fields = {}
obj_unique_fields = {}
obj_cursor_fields = {}
obj_sortable_fields: set[str] = set()
has_nested_fields = False
required_datatypes: set[str] = set()
# The I{class_name}, I{class_name}Params and I{class_name}Out interfaces
for field in fields:
db_column = field.get("db_column", {})
# The field needs to be on level 1 and enabled
if field.get("lev") == 1 and field.get("enabled"):
datatype = get_interface_datatype(field, sdk_language, class_name)
enhanced_fields = False if object_is_routine(db_obj) else True
enhanced_datatype = get_interface_datatype(
field=field,
sdk_language=sdk_language,
class_name=class_name,
enhanced_fields=enhanced_fields,
)
# Handle references
if field.get("represents_reference_id"):
has_nested_fields = True
# Check if the field should be reduced to the value of another field
reduced_to_datatype = get_reduced_field_interface_datatype(
field, fields, sdk_language, class_name
)
if reduced_to_datatype:
interface_fields.update({field.get("name"): reduced_to_datatype})
else:
obj_ref = field.get("object_reference")
# Add field if the referred table is not unnested
if not obj_ref.get("unnest"):
# If this field represents an OUT parameter of a SP, add it to the
# out_params_interface_fields list
if obj.get("kind") == "PARAMETERS" and object_is_routine(
db_obj
):
if db_column.get("in"):
param_interface_fields.update(
{field.get("name"): datatype}
)
if db_column.get("out"):
out_params_interface_fields.update(
{field.get("name"): datatype}
)
elif obj.get("kind") == "PARAMETERS" and not object_is_routine(
db_obj
):
param_interface_fields.update(
{field.get("name"): enhanced_datatype}
)
elif field.get("allow_filtering") and not object_is_routine(
db_obj
):
# RESULT
interface_fields.update({field.get("name"): datatype})
# Add all table fields that have allow_filtering set and SP params to the
# param_interface_fields
param_interface_fields.update(
{field.get("name"): enhanced_datatype}
)
# Call recursive interface generation
generate_nested_interfaces(
obj_interfaces,
interface_fields,
field,
reference_class_name_postfix=lib.core.convert_path_to_pascal_case(
field.get("name")
),
fields=fields,
class_name=class_name,
sdk_language=sdk_language,
)
elif obj.get("kind") == "PARAMETERS":
# If this field represents an OUT parameter of a SP, add it to the
# out_params_interface_fields list
if db_column.get("in"):
param_interface_fields.update({field.get("name"): datatype})
if db_column.get("out"):
out_params_interface_fields.update({field.get("name"): datatype})
else:
interface_fields.update({field.get("name"): datatype})
# Add all table fields that have allow_filtering set and SP params to the param_interface_fields
if field.get("allow_filtering") and not object_is_routine(db_obj):
param_interface_fields.update(
{field.get("name"): enhanced_datatype}
)
if not object_is_routine(db_obj):
enhanced_datatype = get_interface_datatype(
field=field,
sdk_language=sdk_language,
class_name=class_name,
enhanced_fields=True,
nullable=False,
)
# Build Unique list
if field_is_unique(field):
obj_unique_fields.update({field.get("name"): enhanced_datatype})
# Build list of columns which can potentially be used for cursor-based pagination.
if field_can_be_cursor(field):
obj_cursor_fields.update({field.get("name"): enhanced_datatype})
# Build list of sortable fields
if field_is_sortable(field):
obj_sortable_fields.add(field.get("name"))
if not object_is_routine(db_obj):
# The object is a TABLE or a VIEW
if sdk_language != "TypeScript":
# These type declarations are not needed for TypeScript because it uses a Proxy to replace the interface
# and not a wrapper class. This might change in the future.
mrs_resource_type = "IMrsResourceDetails"
required_datatypes.add(mrs_resource_type)
obj_interfaces.append(
generate_type_declaration(
name=f"{class_name}Details",
parents=[mrs_resource_type],
fields=interface_fields,
sdk_language=sdk_language,
ignore_base_types=True,
non_mandatory_fields=set(interface_fields),
)
)
obj_interfaces.append(
generate_type_declaration(
name=f"{class_name}Data",
fields=interface_fields,
sdk_language=sdk_language,
non_mandatory_fields=set(interface_fields),
)
)
if "CREATE" in db_object_crud_ops:
obj_non_mandatory_fields = set(
[
field.get("name")
for field in fields
# exclude fields that are out of range (e.g. on different nesting levels)
if field.get("name") in interface_fields.keys()
and field_is_required(field, obj) is False
]
)
obj_interfaces.append(
generate_type_declaration(
name=f"New{class_name}",
fields=interface_fields,
sdk_language=sdk_language,
non_mandatory_fields=obj_non_mandatory_fields,
)
)
if "UPDATE" in db_object_crud_ops:
# TODO: No partial update is supported yet. Once it is, the
# `non-mandatory_fields` argument should not change.
# This way, users can know what fields are required and which ones aren't.
# However, even when replacing an entire resource, it should be possible to
# unset nullable fields.
nullable_fields = [
field.get("name")
for field in fields
if field_is_nullable(field) or field_has_row_ownership(field, obj)
]
obj_interfaces.append(
generate_type_declaration(
name=f"Update{class_name}",
fields=interface_fields,
sdk_language=sdk_language,
non_mandatory_fields=set(nullable_fields),
)
)
primary_key_fields = [field.get("name") for field in fields if field_is_pk(field)]
obj_interfaces.append(
generate_data_class(
name=class_name,
fields=interface_fields,
sdk_language=sdk_language,
db_object_crud_ops=db_object_crud_ops,
obj_endpoint=obj_endpoint,
primary_key_fields=set(primary_key_fields),
)
)
obj_interfaces.append(
generate_field_enum(
name=class_name, fields=interface_fields, sdk_language=sdk_language
)
)
if not has_nested_fields:
# This creates a type alias for something like None, which ensures there is always a default
# value for *{class_name}NestedField and saves us from using conditionals in the template.
obj_interfaces.append(
generate_field_enum(
name=f"{class_name}Nested", sdk_language=sdk_language
)
)
obj_interfaces.append(
generate_selectable(class_name, interface_fields, sdk_language)
)
obj_interfaces.append(
generate_sortable(class_name, obj_sortable_fields, sdk_language)
)
construct_parents = (
[]
if sdk_language != "Python"
else ["Generic[Filterable]", "HighOrderOperator[Filterable]"]
)
obj_interfaces.append(
generate_type_declaration(
name=f"{class_name}Filterable",
parents=construct_parents,
fields=param_interface_fields,
sdk_language=sdk_language,
ignore_base_types=True,
non_mandatory_fields=set(param_interface_fields),
)
)
obj_interfaces.append(
generate_type_declaration(
name=f"{class_name}UniqueFilterable",
fields=obj_unique_fields,
sdk_language=sdk_language,
non_mandatory_fields=set(obj_unique_fields),
)
)
obj_interfaces.append(
generate_type_declaration(
name=f"{class_name}Cursors",
fields=obj_cursor_fields,
sdk_language=sdk_language,
non_mandatory_fields=set(obj_cursor_fields),
# To avoid conditional logic in the template, we should generate a void type declaration.
requires_placeholder=True,
)
)
# FUNCTIONs, PROCEDUREs and SCRIPTs
elif obj.get("kind") == "RESULT":
obj_interfaces.append(
generate_type_declaration(
name=class_name,
fields=interface_fields,
sdk_language=sdk_language,
non_mandatory_fields=set(interface_fields),
)
)
if len(interface_fields) > 0:
# Result sets are non-deterministic and there is no way to know if a column value can be NULL.
# Thus, we must assume that is always the case.
if sdk_language == "TypeScript":
required_datatypes.add("MaybeNull")
result_fields = {
"type": generate_literal_type([class_name], sdk_language),
"items": [f"I{class_name}"],
}
obj_interfaces.append(
generate_type_declaration(
name=f"Tagged{class_name}",
fields=result_fields,
sdk_language=sdk_language,
parents=(
["JsonObject"] if sdk_language == "TypeScript" else []
), # TypeScript tagged unions inherit from JsonObject
)
)
# The Python SDK uses a generic dataclass for the tagged union of result set types.
# The generic dataclass needs to know the possible values of the "type" field.
if sdk_language == "Python":
obj_interfaces.append(
generate_enum(
name=f"{class_name}Type",
values=[class_name],
sdk_language=sdk_language,
)
)
else: # kind = "PARAMETERS"
if len(param_interface_fields) > 0:
# Parameters are "optional" in a way that they can be NULL at the SQL level.
if sdk_language == "TypeScript":
required_datatypes.add("MaybeNull")
# Type definition for the set of IN/INOUT Parameters.
obj_interfaces.append(
generate_type_declaration(
name=f"{class_name}Params",
fields=param_interface_fields,
sdk_language=sdk_language,
non_mandatory_fields=set(param_interface_fields),
# To avoid conditional logic in the template, we should generate a void type declaration.
requires_placeholder=True,
is_unpacked=True,
)
)
# Type definition for the set of OUT/INOUT Parameters.
obj_interfaces.append(
generate_type_declaration(
name=f"{class_name}ParamsOut",
fields=out_params_interface_fields,
sdk_language=sdk_language,
non_mandatory_fields=set(out_params_interface_fields),
# To avoid conditional logic in the template, we should generate a void type declaration.
# In this case, the placeholder is only needed for Procedures, because the type declaration
# is not used otherwise.
requires_placeholder=object_is_routine(db_obj, of_type={"PROCEDURE"}),
)
)
return "".join(obj_interfaces), required_datatypes
# For now, this function is not used for ${DatabaseObject}Params type declarations
def generate_nested_interfaces(
obj_interfaces, parent_interface_fields, parent_field,
reference_class_name_postfix,
fields, class_name, sdk_language):
# Build interface name
interface_name = f"{class_name}{reference_class_name_postfix}"
# Check if the reference has unnest set, and if so, use the parent_interface_fields
parent_obj_ref = parent_field.get("object_reference")
interface_fields = {} if not parent_obj_ref.get(
"unnest") else parent_interface_fields
for field in fields:
if (field.get("parent_reference_id") == parent_field.get("represents_reference_id") and
field.get("enabled")):
# Handle references
if field.get("represents_reference_id"):
# Check if the field should be reduced to the value of another field
reduced_to_datatype = get_reduced_field_interface_datatype(
field, fields, sdk_language, class_name)
if reduced_to_datatype:
interface_fields.update({ field.get("name"): reduced_to_datatype })
else:
obj_ref = field.get("object_reference")
field_interface_name = lib.core.convert_path_to_pascal_case(
field.get("name"))
# Add field if the referred table is not unnested
if not obj_ref.get("unnest"):
datatype = f"{class_name}{reference_class_name_postfix + field.get("name")}"
# Should use the corresponding nested field type.
interface_fields.update({ field.get("name"): interface_name + field_interface_name })
# If not, do recursive call
generate_nested_interfaces(
obj_interfaces, interface_fields, field,
reference_class_name_postfix=reference_class_name_postfix + field_interface_name,
fields=fields, class_name=class_name, sdk_language=sdk_language)
else:
datatype = get_interface_datatype(field, sdk_language)
interface_fields.update({ field.get("name"): datatype })
if not parent_obj_ref.get("unnest"):
obj_interfaces.append(generate_type_declaration(name=interface_name, fields=interface_fields,
sdk_language=sdk_language))
obj_interfaces.append(generate_field_enum(name=f"{class_name}Nested", fields=interface_fields,
sdk_language=sdk_language))
def get_mrs_runtime_management_code(session, service_url=None):
"""Returns TypeScript code that allows the user to manage specific MRS runtime settings.
This includes the configuration status of MRS with an info string giving context sensitive
information how and the list of available REST services with their URL and
and methods to set which REST services is the current one and to print the SDK runtime code.
Args:
session (object): The database session to use.
service_url (str): The URL of the current REST service
Returns:
The TypeScript code that defines and sets the mrs object
"""
status = lib.general.get_status(session)
if status['service_configured'] is True:
services = lib.services.get_services(session=session)
else:
services = []
s = """
class MrsService {
#serviceId: string;
public constructor(serviceId: string) {
this.#serviceId = serviceId;
}
public setAsCurrent = () => {
mrsSetCurrentService(this.#serviceId);
};
public edit = () => {
mrsEditService(this.#serviceId);
};
public exportSdk = () => {
mrsExportServiceSdk(this.#serviceId);
};
}
class Mrs {
"""
service_list = []
for service in services:
service_name = lib.core.convert_path_to_camel_case(
service.get("url_context_root"))
service_id = lib.core.convert_id_to_string(service.get("id"))
# If no explicit service_url is given, use the service's host_ctx and url_context_root
if not service_url or service.get("is_current") == 0:
host_ctx = service.get("host_ctx")
url_context_root = service.get("url_context_root")
# If no host_ctx starting with http is given, default to https://localhost:8443
if not host_ctx.lower().startswith("http"):
service_url = "https://localhost:8443" + url_context_root
else:
service_url = url_context_root + url_context_root
if service.get("is_current") == 1:
s += f' public {service_name} = {service_name};\n'
else:
s += f' public {service_name}: MrsService = new MrsService("{service_id}");\n'
service_list.append({
"serviceName": service_name, "url": service_url, "isCurrent": service.get("is_current") == 1})
if status['service_configured'] is False:
status_output = {
"configured": False,
"info": "The MySQL REST Service has not been configured on this MySQL instance yet. Switch to " +
"SQL mode and use the CONFIGURE REST METADATA command to configure the instance.",
"services": []}
elif len(service_list) == 0:
status_output = {
"configured": True,
"info": "No REST service has been created yet. Switch to SQL Mode and use the " +
"CREATE REST SERVICE command to create a new REST service.",
"services": []}
else:
status_output = {
"configured": True,
"info": f'{len(service_list)} REST service{"s" if len(service_list) > 1 else ""} available.',
"services": service_list}
s += f"""
public getStatus = () => {{
return {json.dumps(status_output)};
}}
public addService = () => {{
mrsEditService();
}}
public printSdkCode = () => {{
mrsPrintSdkCode();
}}
public refreshSdkCode = () => {{
mrsRefreshSdkCode();
}}
}}
const mrs = new Mrs();
"""
return s
def replace_group_1_match_only(m):
return "" if m.group(1) is not None else m.group(0)
def remove_js_whitespace_and_comments(code):
# Remove comments. This regex does not match single line comments that
# include a " character, which is ignored for simplicity.
code = re.sub(r"/\*.*?\*/|//[^\"]*?$", "",
code, flags=re.DOTALL | re.MULTILINE)
# Remove all empty linebreaks
code = re.sub(r"^\s*\n", "", code, flags=re.DOTALL | re.MULTILINE)
# Substitute leading spaces with tabs
code = re.sub(r" ", "\t", code, flags=re.DOTALL | re.MULTILINE)
return code