in mrs_plugin/lib/content_sets.py [0:0]
def update_scripts_from_content_set(session, content_set_id, language, content_dir=None, ignore_list=None,
send_gui_message=None):
if send_gui_message is None:
send_gui_message = print_gui_message
content_set = get_content_set(
session=session, content_set_id=content_set_id)
if content_set is None:
raise ValueError(
"Could load the MRS scripts. The given content set was not found.")
service = services.get_service(
session=session, service_id=content_set["service_id"])
if service is None:
raise ValueError(
"Could load the MRS scripts. The content set's service was not found.")
code_files = []
static_content_folders = []
if content_dir is not None:
code_files, build_folder, static_content_folders = get_code_files_from_folder(
path=content_dir, ignore_list=ignore_list, language=language)
else:
files = content_files.get_content_files(
session=session, content_set_id=content_set_id,
include_enable_state=True, include_file_content=True)
for content_file in files:
dirs = pathlib.Path(content_file["request_path"])
if len(dirs.parts) > 0:
if is_common_build_folder(dirs.parts[0]):
build_folder = dirs.parts[0]
elif len(dirs.parts) > 1 and is_common_build_folder(dirs.parts[1]):
build_folder = dirs.parts[1]
if is_common_static_content_folder(dirs.parts[0]):
static_content_folders.append(dirs.parts[0])
elif len(dirs.parts) > 1 and is_common_static_content_folder(dirs.parts[1]):
static_content_folders.append(dirs.parts[1])
fullname = "." + \
content_file["content_set_request_path"] + \
content_file["request_path"]
# Progress TypeScript
if language == "TypeScript" and (
(fullname.endswith(".mts") or fullname.endswith(".ts"))
and not (fullname.endswith(".spec.mts")
or fullname.endswith(".spec.ts")
or fullname.endswith(".d.ts"))):
if core.is_text(content_file["content"]):
code = content_file["content"].decode()
else:
raise ValueError(f"The content of file {
fullname} is binary data, not text.")
# Clear TypeScript comments and strings for regex matching
code_cleared = blank_quoted_js_strings(
blank_js_comments(code))
options = content_file.get("options")
last_modification = ""
if options is not None:
last_modification = options.get("last_modification", "")
code_files.append({
"full_file_name": fullname,
"relative_file_name": fullname[len("." + content_file["content_set_request_path"]):],
"file_name": os.path.basename(fullname),
"last_modification": last_modification,
"code": code,
"code_cleared": code_cleared,
})
if language == "TypeScript" and build_folder is None:
raise Exception(
"The projects build directory was not found. Please build the project before adding the content set.")
if len(code_files) == 0:
if send_gui_message is not None:
send_gui_message(
"info", f"None of the files matches the specified MRS scripting language.")
return
if send_gui_message is not None:
send_gui_message(
"info", f"Parsing {len(code_files)} MRS Script files ...")
script_def = get_mrs_script_definitions_from_code_file_list(
code_files, language=language, send_gui_message=send_gui_message)
if build_folder:
script_def["build_folder"] = build_folder
if len(static_content_folders) > 0:
script_def["static_content_folders"] = static_content_folders
if language == "TypeScript" and build_folder is None:
script_def["errors"].append({
"kind": "BuildError",
"message": f"No build folder found for this TypeScript project. Please build the project before adding it.",
})
error_count = len(script_def["errors"])
if error_count > 0:
return script_def
script_module_files = []
for script_module in script_def["script_modules"]:
properties = script_module["properties"]
# Add script_module_files information for this module
outputFilePath = get_mrs_script_property(properties, "outputFilePath")
# If an explicit outputFilePath has been given, ensure that / are used and the path starts with /
if outputFilePath is not None:
file_to_load = outputFilePath
if file_to_load.count("\\") > 0 and file_to_load.count("/") == 0:
file_to_load = file_to_load.replace("\\", "/")
if not file_to_load.startswith("/"):
file_to_load = "/" + file_to_load
else:
# Otherwise, use /<build_folder>/<file_name>
file_to_load = "/" + build_folder + "/" + \
script_module["file_info"]["file_name"]
if language == "TypeScript":
file_to_load = file_to_load.replace(
".mts", ".mjs").replace(".ts", ".js")
script_module_files.append({
"file_info": script_module["file_info"],
"file_to_load": file_to_load,
"class_name": script_module["class_name"],
})
request_path = get_mrs_script_property(
properties, "requestPath", "/" + script_module["class_name"])
schema = schemas.get_schema(
session=session, request_path=request_path)
if schema is None:
name = get_mrs_script_property(
properties, "name", core.convert_path_to_camel_case(request_path))
send_gui_message(
"info", f"Creating new REST schema `{name}` at {request_path} ...")
# Add the grants for the given module to the options
options = get_mrs_script_property(properties, "options", None)
grants = get_mrs_script_property(properties, "grants", None)
if grants is not None:
if options is None:
options = {}
options["grants"] = grants
schema_id = schemas.add_schema(
session=session, schema_name=name,
service_id=content_set["service_id"], request_path=request_path,
enabled=get_mrs_script_property(properties, "enabled", True),
internal=get_mrs_script_property(properties, "internal", True),
requires_auth=get_mrs_script_property(
properties, "requiresAuth", False),
options=options,
metadata=get_mrs_script_property(properties, "metadata", None),
comments=get_mrs_script_property(properties, "comments", None),
schema_type="SCRIPT_MODULE",
)
schema = schemas.get_schema(session=session, schema_id=schema_id)
send_gui_message(
"info", f"Adding MRS scripts to REST schema {schema.get("name")} at {request_path} ...")
# Add a db_object for each script
interface_list = script_def["interfaces"]
for script in script_module["scripts"]:
func_props = script["properties"]
func_request_path = get_mrs_script_property(
func_props, "requestPath", "/" + script["function_name"])
full_path = service["url_context_root"] + \
schema["request_path"] + func_request_path
row_ownership_param = get_mrs_script_property(
func_props, "rowOwnershipParameter")
row_ownership_field_id = None
send_gui_message(
"info", f"Adding MRS script {script["function_name"]} at {func_request_path} ...")
db_object_id = core.get_sequence_id(session=session)
objects = []
# Build parameters object with all parameter as object_fields
object_id = core.get_sequence_id(session=session)
object_fields = []
pos = 0
for param in script["parameters"]:
object_field_id = core.get_sequence_id(session=session)
if param["name"] == row_ownership_param:
row_ownership_field_id = object_field_id
object_field = {
"id": object_field_id,
"object_id": object_id,
"name": param["name"],
"position": pos,
"db_column": {
"name": param["name"],
"not_null": not param["optional"],
"in": True,
"datatype": map_ts_type_to_database_type(param["type"]),
"is_array": param["is_array"],
},
"enabled": True,
"allow_filtering": True,
"allow_sorting": False,
"no_check": False,
"no_update": False,
}
if param.get("default", None) is not None:
object_field["db_column"]["default"] = param["default"]
# Store interface name
type_interface = map_ts_type_to_interface(param["type"])
if type_interface is not None:
object_field["db_column"]["interface"] = type_interface
object_fields.append(object_field)
pos += 1
obj = {
"id": object_id,
"db_object_id": db_object_id,
"name": core.convert_path_to_pascal_case(full_path) + "Params",
"kind": "PARAMETERS",
"position": 0,
"fields": object_fields,
}
if row_ownership_field_id is not None:
obj["row_ownership_field_id"] = row_ownership_field_id
objects.append(obj)
# Build result object
object_id = core.get_sequence_id(session=session)
object_fields = []
returns_array = False
return_type = script["return_type"]["type"]
returns_array = script["return_type"]["is_array"]
if is_simple_typescript_type(return_type):
object_field = {
"id": core.get_sequence_id(session=session),
"object_id": object_id,
"name": "result",
"position": 0,
"db_column": {
"name": "result",
"not_null": True,
"datatype": map_ts_type_to_database_type(return_type),
"is_array": returns_array,
},
"enabled": True,
"allow_filtering": True,
"allow_sorting": False,
"no_check": False,
"no_update": False,
}
# Store interface name
type_interface = map_ts_type_to_interface(return_type)
if type_interface is not None:
object_field["db_column"]["interface"] = type_interface
object_fields.append(object_field)
else:
add_object_fields_from_interface(
session=session, interface_name=return_type,
interface_list=interface_list,
object_id=object_id, object_fields=object_fields)
objects.append({
"id": object_id,
"db_object_id": db_object_id,
"name": core.convert_path_to_pascal_case(full_path) + "Result",
"kind": "RESULT",
"position": 1,
"fields": object_fields,
"sdk_options": {
"language_options": [
{
"language": "TypeScript",
"class_name": return_type,
}
],
"class_name": return_type,
"returns_array": returns_array,
}
})
# Add the grants for the given endpoint to the options
options = get_mrs_script_property(func_props, "options", None)
grants = get_mrs_script_property(func_props, "grants", None)
if grants is not None:
if options is None:
options = {}
options["grants"] = grants
db_object_id, grants = db_objects.add_db_object(
session=session,
schema_id=schema["id"],
db_object_id=db_object_id,
db_object_name=get_mrs_script_property(
func_props, "name", script["function_name"]),
request_path=func_request_path,
db_object_type="SCRIPT",
enabled=get_mrs_script_property(func_props, "enabled", True),
internal=get_mrs_script_property(func_props, "internal", True),
requires_auth=get_mrs_script_property(
func_props, "requiresAuth", True),
options=options,
metadata=get_mrs_script_property(func_props, "metadata", None),
comments=get_mrs_script_property(func_props, "comments", None),
crud_operation_format=get_mrs_script_property(
func_props, "format", "FEED"),
media_type=get_mrs_script_property(
func_props, "mediaType", None),
items_per_page=None,
auto_detect_media_type=None,
auth_stored_procedure=None,
objects=objects,
)
core.insert(table="content_set_has_obj_def", values={
"content_set_id": content_set_id,
"db_object_id": db_object_id,
"kind": "Script",
"priority": 0,
"language": language,
"name": script["function_name"],
"class_name": script_module["class_name"],
"options": {
"file_to_load": file_to_load
}
}).exec(session)
for grant in grants:
core.MrsDbExec(grant).exec(session)
for trigger in script_module["triggers"]:
print(f"{trigger["function_name"]=}")
# for interface in script_def["interfaces"]:
# print(f"{interface["name"]=}")
# Add the list of MRS script files to load, as well as the script definition
options = content_set["options"]
options["script_module_files"] = script_module_files
options["script_definitions"] = script_def
core.update(
table="content_set",
sets={"options": options},
where=["id=?"]
).exec(session, [content_set_id])
# Update content files, make all files private that are not in static folders
where = []
params = [content_set_id]
for folder in static_content_folders:
if not folder.startswith("/"):
folder = "/" + folder
folder += "%"
where.append(f"request_path LIKE ?")
params.append(folder)
core.update(
table="content_file",
sets={"enabled": 2},
where=["content_set_id=?",
"NOT (" + " OR ".join(where) + ")"]
).exec(session, params)
return script_def