mrs_plugin/lib/content_sets.py
# Copyright (c) 2021, 2025, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0,
# as published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms, as
# designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an additional
# permission to link the program and your derivative works with the
# separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
from mrs_plugin.lib import core, content_files, services, schemas, db_objects
from mrs_plugin.lib.MrsDdlExecutor import MrsDdlExecutor
import os
import re
import json
import pathlib
import datetime
from urllib.request import urlopen
import tempfile
import zipfile
import ssl
import shutil
OPENAPI_UI_URL = "http://github.com/swagger-api/swagger-ui/archive/refs/tags/v5.17.14.zip"
OPENAPI_DARK_CSS_URL = "https://github.com/Amoenus/SwaggerDark/releases/download/v1.0.0/SwaggerDark.css"
# The regexes below require that all comments and strings have been blanked beforehand
# Regex to match 9 levels of matching curly brackets { }
MATCHING_CURLY_BRACKETS_REGEX = \
r"({(?:(?:{(?:(?:{(?:(?:{(?:(?:{(?:(?:{(?:(?:{(?:(?:{(?:(?:{[^}{]*})|[^}{])*})|[^}{])*})|[^}{])*})|[^}{])*})|[^}{])*})|[^}{])*})|[^}{])*})|[^}{])*})"
# Regex to match 9 levels of matching square brackets [ ]
MATCHING_SQUARE_BRACKETS_REGEX = \
r"(\[(?:(?:\[(?:(?:\[(?:(?:\[(?:(?:\[(?:(?:\[(?:(?:\[(?:(?:\[(?:(?:\[[^\]\[]*\])|[^\]\[])*\])|[^\]\[])*\])|[^\]\[])*\])|[^\]\[])*\])|[^\]\[])*\])|[^\]\[])*\])|[^\]\[])*\])|[^\]\[])*\])"
# Regex to match MRS schema decorator and class content
TS_SCHEMA_DECORATOR_REGEX = \
r"@Mrs\.(schema|module)\s*\(\s*{(.*?)}\)\s*class\s*([\w$]+)\s*" + \
MATCHING_CURLY_BRACKETS_REGEX
# Regex to match decorator parameters represented as values in a dict with stripped curly brackets { }
# e.g. name: "mrs_notes_scripts", enabled: true, triggerType: MrsScriptFunctionType.BeforeUpdate,
TS_DECORATOR_PROPS_REGEX = \
r"\s*(\w*)\s*:\s*((\"[^\"\\]*(?:\\.[^\"\\]*)*\")|(\'[^\'\\]*(?:\\.[^\'\\]*)*\')|(\`[^\`\\]*(?:\\.[^\`\\]*)*\`)|" + \
MATCHING_CURLY_BRACKETS_REGEX + "|" + \
MATCHING_SQUARE_BRACKETS_REGEX + "|" + \
r"([\w\.]*))"
# Regex to match MRS script/trigger decorator and function content
TS_SCRIPT_DECORATOR_REGEX = \
r"@Mrs\.(script|trigger)\s*\(\s*{(.*?)}\s*\)\s*public\s*static\s*(async)?\s*([\w\d_]+)\s*\((.*?)\)\s*:\s*(.*?)\s*" + \
MATCHING_CURLY_BRACKETS_REGEX
# Regex to match each parameter + type
TS_SCRIPT_PARAMETERS_REGEX = \
r"\s*(.*?)(\?)?\s*(:\s*(.*?)\s*)?(=\s*(.*?))?,"
# Regex to remove Promise<> from the result type
TS_SCRIPT_RESULT_REMOVE_PROMISE_REGEX = \
r"Promise\s*<(.*?)>\s*$"
# Regex to match TS interface definitions
TS_INTERFACE_REGEX = \
r"export\s+interface\s+(.+?)\s+(extends\s+(.+?))?\s*" + \
MATCHING_CURLY_BRACKETS_REGEX
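# Example (illustrative, with made-up names): TS_SCHEMA_DECORATOR_REGEX is meant to
# match TypeScript constructs such as
#     @Mrs.module({ name: "myScripts", enabled: true })
#     class MyScriptModule { ... }
# while TS_INTERFACE_REGEX matches definitions such as
#     export interface IMyResult extends IMrsInterface { id: number; name?: string; }
# Both rely on comments and strings having been blanked first, so that brackets
# inside literals cannot break the bracket matching.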
# Regex to match TS interface fields
TS_INTERFACE_FIELDS_REGEX = \
r"\s*(readonly\s+)?(\[\s*(.+?)\s*:\s*(.+?)\s*\]|.+?)(\?)?\s*:\s*(.+?)\s*[,;]"
def format_content_set_listing(content_sets, print_header=False):
"""Formats the listing of content_sets
Args:
content_sets (list): A list of content_sets as dicts
print_header (bool): If set to true, a header is printed
Returns:
The formatted list of content sets
"""
if not content_sets:
return "No items available."
if print_header:
output = (f"{'ID':>3} {'PATH':96} {'ENABLED':8} "
f"{'AUTH':9}\n")
else:
output = ""
for i, item in enumerate(content_sets, start=1):
url = (item['host_ctx'] + item['request_path'])
output += (f"{i:>3} {url[:95]:96} "
f"{'Yes' if item['enabled'] else '-':8} "
f"{'Yes' if item['requires_auth'] else '-':5}")
if i < len(content_sets):
output += "\n"
return output
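# Example (illustrative, with made-up values): calling
#     format_content_set_listing([{"host_ctx": "localhost/myService",
#         "request_path": "/content", "enabled": True, "requires_auth": False}],
#         print_header=True)
# returns a header line with the ID, PATH, ENABLED and AUTH columns followed by
# one formatted row per content set.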
def delete_content_set(session, content_set_ids: list):
if not content_set_ids:
raise ValueError("The specified content_set was not found.")
for content_set_id in content_set_ids:
content_set = get_content_set(
session=session, content_set_id=content_set_id)
# core.delete(table="content_file", where="content_set_id=?").exec(
# session, [content_set_id])
if not core.delete(table="content_set", where="id=?").exec(session, [content_set_id]).success:
raise Exception(
f"The specified content_set with id {content_set_id} was "
"not found.")
# Clean up db_schemas of type SCRIPT_MODULE that have no db_objects left
if content_set.get("content_type") == "SCRIPTS":
rows = core.MrsDbExec("""
SELECT id FROM `mysql_rest_service_metadata`.`db_schema` AS s
WHERE schema_type = 'SCRIPT_MODULE' AND (
SELECT COUNT(*) FROM `mysql_rest_service_metadata`.`db_object` AS o
WHERE o.db_schema_id = s.id) = 0
""").exec(session).items
for row in rows:
core.delete(table="db_schema", where="id=?").exec(
session, [row["id"]])
def enable_content_set(session, content_set_ids: list, value: bool):
"""Updates the 'enabled' state of the given MRS content sets
Args:
value (bool): Update value for the 'enabled' status
content_set_ids (list): The list of content set ids to update
session (object): The database session to use
Returns:
The result message as string
"""
if not content_set_ids:
raise ValueError("The specified content_set was not found.")
# Update all given content sets
for content_set_id in content_set_ids:
result = core.update(table="content_set",
sets={"enabled": value},
where="id=?"
).exec(session, [content_set_id])
def query_content_sets(session, content_set_id: bytes = None, service_id: bytes = None,
request_path=None, include_enable_state=None):
"""Gets a specific MRS content_set
Args:
session (object): The database session to use.
service_id: The id of the service
request_path (str): The request_path of the content_set
content_set_id: The id of the content_set
include_enable_state (bool): Only include items with the given
enabled state
Returns:
The list of matching content sets as dicts
"""
if request_path and not request_path.startswith('/'):
raise Exception("The request_path has to start with '/'.")
# Build SQL based on which input has been provided
sql = """
SELECT cs.id, cs.service_id, cs.request_path, cs.requires_auth,
cs.enabled, cs.comments, cs.options,
CONCAT(h.name, se.url_context_root) AS host_ctx,
cs.content_type
FROM `mysql_rest_service_metadata`.`content_set` cs
LEFT OUTER JOIN `mysql_rest_service_metadata`.`service` se
ON se.id = cs.service_id
LEFT JOIN `mysql_rest_service_metadata`.`url_host` h
ON se.url_host_id = h.id
"""
params = []
wheres = []
if service_id:
wheres.append("cs.service_id = ?")
params.append(service_id)
if request_path:
wheres.append("cs.request_path = ?")
params.append(request_path)
if content_set_id:
wheres.append("cs.id = ?")
params.append(content_set_id)
if include_enable_state is not None:
wheres.append("cs.enabled = ?")
params.append("TRUE" if include_enable_state else "FALSE")
sql += core._generate_where(wheres)
return core.MrsDbExec(sql, params).exec(session).items
def get_content_set(session, service_id: bytes | None = None, request_path=None, content_set_id: bytes | None = None) -> dict | None:
"""Gets a specific MRS content_set
Args:
session (object): The database session to use.
service_id: The id of the service
request_path (str): The request_path of the content_set
content_set_id: The id of the content_set
Returns:
The content_set as dict or None if it could not be found
"""
if request_path and not request_path.startswith('/'):
raise Exception("The request_path has to start with '/'.")
# Build SQL based on which input has been provided
result = query_content_sets(session=session, content_set_id=content_set_id,
service_id=service_id, request_path=request_path)
return result[0] if result else None
def get_content_sets(session, service_id: bytes, include_enable_state=None, request_path=None):
"""Returns all content sets for the given MRS service
Args:
session (object): The database session to use.
service_id: The id of the service to list the content sets from
include_enable_state (bool): Only include items with the given
enabled state
Returns:
A list of dicts representing the content sets
"""
return query_content_sets(session=session, service_id=service_id,
request_path=request_path, include_enable_state=include_enable_state)
def get_content_set_count(session, service_id: bytes):
sql = "SELECT COUNT(*) FROM `mysql_rest_service_metadata`.`content_set` cs WHERE cs.service_id = ?"
res = core.MrsDbExec(sql, [service_id]).exec(session).first
return res[0]
def add_content_set(session, service_id, request_path, requires_auth=False, comments="", options=None, enabled=1,
content_dir=None, send_gui_message=None, service=None, ignore_list=None):
# Example usage:
# mysqlsh dba@localhost --sql -e "CREATE OR REPLACE REST CONTENT SET /mrsScriptsContent ON SERVICE /myService FROM '~/git/shell-plugins/mrs_plugin/examples/mrs_scripts/' LOAD SCRIPTS"
if service is None:
service = services.get_service(
session=session, service_id=service_id)
core.Validations.request_path(request_path, session=session)
if ignore_list is None:
ignore_list = "*node_modules/*, */.*"
open_api_ui = False
# Expand ${openApiUi} variable path to actual OpenAPI UI
if content_dir == r"${openApiUi}":
open_api_ui = True
content_dir = prepare_open_api_ui(
service=service, request_path=request_path, send_gui_message=send_gui_message)
if content_dir is not None and os.path.isdir(os.path.expanduser(content_dir)) is False:
raise ValueError(
f"The given path {content_dir} does not exist.")
contains_mrs_scripts = False
if options is not None:
contains_mrs_scripts = options.get("contains_mrs_scripts", False)
mrs_script_language = options.get("mrs_scripting_language", None)
if contains_mrs_scripts is True and mrs_script_language is None:
mrs_script_language = get_folder_mrs_scripts_language(
path=content_dir, ignore_list=ignore_list)
if mrs_script_language is None:
raise ValueError(
"The MRS scripting language has not been specified and cannot be detected.")
options["mrs_scripts_folder_name"] = os.path.basename(
os.path.normpath(content_dir))
# If a content_dir has been provided, normalize the path
if content_dir is not None:
# Expand ~ and convert to an absolute path
content_dir = os.path.abspath(
os.path.expanduser(content_dir))
# If a content_dir has been provided, check if that directory exists
if not os.path.isdir(content_dir):
raise ValueError(
f"The given content directory path '{content_dir}' "
"does not exist.")
content_set_id = core.get_sequence_id(session)
values = {
"id": content_set_id,
"service_id": service_id,
"request_path": request_path,
"requires_auth": int(requires_auth),
"enabled": int(enabled),
"comments": comments,
"options": options,
"content_type": "STATIC" if not contains_mrs_scripts else "SCRIPTS",
}
# Create the content_set record with the given enabled state
core.insert(table="content_set", values=values).exec(session)
file_list = None
if content_dir is not None:
file_list = content_files.add_content_dir(
session, content_set_id,
content_dir, requires_auth,
ignore_list,
send_gui_message=send_gui_message
)
if contains_mrs_scripts:
# Update db_schemas/db_objects based on script definition
script_def = update_scripts_from_content_set(
session=session, content_set_id=content_set_id,
language=mrs_script_language,
content_dir=content_dir,
ignore_list=ignore_list,
send_gui_message=send_gui_message)
if len(script_def["errors"]) > 0:
raise ValueError(
"The following errors occurred when parsing the MRS Scripts:\n"
+ json.dumps(script_def["errors"]))
# Enable the content set if requested by the user
enable_content_set(
session=session, content_set_ids=[content_set_id], value=enabled)
# In case the open_api_ui was downloaded, make sure to delete the temporary folder it used
if open_api_ui:
shutil.rmtree(content_dir)
return values["id"], len(file_list) if file_list is not None else 0
def update_content_set(session, content_set_id, value, file_ignore_list=None,
send_gui_message=None, merge_options=False):
if value is None:
raise ValueError(
"Failed to update REST content set. No values specified.")
options = value.get("options", {})
contains_mrs_scripts = options.get("contains_mrs_scripts", False)
mrs_script_language = options.get("mrs_scripting_language", None)
if contains_mrs_scripts is True and mrs_script_language is None:
raise ValueError(
"Failed to update REST content set. The options are missing the `mrs_scripting_language` setting.")
if contains_mrs_scripts is True:
value["content_type"] = "SCRIPTS"
# Prepare the merge of options, if requested
if merge_options:
options = value.get("options", None)
# Check if there are options set already, if so, merge the options
if options is not None:
row = core.MrsDbExec("""
SELECT options IS NULL AS options_is_null
FROM `mysql_rest_service_metadata`.`content_set`
WHERE id = ?""", [content_set_id]).exec(session).first
if row and row["options_is_null"] == 1:
merge_options = False
else:
value.pop("options")
core.update(
table="content_set",
sets=value,
where=["id=?"]
).exec(session, [content_set_id])
# Merge options if requested
if merge_options and options is not None:
core.MrsDbExec("""
UPDATE `mysql_rest_service_metadata`.`content_set`
SET options = JSON_MERGE_PATCH(options, ?)
WHERE id = ?
""", [options, content_set_id]).exec(session)
if contains_mrs_scripts:
# Update db_schemas/db_objects based on script definition
script_def = update_scripts_from_content_set(
session=session, content_set_id=content_set_id,
language=mrs_script_language,
ignore_list=file_ignore_list,
send_gui_message=send_gui_message)
if len(script_def["errors"]) > 0:
raise ValueError(
"The following errors occurred when parsing the MRS Scripts:\n"
+ json.dumps(script_def["errors"]))
def get_current_content_set(session):
"""Returns the current content_set
This only applies to interactive sessions in the shell where the
id of the current content_set is stored in the global config
Args:
session (object): The database session to use.
Returns:
The current content_set or None if no current content_set was set
"""
# Get current_service_id from the global mrs_config
mrs_config = core.get_current_config()
current_content_set_id = mrs_config.get('current_content_set_id')
current_content_set = None
if current_content_set_id:
current_content_set = get_content_set(
content_set_id=current_content_set_id,
session=session)
return current_content_set
def blank_js_comments(s, blank_char=" "):
pattern = r"(\".*?(?<!\\)\"|\'.*?(?<!\\)\')|(/\*.*?\*/|//[^\r\n]*$)"
regex = re.compile(pattern, re.MULTILINE | re.DOTALL)
cleaner_regex = re.compile(r"[^\n]*", re.MULTILINE | re.DOTALL)
def _replacer(match: re.Match):
if match.group(2) is not None:
# Blank single line comments
if match.group(2).startswith("//"):
return "//" + " " * (match.end(2) - match.start(2) - 2)
# For multi-line strings, ensure to keep line breaks in place. Replace all other characters.
def __replacer(match2: re.Match):
blanked_string = blank_char * (match2.end() - match2.start())
return blanked_string
return "/*" + cleaner_regex.sub(__replacer, match.group())[2:-2] + "*/"
else:
return match.group(1)
return regex.sub(_replacer, s)
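# Example (illustrative): blank_js_comments('let a = 1; // set a') keeps the code
# part untouched and replaces the comment text with blanks ('//' plus six spaces),
# so that line breaks and character offsets stay aligned with the original source.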
def blank_quoted_js_strings(s, blank_char=" "):
# Blank strings with all allowed string quotes in JS
s = blank_quoted_strings(s, '"', r'\"', blank_char)
s = blank_quoted_strings(s, "'", r"\'", blank_char)
s = blank_quoted_strings(s, "`", r"\`", blank_char)
return s
def blank_quoted_strings(s, quote_char, quote_r_char, blank_char=" "):
# Build the pattern like this, showcasing it for \": r"\"[^\"\\]*(?:\\.[^\"\\]*)*\""
pattern = quote_r_char + \
r"[^" + quote_r_char + \
r"\\]*(?:\\.[^" + quote_r_char + r"\\]*)*" + quote_r_char
regex = re.compile(pattern, re.MULTILINE | re.DOTALL)
cleaner_regex = re.compile(r"[^\n]*", re.MULTILINE | re.DOTALL)
def _replacer(match: re.Match):
def __replacer(match2: re.Match):
blanked_string = blank_char * (match2.end() - match2.start())
return blanked_string
return quote_char + cleaner_regex.sub(__replacer, match.group())[1:-1] + quote_char
return regex.sub(_replacer, s)
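# Example (illustrative): blank_quoted_js_strings('const s = "a}b";') returns
# 'const s = "   ";'. The string content is blanked so that brackets inside string
# literals cannot confuse the bracket-matching regexes, while all character
# positions stay aligned with the original code.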
def convert_ignore_list_to_regex_pattern(ignore_list):
ignore_patterns = []
for pattern in ignore_list.split(","):
ignore_patterns.append(pattern.strip().replace("\\", "/").replace(".", "\\.").replace(
"*", ".*").replace("?", "."))
if len(pattern) > 0:
return re.compile(
"(" + ")|(".join(ignore_patterns) + ")", flags=re.MULTILINE | re.DOTALL)
return None
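# Example (illustrative): convert_ignore_list_to_regex_pattern("*node_modules/*, */.*")
# produces a compiled pattern equivalent to r"(.*node_modules/.*)|(.*/\..*)", which
# is matched against file paths normalized to forward slashes.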
def get_folder_mrs_scripts_language(path, ignore_list):
full_ignore_pattern = convert_ignore_list_to_regex_pattern(ignore_list)
path = os.path.expanduser(path)
for root, dirs, files in os.walk(path):
for file in files:
fullname = os.path.join(root, file)
# If the filename matches the ignore list, ignore the file
if full_ignore_pattern is not None and re.match(full_ignore_pattern, fullname.replace("\\", "/")):
continue
# Detect TypeScript
if ((fullname.endswith(".mts") or fullname.endswith(".ts"))
and not (fullname.endswith(".spec.mts")
or fullname.endswith(".spec.ts")
or fullname.endswith(".d.ts"))):
# Read the file content
with open(fullname, 'r') as f:
code = f.read()
# Clear TypeScript comments and strings for regex matching
code_cleared = blank_quoted_js_strings(
blank_js_comments(code))
# Search for SCHEMA_DECORATOR
match = re.search(
TS_SCHEMA_DECORATOR_REGEX, code_cleared, re.MULTILINE | re.DOTALL)
if match is not None:
return "TypeScript"
return None
def get_decorator_param_value(param_value):
if param_value.startswith('"') or param_value.startswith("'") or param_value.startswith("`"):
param_value = param_value[1:-1]
elif param_value.startswith("{") or param_value.startswith("["):
# Make sure to match on a version of param_value with blanked out strings
param_value_blanked = blank_quoted_js_strings(param_value)
# Remove comma on last item if present
bracket_char = r"}" if param_value.startswith("{") else r"]"
# Instead of a simple re.sub() perform the replacement on the original param_value, not the one with
# blanked strings. Also, do the replacement from bottom up, since the chars are removed and the matched
# positions would get out of sync with the original param_value
matches = re.finditer(r",\s*" + bracket_char, param_value_blanked,
re.MULTILINE | re.DOTALL)
for match in reversed(list(matches)):
param_value = param_value[0:match.start(
)] + bracket_char + param_value[match.end():]
# since the original param_value was changed, get a new blanked version
param_value_blanked = blank_quoted_js_strings(param_value)
# Add quotes to keys
matches = re.finditer(
r"(?<={|,)\s*([a-zA-Z][a-zA-Z0-9]*)(?=:)", param_value_blanked, re.MULTILINE | re.DOTALL)
for match in reversed(list(matches)):
param_value = param_value[0:match.start(1)] + \
'"' + \
param_value[match.start(1):match.end(
1)] + '"' + param_value[match.end(1):]
try:
param_value = json.loads(param_value)
except:
pass
else:
param_value = param_value.strip()
if param_value.lower() == "true":
param_value = True
elif param_value.lower() == "false":
param_value = False
else:
try:
param_value = float(param_value)
except:
pass
return param_value
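# Example (illustrative): get_decorator_param_value('{ name: "test", count: 2, }')
# removes the trailing comma, quotes the keys and returns the parsed dict
# {"name": "test", "count": 2}; plain values such as true, false or 1.5 are
# converted to the corresponding Python bool/float values.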
def get_decorator_properties(code, code_cleared, start, end):
props = []
matches = re.finditer(
TS_DECORATOR_PROPS_REGEX, code_cleared[start:end], re.MULTILINE | re.DOTALL)
for match in matches:
prop_value = code[
start + match.start(2):start + match.end(2)]
props.append({
"name": match.group(1),
"value": get_decorator_param_value(prop_value),
})
return props
def get_function_params(code, code_cleared, start, end):
params = []
# Add a trailing , to allow for an easier regex
params_string = code_cleared[start:end] + ","
matches = re.finditer(
TS_SCRIPT_PARAMETERS_REGEX, params_string, re.MULTILINE | re.DOTALL)
for match in matches:
# Ignore additional match caused by adding the "," above
if match.group(1) == "":
continue
# If a default value is given for the parameter, also get its type
optional = match.group(2) is not None
if match.group(6) is not None:
optional = True
default_value = code[start + match.start(6):start + match.end(6)]
if core.is_number(default_value):
default_value = float(default_value)
default_type = "number"
elif default_value == "true" or default_value == "false":
default_value = default_value == "true"
default_type = "boolean"
else:
if default_value[0] == "'" or default_value[0] == '"':
default_value = default_value[1:-1]
default_type = "string"
else:
default_value = None
default_type = "unknown"
param_type = match.group(4) if match.group(
4) is not None else default_type
param_type_array = False
if param_type.endswith("[]"):
param_type = param_type[:-2]
param_type_array = True
param = {
"name": match.group(1),
"type": param_type,
"optional": optional,
"is_array": param_type_array,
}
if default_value is not None:
param["default"] = default_value
params.append(param)
return params
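# Example (illustrative, with made-up parameter names): for the cleared parameter
# list "userId: number, name?: string", get_function_params returns
#     [{"name": "userId", "type": "number", "optional": False, "is_array": False},
#      {"name": "name", "type": "string", "optional": True, "is_array": False}]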
def get_function_return_type_no_promise(code):
result = re.findall(
TS_SCRIPT_RESULT_REMOVE_PROMISE_REGEX, code, re.MULTILINE | re.DOTALL)
return result[0] if len(result) > 0 else "void"
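# Example (illustrative): get_function_return_type_no_promise("Promise<IMyResult[]>")
# returns "IMyResult[]"; when the return type is not wrapped in Promise<>, "void"
# is returned instead.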
def get_typescript_interface_props(code, code_cleared, start, end):
props = []
# Add a trailing ; to allow for an easier regex
props_string = code_cleared[start+1:end-1] + ";"
matches = re.finditer(
TS_INTERFACE_FIELDS_REGEX, props_string, re.MULTILINE | re.DOTALL)
for match in matches:
prop = {
"name": match.group(2) if match.group(3) is None else match.group(3),
"type": match.group(6),
"optional": match.group(5) is not None,
"readOnly": match.group(1) is not None,
}
if match.group(4) is not None:
prop["indexSignatureType"] = match.group(4)
props.append(prop)
return props
def get_mrs_typescript_interface_definitions(file, interfaces_def):
# Find the TS interface definitions
matches = re.finditer(
TS_INTERFACE_REGEX, file["code_cleared"], re.MULTILINE | re.DOTALL)
for match in matches:
# Get the starting line number of the class definition
ln_number_start = file["code"].count("\n", 0, match.start()) + 1
ln_number_end = file["code"].count("\n", 0, match.end()) + 1
ts_interface = {
"file_info": {
"full_file_name": file["full_file_name"],
"relative_file_name": file["relative_file_name"],
"file_name": file["file_name"],
"last_modification": file["last_modification"],
},
"name": match.group(1),
"code_position": {
"line_number_start": ln_number_start,
"line_number_end": ln_number_end,
"character_start": match.start(),
"character_end": match.end(),
},
"properties": get_typescript_interface_props(file["code"], file["code_cleared"], match.start(4), match.end(4)),
}
if match.group(3) is not None:
ts_interface["extends"] = match.group(3)
interfaces_def.append(ts_interface)
def get_mrs_typescript_definitions(file, mrs_script_def):
# Find the MRS Schema definitions, which map to TS classes
matches = re.finditer(
TS_SCHEMA_DECORATOR_REGEX, file["code_cleared"], re.MULTILINE | re.DOTALL)
for match in matches:
# Group 1 holds the decorator properties
props = get_decorator_properties(
file["code"], file["code_cleared"], match.start(2), match.end(2))
# Get the starting line number of the class definition
ln_number_start = file["code"].count("\n", 0, match.start()) + 1
ln_number_end = file["code"].count("\n", 0, match.end()) + 1
schema_def = {
"file_info": {
"full_file_name": file["full_file_name"],
"relative_file_name": file["relative_file_name"],
"file_name": file["file_name"],
"last_modification": file["last_modification"],
},
"class_name": file["code"][match.start(3):match.end(3)],
"schema_type": "SCRIPT_MODULE" if match.group(1) == "module" else "DATABASE_SCHEMA",
"code_position": {
"line_number_start": ln_number_start,
"line_number_end": ln_number_end,
"character_start": match.start(),
"character_end": match.end(),
},
"properties": props,
"scripts": [],
"triggers": [],
}
mrs_script_def.append(schema_def)
class_content = match.group(4)
class_content_offset = match.start(4)
script_matches = re.finditer(
TS_SCRIPT_DECORATOR_REGEX, class_content, re.MULTILINE | re.DOTALL)
for script_match in script_matches:
ln_number_start = file["code"].count(
"\n", 0, class_content_offset+script_match.start()) + 1
ln_number_end = file["code"].count(
"\n", 0, class_content_offset+script_match.end()) + 1
# Group 2 holds the decorator params
props = get_decorator_properties(
file["code"], file["code_cleared"],
class_content_offset+script_match.start(2), class_content_offset+script_match.end(2))
params = get_function_params(
file["code"], file["code_cleared"],
class_content_offset+script_match.start(5), class_content_offset+script_match.end(5))
return_type = get_function_return_type_no_promise(
file["code"][class_content_offset+script_match.start(6):class_content_offset+script_match.end(6)])
returns_array = False
if return_type.endswith("[]"):
return_type = return_type[:-2]
returns_array = True
script_def = {
"function_name": file["code"][
class_content_offset+script_match.start(4):class_content_offset+script_match.end(4)],
"code_position": {
"line_number_start": ln_number_start,
"line_number_end": ln_number_end,
"character_start": class_content_offset+script_match.start(),
"character_end": class_content_offset+script_match.end(),
},
"parameters": params,
"return_type": {
"type": return_type,
"is_array": returns_array,
},
"properties": props
}
if script_match.group(1) == "trigger":
schema_def["triggers"].append(script_def)
else:
schema_def["scripts"].append(script_def)
def is_simple_typescript_type(typeName):
return typeName == "boolean" or typeName == "number" or typeName == "string"
def interface_derives_from(interface_name, master_interface_name, interfaces):
if interface_name == master_interface_name:
return True
# Lookup the interface by name. If the interface cannot be found in the interface
# definition list, return false
interface = next(
(item for item in interfaces if item["name"] == interface_name), None)
if interface is None:
return False
# Check if the interface extends another interface, if not return False.
# If it extends the desired interface, return True
interface_extends = interface.get("extends")
if interface_extends is None:
return False
if interface_extends == master_interface_name:
return True
# Check if the parent interface derives from the requested interface recursively and return that value
return interface_derives_from(interface_extends, master_interface_name, interfaces)
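# Example (illustrative, with made-up interface names): if the interfaces list
# contains IMyResult which extends IMrsBase, then
# interface_derives_from("IMyResult", "IMrsBase", interfaces) follows the "extends"
# chain and returns True; an interface that is not part of the list returns False.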
def get_typescript_interface_from_list(type_name, interface_list):
for interface_def in interface_list:
if interface_def["name"] == type_name:
return interface_def
return None
def add_typescript_interface_to_list(type_name, interface_list, interfaces_def):
# If the name of the type matches a simple type, do not add it
if is_simple_typescript_type(type_name):
return True
# If the name is already in the list, do not add it again
for interface_def in interface_list:
if interface_def["name"] == type_name:
return True
# Look the type_name up in the list of known interface definitions
for interface_def in interfaces_def:
if interface_def["name"] == type_name:
interface_list.append(interface_def)
# If the interface extends another interface, make sure to add that interface as well, recursively
extends = interface_def.get("extends")
if extends is not None: # and extends != "IMrsInterface":
add_typescript_interface_to_list(
extends, interface_list, interfaces_def)
return True
return False
def match_typescript_script_types_to_interface_list(interfaces_def, mrs_script_def, errors):
used_interfaces = []
for script_module in mrs_script_def:
for script in script_module["scripts"]:
# Handle return type
return_type = script["return_type"]["type"]
if not add_typescript_interface_to_list(return_type, used_interfaces, interfaces_def):
errors.append({
"kind": "TypeError",
"message": f"The script {script["function_name"]} returns an unknown datatype `{return_type}`.",
"script": script,
"file_info": script_module["file_info"],
})
# Handle parameters
for parameter in script["parameters"]:
param_type = parameter["type"]
if param_type.endswith("[]"):
errors.append({
"kind": "TypeError",
"message": "A script parameter must not be an array. " +
f"The script {script["function_name"]} used `{parameter["type"]}` " +
f"as parameter type for `{parameter["name"]}`.",
"script": script,
"file_info": script_module["file_info"],
})
elif not add_typescript_interface_to_list(param_type, used_interfaces, interfaces_def):
errors.append({
"kind": "TypeError",
"message":
f'Unknown datatype `{param_type}` used for script parameter `{
parameter["name"]}`.',
"script": script,
"file_info": script_module["file_info"],
})
# Use a while loop here, since the used_interfaces list can grow while looping over it
i = 0
while i < len(used_interfaces):
interface = used_interfaces[i]
for property in interface["properties"]:
property_type = property["type"]
# If the property uses an array, only match the type of the array
if property_type.endswith("[]"):
property_type = property_type[:-2]
if not add_typescript_interface_to_list(property_type, used_interfaces, interfaces_def):
errors.append({
"kind": "TypeError",
"message":
f'Unknown datatype `{property_type}` used for interface property `{
property["name"]}`.',
"interface": interface,
"file_info": interface["file_info"],
})
i += 1
# Check that the return_type is a simple type or derives from IMrsInterface
# for script_module in mrs_script_def:
# for script in script_module["scripts"]:
# # Handle return type
# return_type = script["return_type"]["type"]
# if not (is_simple_typescript_type(return_type) or interface_derives_from(
# return_type, "IMrsInterface", used_interfaces)):
# errors.append({
# "kind": "TypeError",
# "message": f"The datatype `{return_type}` returned by the MRS script does not derive from "
# + "IMrsInterface nor is a simple type.",
# "script": script,
# "file_info": script_module["file_info"],
# })
return used_interfaces
def get_file_mrs_script_definitions(path, language):
path = os.path.expanduser(path)
# Read the file content
with open(path, 'r') as f:
code = f.read()
# Clear TypeScript comments and strings for regex matching
code_cleared = blank_quoted_js_strings(
blank_js_comments(code))
codeFile = {
"full_file_name": path,
"relative_file_name": path,
"file_name": os.path.basename(path),
"last_modification": datetime.datetime.fromtimestamp(
pathlib.Path(path).stat().st_mtime, tz=datetime.timezone.utc).strftime("%F %T.%f")[:-3],
"code": code,
"code_cleared": code_cleared,
}
# Process TypeScript
mrs_script_def = []
if language == "TypeScript":
get_mrs_typescript_definitions(
file=codeFile, mrs_script_def=mrs_script_def)
return mrs_script_def
def get_mrs_script_definitions_from_code_file_list(code_files, language, send_gui_message=None):
# Get both interface and script definitions
mrs_script_modules_def = []
interfaces_def = []
errors = []
if language == "TypeScript":
for stage in ["interfaces", "scripts"]:
for file in code_files:
if stage == "interfaces":
if send_gui_message is not None:
send_gui_message(
"info", f"Parsing MRS Scripts file {file["relative_file_name"]} ...")
get_mrs_typescript_interface_definitions(
file, interfaces_def)
elif stage == "scripts":
get_mrs_typescript_definitions(
file, mrs_script_modules_def)
# Limit the interface list to interfaces used in scripts and check for missing interface definitions
used_interfaces = match_typescript_script_types_to_interface_list(
interfaces_def, mrs_script_modules_def, errors)
mrs_script_def = {
"script_modules": mrs_script_modules_def,
"interfaces": used_interfaces,
"errors": errors,
"language": language,
}
return mrs_script_def
def is_common_build_folder(dir):
if (dir.lower() == "build" or dir.lower() == "output" or dir.lower() == "out" or dir.lower() == "dist"):
return True
return False
def is_common_static_content_folder(dir):
if (dir.lower() == "static" or dir.lower() == "assets" or dir.lower() == "media" or dir.lower() == "web"
or dir.lower() == "js" or dir.lower() == "css" or dir.lower() == "images"):
return True
return False
def get_code_files_from_folder(path, ignore_list, language):
full_ignore_pattern = convert_ignore_list_to_regex_pattern(ignore_list)
path = os.path.expanduser(path)
code_files = []
build_folder = None
static_content_folders = []
for root, dirs, files in os.walk(path):
# Check if there is a build directory with a common name in the root dir
if path == root:
for dir in dirs:
if is_common_build_folder(dir):
build_folder = dir
if is_common_static_content_folder(dir):
static_content_folders.append(dir)
for file in files:
fullname = os.path.join(root, file)
# If the filename matches the ignore list, ignore the file
if full_ignore_pattern is not None and re.match(full_ignore_pattern, fullname.replace("\\", "/")):
continue
# Process TypeScript
if language == "TypeScript" and (
(fullname.endswith(".mts") or fullname.endswith(".ts"))
and not (fullname.endswith(".spec.mts")
or fullname.endswith(".spec.ts")
or fullname.endswith(".d.ts"))):
# Read the file content
with open(fullname, 'r') as f:
code = f.read()
# Clear TypeScript comments and strings for regex matching
code_cleared = blank_quoted_js_strings(
blank_js_comments(code))
code_files.append({
"full_file_name": fullname,
"relative_file_name": fullname[len(path):],
"file_name": os.path.basename(fullname),
"last_modification": datetime.datetime.fromtimestamp(
pathlib.Path(fullname).stat().st_mtime, tz=datetime.timezone.utc).strftime("%F %T.%f")[:-3],
"code": code,
"code_cleared": code_cleared,
})
return code_files, build_folder, static_content_folders
def get_folder_mrs_script_definitions(path, ignore_list, language, send_gui_message=None):
code_files, build_folder, static_content_folders = get_code_files_from_folder(
path=path, ignore_list=ignore_list, language=language)
mrs_script_def = get_mrs_script_definitions_from_code_file_list(
code_files, language, send_gui_message=send_gui_message)
if build_folder is not None:
mrs_script_def["build_folder"] = build_folder
if len(static_content_folders) > 0:
mrs_script_def["static_content_folders"] = static_content_folders
if language == "TypeScript" and build_folder is None:
mrs_script_def["errors"].append({
"kind": "BuildError",
"message": f"No build folder found for this TypeScript project. Please build the project before adding it.",
})
return mrs_script_def
def get_mrs_script_property(properties, name, default=None):
if not properties:
return default
for property in properties:
if property.get("name") == name:
return property.get("value")
return default
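# Example (illustrative): with properties parsed from a decorator, e.g.
# [{"name": "requestPath", "value": "/myModule"}], calling
# get_mrs_script_property(properties, "requestPath", "/default") returns
# "/myModule" and falls back to "/default" when no property with that name exists.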
def print_gui_message(type, msg):
print(f"{type.upper()}: {msg}")
def map_ts_type_to_database_type(type: str):
match type.lower():
case "string":
return "text"
case "number":
return "decimal"
case "boolean":
return "bit(1)"
return "json"
def map_ts_type_to_interface(type: str):
match type.lower():
case "string" | "number" | "boolean":
return None
return type
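# Example (illustrative): map_ts_type_to_database_type("number") returns "decimal",
# "string" maps to "text", "boolean" to "bit(1)" and any other (interface) type
# falls back to "json". map_ts_type_to_interface returns None for these simple
# types and the type name itself (e.g. a made-up "IMyResult") for interface types.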
def update_scripts_from_content_set(session, content_set_id, language, content_dir=None, ignore_list=None,
send_gui_message=None):
if send_gui_message is None:
send_gui_message = print_gui_message
content_set = get_content_set(
session=session, content_set_id=content_set_id)
if content_set is None:
raise ValueError(
"Could not load the MRS scripts. The given content set was not found.")
service = services.get_service(
session=session, service_id=content_set["service_id"])
if service is None:
raise ValueError(
"Could not load the MRS scripts. The content set's service was not found.")
code_files = []
static_content_folders = []
build_folder = None
if content_dir is not None:
code_files, build_folder, static_content_folders = get_code_files_from_folder(
path=content_dir, ignore_list=ignore_list, language=language)
else:
files = content_files.get_content_files(
session=session, content_set_id=content_set_id,
include_enable_state=True, include_file_content=True)
for content_file in files:
dirs = pathlib.Path(content_file["request_path"])
if len(dirs.parts) > 0:
if is_common_build_folder(dirs.parts[0]):
build_folder = dirs.parts[0]
elif len(dirs.parts) > 1 and is_common_build_folder(dirs.parts[1]):
build_folder = dirs.parts[1]
if is_common_static_content_folder(dirs.parts[0]):
static_content_folders.append(dirs.parts[0])
elif len(dirs.parts) > 1 and is_common_static_content_folder(dirs.parts[1]):
static_content_folders.append(dirs.parts[1])
fullname = "." + \
content_file["content_set_request_path"] + \
content_file["request_path"]
# Process TypeScript
if language == "TypeScript" and (
(fullname.endswith(".mts") or fullname.endswith(".ts"))
and not (fullname.endswith(".spec.mts")
or fullname.endswith(".spec.ts")
or fullname.endswith(".d.ts"))):
if core.is_text(content_file["content"]):
code = content_file["content"].decode()
else:
raise ValueError(f"The content of file {
fullname} is binary data, not text.")
# Clear TypeScript comments and strings for regex matching
code_cleared = blank_quoted_js_strings(
blank_js_comments(code))
options = content_file.get("options")
last_modification = ""
if options is not None:
last_modification = options.get("last_modification", "")
code_files.append({
"full_file_name": fullname,
"relative_file_name": fullname[len("." + content_file["content_set_request_path"]):],
"file_name": os.path.basename(fullname),
"last_modification": last_modification,
"code": code,
"code_cleared": code_cleared,
})
if language == "TypeScript" and build_folder is None:
raise Exception(
"The project's build directory was not found. Please build the project before adding the content set.")
if len(code_files) == 0:
if send_gui_message is not None:
send_gui_message(
"info", "None of the files match the specified MRS scripting language.")
return
if send_gui_message is not None:
send_gui_message(
"info", f"Parsing {len(code_files)} MRS Script files ...")
script_def = get_mrs_script_definitions_from_code_file_list(
code_files, language=language, send_gui_message=send_gui_message)
if build_folder:
script_def["build_folder"] = build_folder
if len(static_content_folders) > 0:
script_def["static_content_folders"] = static_content_folders
if language == "TypeScript" and build_folder is None:
script_def["errors"].append({
"kind": "BuildError",
"message": f"No build folder found for this TypeScript project. Please build the project before adding it.",
})
error_count = len(script_def["errors"])
if error_count > 0:
return script_def
script_module_files = []
for script_module in script_def["script_modules"]:
properties = script_module["properties"]
# Add script_module_files information for this module
outputFilePath = get_mrs_script_property(properties, "outputFilePath")
# If an explicit outputFilePath has been given, ensure that forward slashes are used and the path starts with /
if outputFilePath is not None:
file_to_load = outputFilePath
if file_to_load.count("\\") > 0 and file_to_load.count("/") == 0:
file_to_load = file_to_load.replace("\\", "/")
if not file_to_load.startswith("/"):
file_to_load = "/" + file_to_load
else:
# Otherwise, use /<build_folder>/<file_name>
file_to_load = "/" + build_folder + "/" + \
script_module["file_info"]["file_name"]
if language == "TypeScript":
file_to_load = file_to_load.replace(
".mts", ".mjs").replace(".ts", ".js")
script_module_files.append({
"file_info": script_module["file_info"],
"file_to_load": file_to_load,
"class_name": script_module["class_name"],
})
request_path = get_mrs_script_property(
properties, "requestPath", "/" + script_module["class_name"])
schema = schemas.get_schema(
session=session, request_path=request_path)
if schema is None:
name = get_mrs_script_property(
properties, "name", core.convert_path_to_camel_case(request_path))
send_gui_message(
"info", f"Creating new REST schema `{name}` at {request_path} ...")
# Add the grants for the given module to the options
options = get_mrs_script_property(properties, "options", None)
grants = get_mrs_script_property(properties, "grants", None)
if grants is not None:
if options is None:
options = {}
options["grants"] = grants
schema_id = schemas.add_schema(
session=session, schema_name=name,
service_id=content_set["service_id"], request_path=request_path,
enabled=get_mrs_script_property(properties, "enabled", True),
internal=get_mrs_script_property(properties, "internal", True),
requires_auth=get_mrs_script_property(
properties, "requiresAuth", False),
options=options,
metadata=get_mrs_script_property(properties, "metadata", None),
comments=get_mrs_script_property(properties, "comments", None),
schema_type="SCRIPT_MODULE",
)
schema = schemas.get_schema(session=session, schema_id=schema_id)
send_gui_message(
"info", f"Adding MRS scripts to REST schema {schema.get("name")} at {request_path} ...")
# Add a db_object for each script
interface_list = script_def["interfaces"]
for script in script_module["scripts"]:
func_props = script["properties"]
func_request_path = get_mrs_script_property(
func_props, "requestPath", "/" + script["function_name"])
full_path = service["url_context_root"] + \
schema["request_path"] + func_request_path
row_ownership_param = get_mrs_script_property(
func_props, "rowOwnershipParameter")
row_ownership_field_id = None
send_gui_message(
"info", f"Adding MRS script {script["function_name"]} at {func_request_path} ...")
db_object_id = core.get_sequence_id(session=session)
objects = []
# Build the parameters object with all parameters as object_fields
object_id = core.get_sequence_id(session=session)
object_fields = []
pos = 0
for param in script["parameters"]:
object_field_id = core.get_sequence_id(session=session)
if param["name"] == row_ownership_param:
row_ownership_field_id = object_field_id
object_field = {
"id": object_field_id,
"object_id": object_id,
"name": param["name"],
"position": pos,
"db_column": {
"name": param["name"],
"not_null": not param["optional"],
"in": True,
"datatype": map_ts_type_to_database_type(param["type"]),
"is_array": param["is_array"],
},
"enabled": True,
"allow_filtering": True,
"allow_sorting": False,
"no_check": False,
"no_update": False,
}
if param.get("default", None) is not None:
object_field["db_column"]["default"] = param["default"]
# Store interface name
type_interface = map_ts_type_to_interface(param["type"])
if type_interface is not None:
object_field["db_column"]["interface"] = type_interface
object_fields.append(object_field)
pos += 1
obj = {
"id": object_id,
"db_object_id": db_object_id,
"name": core.convert_path_to_pascal_case(full_path) + "Params",
"kind": "PARAMETERS",
"position": 0,
"fields": object_fields,
}
if row_ownership_field_id is not None:
obj["row_ownership_field_id"] = row_ownership_field_id
objects.append(obj)
# Build result object
object_id = core.get_sequence_id(session=session)
object_fields = []
returns_array = False
return_type = script["return_type"]["type"]
returns_array = script["return_type"]["is_array"]
if is_simple_typescript_type(return_type):
object_field = {
"id": core.get_sequence_id(session=session),
"object_id": object_id,
"name": "result",
"position": 0,
"db_column": {
"name": "result",
"not_null": True,
"datatype": map_ts_type_to_database_type(return_type),
"is_array": returns_array,
},
"enabled": True,
"allow_filtering": True,
"allow_sorting": False,
"no_check": False,
"no_update": False,
}
# Store interface name
type_interface = map_ts_type_to_interface(return_type)
if type_interface is not None:
object_field["db_column"]["interface"] = type_interface
object_fields.append(object_field)
else:
add_object_fields_from_interface(
session=session, interface_name=return_type,
interface_list=interface_list,
object_id=object_id, object_fields=object_fields)
objects.append({
"id": object_id,
"db_object_id": db_object_id,
"name": core.convert_path_to_pascal_case(full_path) + "Result",
"kind": "RESULT",
"position": 1,
"fields": object_fields,
"sdk_options": {
"language_options": [
{
"language": "TypeScript",
"class_name": return_type,
}
],
"class_name": return_type,
"returns_array": returns_array,
}
})
# Add the grants for the given endpoint to the options
options = get_mrs_script_property(func_props, "options", None)
grants = get_mrs_script_property(func_props, "grants", None)
if grants is not None:
if options is None:
options = {}
options["grants"] = grants
db_object_id, grants = db_objects.add_db_object(
session=session,
schema_id=schema["id"],
db_object_id=db_object_id,
db_object_name=get_mrs_script_property(
func_props, "name", script["function_name"]),
request_path=func_request_path,
db_object_type="SCRIPT",
enabled=get_mrs_script_property(func_props, "enabled", True),
internal=get_mrs_script_property(func_props, "internal", True),
requires_auth=get_mrs_script_property(
func_props, "requiresAuth", True),
options=options,
metadata=get_mrs_script_property(func_props, "metadata", None),
comments=get_mrs_script_property(func_props, "comments", None),
crud_operation_format=get_mrs_script_property(
func_props, "format", "FEED"),
media_type=get_mrs_script_property(
func_props, "mediaType", None),
items_per_page=None,
auto_detect_media_type=None,
auth_stored_procedure=None,
objects=objects,
)
core.insert(table="content_set_has_obj_def", values={
"content_set_id": content_set_id,
"db_object_id": db_object_id,
"kind": "Script",
"priority": 0,
"language": language,
"name": script["function_name"],
"class_name": script_module["class_name"],
"options": {
"file_to_load": file_to_load
}
}).exec(session)
for grant in grants:
core.MrsDbExec(grant).exec(session)
for trigger in script_module["triggers"]:
print(f"{trigger["function_name"]=}")
# Add the list of MRS script files to load, as well as the script definition
options = content_set["options"]
options["script_module_files"] = script_module_files
options["script_definitions"] = script_def
core.update(
table="content_set",
sets={"options": options},
where=["id=?"]
).exec(session, [content_set_id])
# Update content files, make all files private that are not in static folders
where = []
params = [content_set_id]
for folder in static_content_folders:
if not folder.startswith("/"):
folder = "/" + folder
folder += "%"
where.append(f"request_path LIKE ?")
params.append(folder)
core.update(
table="content_file",
sets={"enabled": 2},
where=["content_set_id=?",
"NOT (" + " OR ".join(where) + ")"]
).exec(session, params)
return script_def
def get_extended_interface_properties(interface, interface_list):
if interface.get("extends") is None or interface["extends"] == "":
return interface["properties"]
parent_interface = get_typescript_interface_from_list(
type_name=interface["extends"], interface_list=interface_list)
props = interface["properties"].copy()
props.extend(get_extended_interface_properties(
parent_interface, interface_list))
return props
def add_object_fields_from_interface(
session, interface_name, interface_list, object_id, object_fields: list, parent_reference_id=None):
interface = get_typescript_interface_from_list(
type_name=interface_name, interface_list=interface_list)
interface_properties = get_extended_interface_properties(
interface=interface,
interface_list=interface_list)
for field in interface_properties:
field_type = field["type"]
field_array = False
if field_type.endswith("[]"):
field_type = field_type[:-2]
field_array = True
object_field = {
"id": core.get_sequence_id(session=session),
"object_id": object_id,
"name": field["name"],
"position": len(object_fields),
"db_column": {
"name": field["name"],
"not_null": not field["optional"],
"datatype": map_ts_type_to_database_type(field_type),
"is_array": field_array,
"read_only": field["readOnly"],
},
"enabled": True,
"allow_filtering": True,
"allow_sorting": False,
"no_check": False,
"no_update": False,
}
if parent_reference_id is not None:
object_field["parent_reference_id"] = parent_reference_id
# Store interface name
type_interface = map_ts_type_to_interface(field_type)
if type_interface is not None:
object_field["db_column"]["interface"] = type_interface
if is_simple_typescript_type(field_type):
object_fields.append(object_field)
else:
object_ref_id = core.get_sequence_id(session=session)
object_ref = {
"id": object_ref_id,
"unnest": False,
"reference_mapping": {
"kind": "1:n" if field_array else "1:1",
"constraint": "interface",
"referenced_schema": field_type,
"referenced_table": "",
"column_mapping": [{"base": "n/a", "ref": "n/a"}]
},
"sdk_options": {
"language_options": [
{
"language": "TypeScript",
"class_name": field_type,
}
],
"class_name": field_type,
}
}
object_field["represents_reference_id"] = object_ref_id
object_field["object_reference"] = object_ref
# No recursive call to add other interfaces
object_fields.append(object_field)
def get_content_set_create_statement(session, content_set: dict, allow_load_scripts: bool) -> str:
stmt = []
stmt.append(
f"CREATE OR REPLACE REST CONTENT SET {content_set.get('request_path')}\n"
+ f" ON SERVICE {content_set.get('host_ctx')}"
)
if content_set["enabled"] == 2:
stmt.append(" PRIVATE")
elif content_set["enabled"] is False or content_set["enabled"] == 0:
stmt.append(" DISABLED")
if content_set["comments"]: # ignore either None or empty
stmt.append(f" COMMENT {core.squote_str(content_set["comments"])}")
stmt.append(core.format_json_entry("OPTIONS", content_set.get("options")))
stmt.append(" AUTHENTICATION REQUIRED" if content_set["requires_auth"] in [True, 1] \
else " AUTHENTICATION NOT REQUIRED")
if allow_load_scripts and content_set["content_type"] == "SCRIPTS":
if content_set["options"]["mrs_scripting_language"] == "TypeScript":
stmt.append(" LOAD TYPESCRIPT SCRIPTS")
else:
stmt.append(" LOAD SCRIPTS")
output = ["\n".join(stmt) + ";"]
content_set_files = content_files.get_content_files(
session, content_set["id"], None, False)
for service_content_file in content_set_files:
output.append(content_files.get_content_file_create_statement(
session, service_content_file))
return "\n\n".join(output)
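# Example (illustrative, with made-up paths): for a disabled static content set the
# generated statement starts with
#     CREATE OR REPLACE REST CONTENT SET /web
#         ON SERVICE localhost/myService
#         DISABLED
# followed by the OPTIONS and AUTHENTICATION clauses and, after that, one CREATE
# statement per content file of the set.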
def update_file_content_via_regex(file_path, reg_ex, substitute):
# Open file and get content
with open(file_path, 'r') as f:
content = f.read()
# Get new content
new_content = re.sub(reg_ex, substitute, content, 0, re.MULTILINE)
# Save to file
with open(file_path, 'w') as f:
f.write(new_content)
def prepare_open_api_ui(service, request_path, send_gui_message=None) -> str:
# Get tempdir
temp_dir = tempfile.gettempdir()
# Download SwaggerUI
if send_gui_message is not None:
send_gui_message("info", "Downloading OpenAPI UI package ...")
ssl._create_default_https_context = ssl._create_unverified_context
swagger_ui_zip_path = os.path.join(temp_dir, "swagger-ui.zip")
with urlopen(OPENAPI_UI_URL) as zip_web_file:
with open(swagger_ui_zip_path, 'wb') as zip_disk_file:
zip_disk_file.write(zip_web_file.read())
# Download Dark CSS
if send_gui_message is not None:
send_gui_message("info", "Downloading dark CSS file ...")
swagger_ui_dark_css_path = os.path.join(temp_dir, "swagger-ui-dark.css")
with urlopen(OPENAPI_DARK_CSS_URL) as dark_css_web_file:
with open(swagger_ui_dark_css_path, 'wb') as dark_css_disk_file:
dark_css_disk_file.write(dark_css_web_file.read())
# Extract zip
if send_gui_message is not None:
send_gui_message("info", "Extracting package ...")
swagger_ui_path = os.path.join(temp_dir, "swagger-ui")
if not os.path.exists(swagger_ui_path):
os.makedirs(swagger_ui_path)
else:
shutil.rmtree(swagger_ui_path)
os.makedirs(swagger_ui_path)
with zipfile.ZipFile(swagger_ui_zip_path, 'r') as zip_ref:
for zip_info in zip_ref.infolist():
if (not zip_info.is_dir()
and os.sep + "dist" + os.sep in zip_info.filename
and not os.path.basename(zip_info.filename).startswith(".")
and not os.path.basename(zip_info.filename).endswith(".map")
and not "-es-" in os.path.basename(zip_info.filename)):
zip_info.filename = os.path.basename(zip_info.filename)
zip_ref.extract(zip_info, swagger_ui_path)
# Delete zip file
pathlib.Path.unlink(swagger_ui_zip_path)
# Update the config file
if send_gui_message is not None:
send_gui_message("info", "Customizing files ...")
update_file_content_via_regex(
os.path.join(swagger_ui_path, "swagger-initializer.js"),
r"(url: \".*?\")",
f'url: "{service["url_context_root"]}/open-api-catalog/"')
# Add dark css
with open(swagger_ui_dark_css_path, 'r') as file1:
with open(os.path.join(swagger_ui_path, "index.css"), 'a') as file2:
shutil.copyfileobj(file1, file2)
file2.write("\n#swagger-ui {\npadding-top: 20px;\n}\n")
# Authorize Btn
file2.write("\n.swagger-ui .scheme-container {\nposition: absolute;\nright: 0px;\ntop: 0px;\n"
"height: 58px;\npadding: 10px 0px;\nbackground: unset;\nbox-shadow: unset;\n}\n")
file2.write(
"\n.swagger-ui .btn.authorize span {\npadding: 4px 10px 0 0;\n}\n")
# Explore Area
file2.write("\n.swagger-ui .topbar {\nbackground-color: unset;\npadding: 10px 0;\n"
"position: absolute;\ntop: 0px;\nright: 170px;\nwidth: 300px;\nfont-size: 0.75em;\n}\n")
file2.write("\n.swagger-ui .topbar .download-url-wrapper .download-url-button {\n"
"padding: 3px 10px 0px 10px;\nfont-size: 1.2em;\n}\n")
file2.write("\n.swagger-ui .topbar .wrapper {\npadding: 0;\n}\n")
# Delete dark css file
pathlib.Path.unlink(swagger_ui_dark_css_path)
# Replace the favicon
favicon_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "..", "docs", "images")
shutil.copyfile(
os.path.join(favicon_path, "favicon-16x16.png"),
os.path.join(swagger_ui_path, "favicon-16x16.png"))
shutil.copyfile(
os.path.join(favicon_path, "favicon-32x32.png"),
os.path.join(swagger_ui_path, "favicon-32x32.png"))
# Change Title
update_file_content_via_regex(
os.path.join(swagger_ui_path, "index.html"),
r'Swagger UI',
f'{service["name"]} - OpenAPI UI')
# Patch UI to redirect to MRS authentication
redirect_url = f'/?service={service["url_context_root"]
}&redirectUrl={service["url_context_root"]+request_path}/index.html'
authorize_btn_on_click = f'()=>{{window.location.href="{redirect_url}";}}'
update_file_content_via_regex(
os.path.join(swagger_ui_path, "swagger-ui-bundle.js"),
r'("btn authorize unlocked",onClick:i})',
f'"btn authorize unlocked",onClick:{authorize_btn_on_click}}}')
# cspell:ignore topbar
update_file_content_via_regex(
os.path.join(swagger_ui_path, "swagger-ui.css"),
r"(\.swagger-ui \.topbar a{align-items:center;color:#fff;display:flex;)",
'.swagger-ui .topbar a{align-items:center;color:#fff;display:none;')
return swagger_ui_path