mrs_plugin/lib/MrsDdlExecutor.py (2,665 lines of code) (raw):

# Copyright (c) 2023, 2025, Oracle and/or its affiliates. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License, version 2.0, # as published by the Free Software Foundation. # # This program is designed to work with certain software (including # but not limited to OpenSSL) that is licensed under separate terms, as # designated in a particular file or component or in included license # documentation. The authors of MySQL hereby grant you an additional # permission to link the program and your derivative works with the # separately licensed software that they have either included with # the program or referenced in the documentation. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See # the GNU General Public License, version 2.0, for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA import mrs_plugin.lib as lib from mrs_plugin.lib.MrsDdlExecutorInterface import MrsDdlExecutorInterface import json import re from mysqlsh import globals, DBError from datetime import datetime import base64 import os class Timer(object): def __init__(self) -> None: self.start = datetime.now() def elapsed(self): return (datetime.now() - self.start).total_seconds() class MrsDdlExecutor(MrsDdlExecutorInterface): def __init__( self, session, current_service_id=None, current_service=None, current_service_host=None, current_in_development=None, current_schema_id=None, current_schema=None, state_data=None ): self.session = session # state_data will contain shared state data that must persist across calls if state_data is None: state_data = {} self.state_data = state_data self.service_state_data_checked = False self.schema_state_data_checked = False self.results = [] # Updates the values of the provided parameters only if they are not None, # this is done to avoid overriding values that are cached in the Session if current_service_id is not None: self.current_service_id = current_service_id if current_service is not None: self.current_service = current_service if current_in_development is not None: self.current_in_development = current_in_development if current_service_host is not None: self.current_service_host = current_service_host if current_schema_id is not None: self.current_schema_id = current_schema_id if current_schema is not None: self.current_schema = current_schema self.current_operation = None @property def current_service_id(self): self.check_current_service() return self.state_data.get("current_service_id") @current_service_id.setter def current_service_id(self, value): self.state_data["current_service_id"] = value @property def current_schema_id(self): self.check_current_schema() return self.state_data.get("current_schema_id") @current_schema_id.setter def current_schema_id(self, value): self.state_data["current_schema_id"] = value @property def current_service(self): self.check_current_service() return self.state_data.get("current_service") @current_service.setter def current_service(self, value): self.state_data["current_service"] = value @property def current_service_host(self): self.check_current_service() return self.state_data.get("current_service_host") @current_service_host.setter def current_service_host(self, value): self.state_data["current_service_host"] = value @property def current_in_development(self): self.check_current_service() return self.state_data.get("current_in_development") @current_in_development.setter def current_in_development(self, value): self.state_data["current_in_development"] = value @property def current_schema(self): self.check_current_schema() return self.state_data.get("current_schema") @current_schema.setter def current_schema(self, value): self.state_data["current_schema"] = value def check_current_service(self): if self.service_state_data_checked: return self.service_state_data_checked = True id = self.state_data.get("current_service_id") if id and not lib.services.get_service(self.session, id): self.current_service_id = None self.current_service = None self.current_service_host = None self.current_in_development = None self.current_schema = None self.current_schema_id = None def check_current_schema(self): if self.schema_state_data_checked: return self.schema_state_data_checked = True id = self.state_data.get("current_schema_id") if id and not lib.schemas.get_schema(self.session, id): self.current_schema = None self.current_schema_id = None # Check if the current mrs_object includes a services request_path or if a # current service has been set via USE REST SERVICE def get_given_or_current_service_id(self, mrs_object, allow_not_set=False): service_id, _ = self.get_given_or_current_service_id_and_path( mrs_object) if service_id is None and not allow_not_set: raise Exception("No REST SERVICE specified.") return service_id # Check if the current mrs_object includes a services request_path or if a # current service has been set via USE REST SERVICE def get_given_or_current_service_id_and_path(self, mrs_object): # Prefer the given service if specified service_id = None service_path = None # Prefer the given service if specified url_context_root = mrs_object.get("url_context_root") if url_context_root is not None: url_host_name = mrs_object.get("url_host_name", "") developer_list = ( mrs_object.get("in_development").get("developers", []) if mrs_object.get("in_development") else None ) service = lib.services.get_service( url_context_root=url_context_root, url_host_name=url_host_name, developer_list=developer_list, session=self.session, get_default=False, ) if service is None: raise Exception( f"Could not find the REST SERVICE {self.get_service_sorted_developers(developer_list)}" + f"{url_host_name}{url_context_root}." ) service_id = service.get("id") service_path = service.get("url_context_root") if service_path is None and self.current_service_id is not None: service_id = self.current_service_id service_path = self.current_service mrs_object["url_context_root"] = self.current_service mrs_object["url_host_name"] = self.current_service_host mrs_object["in_development"] = self.current_in_development return service_id, service_path def get_given_or_current_full_service_path(self, mrs_object): # Prefer the given service if specified url_context_root = mrs_object.get("url_context_root") url_host_name = mrs_object.get("url_host_name", "") if url_context_root is None: if self.current_service_id is None: raise Exception("No REST SERVICE specified.") url_context_root = self.current_service url_host_name = self.current_service_host or "" return url_host_name + url_context_root def get_service_sorted_developers(self, developer_list: list): sorted_developers = "" if developer_list is not None and len(developer_list) > 0: def quote(s): return f"'{s}'" developer_list.sort() sorted_developers = ( ",".join( ( quote(re.sub(r"(['\\])", "\\\\\\1", dev, 0, re.MULTILINE)) if not re.match(r"^\w+$", dev) else dev ) for dev in developer_list ) + "@" ) return sorted_developers # Check if the current mrs_object includes a schema request_path or if a # current schema has been set via USE REST SCHEMA def get_given_or_current_schema_id(self, mrs_object, allow_not_set=False): schema_id = None schema_request_path = mrs_object.get("schema_request_path") if schema_request_path is None and self.current_schema_id is not None: schema_id = self.current_schema_id mrs_object["schema_request_path"] = self.current_schema mrs_object["url_context_root"] = self.current_service mrs_object["url_host_name"] = self.current_service_host mrs_object["in_development"] = self.current_in_development elif schema_request_path is not None: service_id = self.get_given_or_current_service_id(mrs_object) schema = lib.schemas.get_schema( service_id=service_id, request_path=schema_request_path, session=self.session, ) if schema is None: full_path = ( mrs_object.get("url_host_name", "") + mrs_object.get("url_context_root", "") + schema_request_path ) raise Exception(f"Could not find the REST SCHEMA {full_path}.") schema_id = schema.get("id") if schema_id is None and not allow_not_set: raise Exception("No REST SCHEMA specified.") return schema_id def getFullServicePath(self, mrs_object, request_path=""): if "url_context_root" not in mrs_object: developers = "" if self.current_in_development is not None: developers = self.get_service_sorted_developers( self.current_in_development.get("developers", "") ) if self.current_service_host: return developers + self.current_service_host + request_path else: if self.current_service is not None: return "" + self.current_service + request_path else: return "" + request_path developers = "" if mrs_object.get("in_development") is not None: developers = self.get_service_sorted_developers( mrs_object.get("in_development").get("developers", "") ) return ( developers + mrs_object.get("url_host_name", "") + mrs_object.get("url_context_root", "") + request_path ) def getFullSchemaPath(self, mrs_object, request_path=""): developers = "" if "in_development" in mrs_object.keys(): developers = self.get_service_sorted_developers( mrs_object.get("in_development").get("developers") ) return ( developers + mrs_object.get("url_host_name", self.current_service_host if self.current_service_host is not None else "") + mrs_object.get("url_context_root", self.current_service if self.current_service is not None else "") + mrs_object.get("schema_request_path", self.current_schema if self.current_schema is not None else "") + request_path ) def createRestMetadata(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") try: status = lib.general.configure( session=self.session, enable_mrs=mrs_object.get("enabled"), options=mrs_object.get("options"), update_if_available=mrs_object.get("update_if_available"), merge_options=mrs_object.get("merge_options", False), ) self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "message": ( f"REST metadata configured successfully." if (status.get("schema_changed", False) is True) else f"REST Metadata updated successfully." ), "operation": self.current_operation, "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Failed to configure the REST metadata. {e}", "operation": self.current_operation, } ) raise def createRestService(self, mrs_object): timer = Timer() self.current_operation = mrs_object.pop("current_operation") do_replace = mrs_object.pop("do_replace") context_root = mrs_object.get("url_context_root", "") url_host_name = mrs_object.pop("url_host_name", "") line = mrs_object.pop("line", None) add_auth_apps = mrs_object.pop("add_auth_apps", []) # No need to remove auth apps during creation mrs_object.pop("remove_auth_apps", []) mrs_object.pop("merge_options", False) full_path = self.getFullServicePath(mrs_object=mrs_object) with lib.core.MrsDbTransaction(self.session): try: # If the OR REPLACE was specified, check if there is an existing service on the same host # with the same path and delete it. if do_replace is True: service = lib.services.get_service( url_context_root=context_root, url_host_name=url_host_name, get_default=False, developer_list=( mrs_object.get("in_development").get( "developers", []) if "in_development" in mrs_object.keys() else None ), session=self.session, ) if service is not None: lib.services.delete_service( service_id=service.get("id"), session=self.session ) # Add the service service_id = lib.services.add_service( session=self.session, url_host_name=url_host_name, service=mrs_object, ) # TODO this code doesn't belong here, it should be moved to the GUI layer # If this is the first service, make it the current one services = lib.services.get_services(session=self.session) if len(services) == 1: # Set the stored current session lib.services.set_current_service_id( session=self.session, service_id=self.current_service_id ) for auth_app_name in add_auth_apps: auth_app = lib.auth_apps.get_auth_app( session=self.session, name=auth_app_name) if auth_app is None: raise ValueError( f"The given REST authentication app `{auth_app}` was not found.") lib.auth_apps.link_auth_app( session=self.session, auth_app_id=auth_app["id"], service_id=service_id) self.results.append( { "statementIndex": len(self.results) + 1, "line": line, "type": "success", "message": f"REST SERVICE `{full_path}` created successfully.", "operation": self.current_operation, "id": lib.core.convert_id_to_string(service_id), "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": line, "type": "error", "message": f"Failed to create the REST SERVICE `{full_path}`. {e}", "operation": self.current_operation, } ) raise def createRestSchema(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") do_replace = mrs_object.pop("do_replace") schema_request_path = mrs_object.get("schema_request_path") full_path = self.getFullSchemaPath(mrs_object=mrs_object) with lib.core.MrsDbTransaction(self.session): try: service_id = self.get_given_or_current_service_id(mrs_object) # If the OR REPLACE was specified, check if there is an existing schema on the same service # and delete it. if do_replace == True: schema = lib.schemas.get_schema( service_id=service_id, request_path=mrs_object.get("schema_request_path"), session=self.session, ) if schema is not None: lib.schemas.delete_schema( schema_id=schema.get("id"), session=self.session ) schema_id = lib.schemas.add_schema( schema_name=mrs_object.get("schema_name"), service_id=service_id, request_path=schema_request_path, requires_auth=mrs_object.get("requires_auth", False), enabled=mrs_object.get("enabled", 1), items_per_page=mrs_object.get("items_per_page"), comments=mrs_object.get("comments"), options=mrs_object.get("options"), session=self.session, ) # If this is the first schema of the REST service, make it the current one schemas = lib.schemas.get_schemas( session=self.session, service_id=service_id ) if len(schemas) == 1: self.current_schema_id = schema_id self.current_schema = schema_request_path self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "message": f"REST SCHEMA `{full_path}` created successfully.", "operation": self.current_operation, "id": lib.core.convert_id_to_string(schema_id), "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Failed to create the REST SCHEMA `{full_path}`. {e}", "operation": self.current_operation, } ) raise def fill_object_names_if_not_given(self, mrs_object: dict, schema_id, full_path): assigned_names = [] for i, obj in enumerate(mrs_object.get("objects")): # Auto-create unique object name if it was not specified by the user if obj["name"] is None: obj["name"] = lib.core.convert_path_to_pascal_case(full_path) numeric_post_fix = 2 # If this db_object represents a procedure, add Params to the first object name and 2,3,.. if more than # one result set object has been defined if ( mrs_object.get("db_object_type") == "PROCEDURE" or mrs_object.get("db_object_type") == "FUNCTION" ): if obj["kind"] == "PARAMETERS": obj["name"] = obj["name"] + "Params" if i > 1: if numeric_post_fix < i: numeric_post_fix = i obj["name"] = lib.core.convert_path_to_pascal_case( full_path + str(numeric_post_fix) ) # If the object name is not unique for this schema, keep increasing a numeric_post_fix name_unique = False while not name_unique: try: if obj["name"] in assigned_names: raise Exception("Object name already used.") lib.core.check_mrs_object_name( session=self.session, db_schema_id=schema_id, obj_id=obj["id"], obj_name=obj["name"], ) name_unique = True assigned_names.append(obj["name"]) except: obj["name"] = lib.core.convert_path_to_pascal_case( full_path + str(numeric_post_fix) ) numeric_post_fix += 1 def createRestDbObject(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") do_replace = mrs_object.pop("do_replace") full_path = self.getFullSchemaPath( mrs_object=mrs_object, request_path=mrs_object.get("request_path") ) db_object_type = mrs_object.get("db_object_type") type_caption = ( "VIEW" if db_object_type == "TABLE" or db_object_type == "VIEW" else db_object_type ) with lib.core.MrsDbTransaction(self.session): try: schema_id = self.get_given_or_current_schema_id(mrs_object) self.fill_object_names_if_not_given( mrs_object=mrs_object, schema_id=schema_id, full_path=full_path ) # If the OR REPLACE was specified, check if there is an existing db_object on the same schema # and delete it. if do_replace is True: db_object = lib.db_objects.get_db_object( schema_id=schema_id, request_path=mrs_object.get("request_path"), session=self.session, ) if db_object is not None: lib.db_objects.delete_db_object( db_object_id=db_object.get("id"), session=self.session ) db_object_id, grants = lib.db_objects.add_db_object( session=self.session, schema_id=lib.core.id_to_binary(schema_id, "schema_id"), db_object_name=mrs_object.get("name"), request_path=mrs_object.get("request_path"), db_object_type=mrs_object.get("db_object_type"), enabled=mrs_object.get("enabled", True), items_per_page=mrs_object.get("items_per_page"), requires_auth=mrs_object.get("requires_auth", 1), crud_operation_format=mrs_object.get("format", "FEED"), comments=mrs_object.get("comments"), media_type=mrs_object.get("media_type"), auto_detect_media_type=mrs_object.get( "media_type_autodetect", False ), auth_stored_procedure=mrs_object.get( "auth_stored_procedure"), options=mrs_object.get("options"), db_object_id=lib.core.id_to_binary( mrs_object.get("id"), "db_object_id" ), objects=mrs_object.get("objects"), metadata=mrs_object.get("metadata", None), ) warnings = [] for grant in grants: try: lib.core.MrsDbExec(grant).exec(self.session) except DBError as e: if mrs_object.get("force_create", False): warnings.append({ "level": "warning", "message": e.msg, "code": e.code}) else: raise self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "message": f"REST {type_caption} `{full_path}` created successfully.", "operation": self.current_operation, "id": db_object_id, "executionTime": timer.elapsed(), "warnings": warnings } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Failed to create the REST {type_caption} `{full_path}`. {e}", "operation": self.current_operation, } ) raise def createRestContentSet(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") do_replace = mrs_object.pop("do_replace") full_path = self.getFullServicePath( mrs_object=mrs_object, request_path=mrs_object.get("request_path") ) with lib.core.MrsDbTransaction(self.session): try: service_id = self.get_given_or_current_service_id(mrs_object) # If the OR REPLACE was specified, check if there is an existing content set on the same service # and delete it. if do_replace == True: content_set = lib.content_sets.get_content_set( service_id=service_id, request_path=mrs_object.get("request_path"), session=self.session, ) if content_set is not None: lib.content_sets.delete_content_set( content_set_ids=[content_set.get("id")], session=self.session, ) # Check if scripts should be loaded options = mrs_object.get("options", None) if mrs_object["content_type"] == "SCRIPTS": if options is None: options = {} options["contains_mrs_scripts"] = True if mrs_object.get("language", None) is not None: options["mrs_scripting_language"] = mrs_object["language"] content_set_id, files_added = lib.content_sets.add_content_set( session=self.session, service_id=service_id, request_path=mrs_object.get("request_path"), requires_auth=mrs_object.get("requires_auth", True), comments=mrs_object.get("comments"), options=options, enabled=mrs_object.get("enabled", 1), content_dir=mrs_object.get("directory_file_path"), ignore_list=mrs_object.get("file_ignore_list", None), ) self.results.append( { "statementIndex": len(self.results) + 1, "type": "success", "message": f"REST content set `{full_path}` created successfully. {files_added} file(s) added.", "operation": self.current_operation, "id": content_set_id, "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Failed to create the REST CONTENT SET `{full_path}`. {e}", "operation": self.current_operation, } ) raise def createRestContentFile(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") do_replace = mrs_object.pop("do_replace") full_path = mrs_object.get("request_path") with lib.core.MrsDbTransaction(self.session): try: service_id = self.get_given_or_current_service_id(mrs_object) # Get content set content_set = lib.content_sets.get_content_set( session=self.session, service_id=service_id, request_path=mrs_object.get("content_set_path"), ) if content_set is None: raise Exception( f'CONTENT SET {mrs_object.get("content_set_path")} not found.' ) full_path = self.getFullServicePath( mrs_object=mrs_object, request_path=content_set.get("request_path") + mrs_object.get("request_path"), ) # If the OR REPLACE was specified, check if there is an existing content set on the same service # and delete it. if do_replace == True: content_file = lib.content_files.get_content_file( content_set_id=content_set.get("id"), request_path=mrs_object.get("request_path"), include_file_content=False, session=self.session, ) if content_file is not None: lib.content_files.delete_content_file( content_file_ids=[content_file.get("id")], session=self.session, ) file_path: str | None = mrs_object.get("directory_file_path") if file_path is not None: file_path = file_path.replace('"', "") if not os.path.exists(file_path): raise Exception(f"File '{file_path}' does not exist.") with open(file_path, "rb") as f: data = f.read() elif mrs_object.get("is_binary", False): data = base64.b64decode(mrs_object.get("content")) else: data = mrs_object.get("content") content_file_id = lib.content_files.add_content_file( session=self.session, content_set_id=content_set.get("id"), request_path=mrs_object.get("request_path"), requires_auth=mrs_object.get("requires_auth", 1), options=mrs_object.get("options"), data=data, enabled=mrs_object.get("enabled", 1), ) self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "message": f"REST CONTENT FILE `{full_path}` created successfully.", "operation": self.current_operation, "id": content_file_id, "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Failed to create the REST CONTENT FILE `{full_path}`. {e}", "operation": self.current_operation, } ) raise def createRestAuthApp(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") do_replace = mrs_object.pop("do_replace") name = mrs_object.get("name") full_path = name with lib.core.MrsDbTransaction(self.session): try: # If the OR REPLACE was specified, check if there is an existing content set on the same service # and delete it. if do_replace == True: auth_app = lib.auth_apps.get_auth_app( name=name, session=self.session ) if auth_app is not None: lib.auth_apps.delete_auth_app( app_id=auth_app.get("id"), session=self.session, ) # TODO: Lookup default role by name if mrs_object.get("default_role"): role = lib.roles.get_role( session=self.session, caption=mrs_object.get("default_role"), ) if role is None: raise Exception( f'Given role "{mrs_object.get("default_role")}" not found.' ) default_role_id = role.get("id") else: default_role_id = lib.core.id_to_binary( "0x31000000000000000000000000000000", "" ) auth_vendor = lib.auth_apps.get_auth_vendor( session=self.session, name=mrs_object.get("vendor") ) if auth_vendor is None: raise Exception( f'The vendor `{mrs_object.get("vendor")}` was not found.' ) # Check constraints for OAuth2 vender apps if (auth_vendor["id"] != lib.core.id_to_binary("0x30000000000000000000000000000000", "") and auth_vendor["id"] != lib.core.id_to_binary("0x31000000000000000000000000000000", "")): if mrs_object.get("url") is None: raise Exception( f'The OAuth2 vendor `{mrs_object.get("vendor")}` requires ' 'the URL option to be specified.' ) if mrs_object.get("app_id") is None: raise Exception( f'The OAuth2 vendor `{mrs_object.get("vendor")}` requires ' 'the APP/CLIENT ID option to be specified.' ) if mrs_object.get("app_secret") is None: raise Exception( f'The OAuth2 vendor `{mrs_object.get("vendor")}` requires ' 'the APP/CLIENT SECRET option to be specified.' ) auth_app_id = lib.auth_apps.add_auth_app( session=self.session, service_id=None, auth_vendor_id=auth_vendor["id"], app_name=name, description=mrs_object.get("comments"), url=mrs_object.get("url"), url_direct_auth=mrs_object.get("url_direct_auth"), access_token=mrs_object.get("app_secret"), app_id=mrs_object.get("app_id"), limit_to_reg_users=mrs_object.get( "limit_to_registered_users", 1), default_role_id=default_role_id, enabled=mrs_object.get("enabled", 1), ) if auth_app_id is None: raise Exception("REST auth app could not be created.") self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "message": f"REST AUTH APP `{full_path}` created successfully.", "operation": self.current_operation, "id": lib.core.convert_id_to_string(auth_app_id), "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Failed to create the REST CONTENT SET `{full_path}`. {e}", "operation": self.current_operation, } ) raise def createRestUser(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") do_replace = mrs_object.pop("do_replace") name = mrs_object.get("name") authAppName = mrs_object.get("authAppName") password = mrs_object.get("password") full_path = f':"{name}"@"{authAppName}"' app_options = mrs_object.get("app_options", None) options = mrs_object.get("options", None) email = options.pop("email", None) if options else None vendor_user_id = options.pop( "vendor_user_id", None) if options else None mapped_user_id = options.pop( "mapped_user_id", None) if options else None login_permitted = mrs_object.get("login_permitted", True) with lib.core.MrsDbTransaction(self.session): try: if password == "": raise Exception("The password must not be empty.") auth_app = lib.auth_apps.get_auth_app( name=authAppName, session=self.session ) if auth_app is None: raise Exception( f"The given REST AUTH APP for {full_path} was not found." ) # If the OR REPLACE was specified, check if there is an existing content set on the same service # and delete it. if do_replace == True: users = lib.users.get_users( auth_app_id=auth_app.get("id"), user_name=name, session=self.session, ) if len(users) > 0: lib.users.delete_user_by_id( user_id=users[0].get("id"), session=self.session ) user_id = lib.users.add_user( session=self.session, auth_app_id=auth_app["id"], name=name, email=email, vendor_user_id=vendor_user_id, login_permitted=int(login_permitted), mapped_user_id=mapped_user_id, options=options, app_options=app_options, auth_string=password, ) self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "message": f"REST USER `{full_path}` created successfully.", "operation": self.current_operation, "id": lib.core.convert_id_to_string(user_id), "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Failed to create the REST USER `{full_path}`. {e}", "operation": self.current_operation, } ) raise def createRestRole(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") do_replace = mrs_object.pop("do_replace") # captions are UNIQUE caption = mrs_object.get("name") comments = mrs_object.get("comments") any_service = mrs_object.get("any_service", False) json_options = mrs_object.get("options", {}) if not any_service: specific_to_service_id = self.get_given_or_current_service_id( mrs_object, allow_not_set=False) else: specific_to_service_id = None with lib.core.MrsDbTransaction(self.session): try: extends = mrs_object.get("extends") if extends: parent_role = lib.roles.get_role( self.session, caption=extends) if not parent_role: raise Exception(f"Invalid parent role '{extends}'") extends_role_id = parent_role.get("id") else: extends_role_id = None # If the OR REPLACE was specified, check if there is an existing content set on the same service # and delete it. if do_replace == True: role = lib.roles.get_role( specific_to_service_id=specific_to_service_id, caption=caption, session=self.session, ) if role: lib.roles.delete_role( role_id=role.get("id"), session=self.session ) role_id = lib.roles.add_role( session=self.session, derived_from_role_id=extends_role_id, caption=caption, specific_to_service_id=specific_to_service_id, description=comments, options=json_options, ) self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "message": f"REST ROLE `{caption}` created successfully.", "operation": self.current_operation, "id": lib.core.convert_id_to_string(role_id), "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Failed to create the REST ROLE `{caption}`. {e}", "operation": self.current_operation, } ) raise def cloneRestService(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") full_path = self.getFullServicePath(mrs_object) try: service_id = self.get_given_or_current_service_id(mrs_object) service = lib.services.get_service( service_id=service_id, session=self.session ) if service is None: raise Exception("The given REST SERVICE was not found.") # A new developer_list / url_context_root / url_host_name is defined for the clone if ( "new_developer_list" in mrs_object.keys() and len(mrs_object["new_developer_list"]) > 0 ): if not "in_development" in service.keys(): service["in_development"] = {"developers": []} service["in_development"]["developers"] = mrs_object.pop( "new_developer_list" ) else: service["in_development"] = None service["url_context_root"] = mrs_object.pop( "new_url_context_root", "/undefined" ) new_url_host_name = mrs_object.pop("new_url_host_name", "") service["parent_id"] = service_id service.pop("id", None) service.pop("url_host_name", None) service.pop("host_ctx", None) service.pop("full_service_path", None) service.pop("is_current", None) service.pop("sorted_developers", None) service.pop("auth_apps", None) service.pop("merge_options", None) # Add the service service_id = lib.services.add_service( session=self.session, url_host_name=new_url_host_name, service=service ) # TODO: Making the correct entry into service_has_auth_app, using the same auth_apps_ids as the parent # TODO: Add copying of schemas and db_objects and content_sets self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "affectedItemsCount": 1, "operation": self.current_operation, "id": lib.core.convert_id_to_string(service_id), "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Failed to clone the REST SERVICE `{full_path}`. {e}", "operation": self.current_operation, } ) raise def alterRestService(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") add_auth_apps = mrs_object.pop("add_auth_apps", []) remove_auth_apps = mrs_object.pop("remove_auth_apps", []) merge_options = mrs_object.pop("merge_options", False) full_path = self.getFullServicePath(mrs_object) try: line = mrs_object.pop("line", None) service_id = self.get_given_or_current_service_id(mrs_object) # If a new developer_list / url_context_root / url_host_name was given, # overwrite the existing values in the mrs_object # so they are set during the update if "new_developer_list" in mrs_object.keys(): if not "in_development" in mrs_object.keys(): mrs_object["in_development"] = {"developers": []} mrs_object["in_development"]["developers"] = mrs_object.pop( "new_developer_list" ) new_url_context_root = mrs_object.pop("new_url_context_root", None) new_url_host_name = mrs_object.pop("new_url_host_name", None) if new_url_context_root is not None: if new_url_host_name is None: new_url_host_name = "" mrs_object["url_context_root"] = new_url_context_root mrs_object["url_host_name"] = new_url_host_name lib.services.update_services( session=self.session, service_ids=[ service_id], value=mrs_object, merge_options=merge_options, ) for auth_app_name in add_auth_apps: auth_app = lib.auth_apps.get_auth_app( session=self.session, name=auth_app_name) if auth_app is None: raise ValueError( f"The given REST authentication app `{auth_app}` was not found.") lib.auth_apps.link_auth_app( session=self.session, auth_app_id=auth_app["id"], service_id=service_id) for auth_app_name in remove_auth_apps: auth_app = lib.auth_apps.get_auth_app( session=self.session, name=auth_app_name) if auth_app is None: raise ValueError( f"The given REST authentication app `{auth_app}` was not found.") lib.auth_apps.unlink_auth_app( session=self.session, auth_app_id=auth_app["id"], service_id=service_id) self.results.append( { "statementIndex": len(self.results) + 1, "line": line, "type": "success", "affectedItemsCount": 1, "operation": self.current_operation, "id": lib.core.convert_id_to_string(service_id), "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": line, "type": "error", "message": f"Failed to update the REST SERVICE `{full_path}`. {e}", "operation": self.current_operation, } ) raise def alterRestSchema(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") full_path = self.getFullSchemaPath(mrs_object=mrs_object) merge_options = mrs_object.pop("merge_options", False) try: service_id = self.get_given_or_current_service_id(mrs_object) schema = lib.schemas.get_schema( service_id=service_id, request_path=mrs_object.get("schema_request_path"), session=self.session, ) if schema is None: raise Exception("The REST schema was not found.") lib.schemas.update_schema( session=self.session, schemas={schema["id"]: ""}, value={ "service_id": mrs_object.get( "new_service_id", schema["service_id"] ), "request_path": mrs_object.get( "new_request_path", schema["request_path"] ), "requires_auth": int(mrs_object.get( "requires_auth", schema["requires_auth"] )), "enabled": mrs_object.get("enabled", schema["enabled"]), "items_per_page": mrs_object.get( "items_per_page", schema["items_per_page"] ), "comments": mrs_object.get("comments", schema["comments"]), "options": mrs_object.get("options", schema["options"]), "metadata": mrs_object.get("metadata", schema["metadata"]), }, merge_options=merge_options, ) self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "affectedItemsCount": 1, "operation": self.current_operation, "id": lib.core.convert_id_to_string(service_id), "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Failed to update the REST SCHEMA `{full_path}`. {e}", "operation": self.current_operation, } ) raise def alterRestDbObject(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") request_path = mrs_object.pop("request_path") rest_object_type = mrs_object.pop("type") db_object_id = mrs_object.pop("id") full_path = self.getFullSchemaPath( mrs_object=mrs_object, request_path=request_path ) # Remove non-standard mrs_object keys so it can be directly passed to the alter function mrs_object.pop("url_host_name", "") mrs_object.pop("parent_reference_stack", "") mrs_object.pop("url_context_root", "") mrs_object.pop("schema_request_path", "") mrs_object.pop("in_development", "") line = mrs_object.pop("line", None) merge_options = mrs_object.pop("merge_options", False) try: db_object = lib.db_objects.get_db_object( session=self.session, db_object_id=db_object_id ) if db_object is None: raise Exception( f"The given REST {rest_object_type} `{full_path}` could not be found." ) self.fill_object_names_if_not_given( mrs_object=mrs_object, schema_id=db_object["db_schema_id"], full_path=full_path, ) new_request_path = mrs_object.pop("new_request_path") if new_request_path is not None: mrs_object["request_path"] = new_request_path new_object_name = mrs_object.pop("new_object_name") if new_object_name is not None: # TODO: Implement object handling pass mrs_object.pop("graph_ql_object_count", None) objects = mrs_object.pop("objects", None) # Alter DB Object if mrs_object: lib.db_objects.update_db_objects( session=self.session, db_object_ids={db_object["id"]: ""}, value=mrs_object, merge_options=merge_options, ) if objects: lib.db_objects.set_objects( session=self.session, db_object_id=db_object["id"], objects=objects ) self.results.append( { "statementIndex": len(self.results) + 1, "line": line, "type": "success", "affectedItemsCount": 1, "operation": self.current_operation, "id": lib.core.convert_id_to_string(db_object["id"]), "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": line, "type": "error", "message": f"Failed to get the REST {rest_object_type} `{full_path}`. {e}", "operation": self.current_operation, } ) raise def alterRestContentSet(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") request_path = mrs_object.pop("request_path") merge_options = mrs_object.pop("merge_options", False) full_path = self.getFullServicePath( mrs_object=mrs_object, request_path=request_path ) with lib.core.MrsDbTransaction(self.session): try: service_id = self.get_given_or_current_service_id(mrs_object) if service_id is None: raise Exception("No REST SERVICE specified.") content_set = lib.content_sets.get_content_set( session=self.session, service_id=service_id, request_path=request_path, ) if content_set is None: raise Exception( f"The given REST content set `{full_path}` could not be found." ) new_request_path = mrs_object.pop("new_request_path") if new_request_path is not None: mrs_object["request_path"] = new_request_path # Check if scripts should be loaded options = mrs_object.get("options", None) if mrs_object["content_type"] == "SCRIPTS": if options is None: options = {} options["contains_mrs_scripts"] = True if mrs_object.get("language", None) is not None: options["mrs_scripting_language"] = mrs_object.pop( "language") mrs_object["options"] = options if "url_context_root" in mrs_object: mrs_object.pop("url_context_root") file_ignore_list = mrs_object.pop("file_ignore_list", None) lib.content_sets.update_content_set( session=self.session, content_set_id=content_set["id"], value=mrs_object, file_ignore_list=file_ignore_list, merge_options=merge_options, ) self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "affectedItemsCount": 1, "operation": self.current_operation, "id": lib.core.convert_id_to_string(content_set["id"]), "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Failed to update the REST content set `{full_path}`. {e}", "operation": self.current_operation, } ) raise def alterRestUser(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") name = mrs_object.get("name") authAppName = mrs_object.get("authAppName") password = mrs_object.get("password") full_path = f':"{name}"@"{authAppName}"' app_options = mrs_object.get("app_options", None) options = mrs_object.get("options", None) email = options.pop("email", None) if options else None vendor_user_id = options.pop( "vendor_user_id", None) if options else None mapped_user_id = options.pop( "mapped_user_id", None) if options else None login_permitted = mrs_object.get("login_permitted", None) with lib.core.MrsDbTransaction(self.session): try: if password == "": raise Exception("The password must not be empty.") auth_app = lib.auth_apps.get_auth_app( name=authAppName, session=self.session ) if auth_app is None: raise Exception( f"The given REST AUTH APP for {full_path} was not found." ) user = lib.users.get_user( self.session, user_name=name, auth_app_id=auth_app["id"] ) if not user: raise Exception( f'Invalid REST user "{name}"@"{authAppName}"') user_id = user['id'] changes = {} if login_permitted is not None: changes["login_permitted"] = int(login_permitted) if email is not None: changes["email"] = email if password is not None: changes["auth_string"] = password changes["auth_app_id"] = user["auth_app_id"] if app_options is not None: changes["app_options"] = app_options if mapped_user_id is not None: changes["mapped_user_id"] = mapped_user_id if vendor_user_id is not None: changes["vendor_user_id"] = vendor_user_id lib.users.update_user( session=self.session, user_id=user_id, value=changes ) self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "message": f"REST USER `{full_path}` updated successfully.", "affectedItemsCount": 1, "operation": self.current_operation, "id": lib.core.convert_id_to_string(user_id), "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Failed to update the REST USER `{full_path}`. {e}", "operation": self.current_operation, } ) raise def dropRestService(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") full_path = self.getFullServicePath(mrs_object=mrs_object) with lib.core.MrsDbTransaction(self.session): try: service = lib.services.get_service( url_context_root=mrs_object.get("url_context_root"), url_host_name=mrs_object.get("url_host_name", ""), developer_list=( mrs_object.get("in_development").get("developers", []) if "in_development" in mrs_object.keys() else None ), session=self.session, ) if service is None: raise Exception( f"The given REST SERVICE `{full_path}` could not be found." ) lib.services.delete_service( service_id=service["id"], session=self.session ) self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "message": f"REST SERVICE `{full_path}` dropped successfully.", "operation": self.current_operation, "id": lib.core.convert_id_to_string(service["id"]), "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Failed to drop the REST SERVICE `{full_path}`. {e}", "operation": self.current_operation, } ) raise def dropRestSchema(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") full_path = self.getFullServicePath( mrs_object=mrs_object, request_path=mrs_object.get( "request_path", "") ) with lib.core.MrsDbTransaction(self.session): try: service_id = self.get_given_or_current_service_id(mrs_object) schema = lib.schemas.get_schema( service_id=service_id, request_path=mrs_object["request_path"], session=self.session, ) if schema is None: raise Exception( f"The given REST SCHEMA `{full_path}` could not be found." ) lib.schemas.delete_schema( schema_id=schema["id"], session=self.session) self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "message": f"REST SCHEMA `{full_path}` dropped successfully.", "operation": self.current_operation, "id": lib.core.convert_id_to_string(schema["id"]), "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Failed to drop the REST SCHEMA `{full_path}`. {e}", "operation": self.current_operation, } ) raise def dropRestDbObject(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") request_path = mrs_object.pop("request_path") rest_object_type = mrs_object.pop("type") full_path = self.getFullSchemaPath( mrs_object=mrs_object, request_path=request_path ) with lib.core.MrsDbTransaction(self.session): try: schema_id = self.get_given_or_current_schema_id(mrs_object) db_object = lib.db_objects.get_db_object( session=self.session, schema_id=schema_id, request_path=request_path ) if db_object is None: raise Exception( f"The given REST {rest_object_type} `{full_path}` could not be found." ) lib.db_objects.delete_db_object( session=self.session, db_object_id=db_object.get("id") ) self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "message": f"REST {rest_object_type} `{full_path}` dropped successfully.", "operation": self.current_operation, "id": lib.core.convert_id_to_string(db_object["id"]), "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Failed to drop the REST {rest_object_type} `{full_path}`. {e}", "operation": self.current_operation, } ) raise def dropRestContentSet(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") request_path = mrs_object.get("request_path") full_path = self.getFullServicePath( mrs_object=mrs_object, request_path=request_path ) with lib.core.MrsDbTransaction(self.session): try: service_id = self.get_given_or_current_service_id(mrs_object) content_set = lib.content_sets.get_content_set( service_id=service_id, request_path=request_path, session=self.session, ) if content_set is None: raise Exception( f"The given REST CONTENT SET `{full_path}` could not be found." ) lib.content_sets.delete_content_set( content_set_ids=[content_set["id"]], session=self.session ) self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "message": f"REST CONTENT SET `{full_path}` dropped successfully.", "operation": self.current_operation, "id": lib.core.convert_id_to_string(content_set["id"]), "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Failed to drop the REST CONTENT SET `{full_path}`. {e}", "operation": self.current_operation, } ) raise def dropRestContentFile(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") full_path = mrs_object.get("request_path") with lib.core.MrsDbTransaction(self.session): try: service_id = self.get_given_or_current_service_id(mrs_object) # Get content set content_set = lib.content_sets.get_content_set( session=self.session, service_id=service_id, request_path=mrs_object.get("content_set_path"), ) if content_set is None: raise Exception( f'The REST content set {mrs_object.get("content_set_path")} was not found.' ) full_path = self.getFullServicePath( mrs_object=mrs_object, request_path=content_set.get("request_path") + mrs_object.get("request_path"), ) content_file = lib.content_files.get_content_file( content_set_id=content_set.get("id"), request_path=mrs_object.get("request_path"), include_file_content=False, session=self.session, ) if content_file is None: raise Exception( f"The REST content file {full_path} was not found.") lib.content_files.delete_content_file( content_file_id=content_file["id"], session=self.session ) self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "message": f"REST CONTENT FILE `{full_path}` created successfully.", "operation": self.current_operation, "id": content_file["id"], "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Failed to create the REST CONTENT FILE `{full_path}`. {e}", "operation": self.current_operation, } ) raise def dropRestAuthApp(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") name = mrs_object.get("name") with lib.core.MrsDbTransaction(self.session): try: auth_app = lib.auth_apps.get_auth_app( name=name, session=self.session ) if auth_app is None: raise Exception( f"The given REST AUTH APP `{name}` could not be found." ) lib.auth_apps.delete_auth_app( app_id=auth_app.get("id"), session=self.session, ) self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "message": f"REST AUTH APP `{name}` dropped successfully.", "operation": self.current_operation, "id": lib.core.convert_id_to_string(auth_app["id"]), "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Failed to drop the REST AUTH APP `{name}`. {e}", "operation": self.current_operation, } ) raise def dropRestUser(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") name = mrs_object.get("name") authAppName = mrs_object.get("authAppName") full_path = f':"{name}"@"{authAppName}"' with lib.core.MrsDbTransaction(self.session): try: service_id = self.get_given_or_current_service_id(mrs_object) auth_app = lib.auth_apps.get_auth_app( name=authAppName, session=self.session ) if auth_app is None: raise Exception( f"The given REST AUTH APP for {full_path} was not found." ) users = lib.users.get_users( auth_app_id=auth_app.get("id"), user_name=name, session=self.session ) if len(users) > 0: lib.users.delete_user_by_id( user_id=users[0].get("id"), session=self.session ) else: raise Exception("User was not found.") self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "message": f"REST USER `{full_path}` dropped successfully.", "operation": self.current_operation, "id": lib.core.convert_id_to_string(users[0].get("id")), "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Failed to drop the REST USER `{full_path}`. {e}", "operation": self.current_operation, } ) raise def dropRestRole(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") caption = mrs_object.get("name") with lib.core.MrsDbTransaction(self.session): try: role = lib.roles.get_role( caption=caption, session=self.session) if role: lib.roles.delete_role( role_id=role.get("id"), session=self.session) else: raise Exception(f"Role `{caption}` was not found.") self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "message": f"REST ROLE `{caption}` dropped successfully.", "operation": self.current_operation, "id": lib.core.convert_id_to_string(role.get("id")), "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Failed to drop the REST ROLE `{caption}`. {e}", "operation": self.current_operation, } ) raise def grantRestPrivileges(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") role_name = mrs_object.get("role") privileges = mrs_object.get("privileges") schema_request_path = mrs_object.get("schema_request_path") if schema_request_path is None: schema_request_path = "*" object_request_path = mrs_object.get("object_request_path") if object_request_path is None: object_request_path = "*" if schema_request_path not in ("*", "") and not schema_request_path.startswith("/"): raise ValueError('schema_path must be "", "*" or start with a /') if object_request_path not in ("*", "") and not object_request_path.startswith("/"): raise ValueError('object_path must be "", "*" or start with a /') with lib.core.MrsDbTransaction(self.session): try: role = lib.roles.get_role( caption=role_name, session=self.session) if not role: raise Exception(f"Role `{role_name}` was not found.") full_service_path = self.get_given_or_current_full_service_path( mrs_object ) priv_id = lib.roles.add_role_privilege( session=self.session, role_id=role.get("id"), privileges=privileges, service_path=full_service_path, schema_path=schema_request_path, object_path=object_request_path ) self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "message": f"GRANT to `{role_name}` added successfully.", "operation": self.current_operation, "id": lib.core.convert_id_to_string(priv_id), "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Failed to grant privileges for REST role `{role_name}`. {e}", "operation": self.current_operation, } ) raise def grantRestRole(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") role_name = mrs_object.get("role") user_name = mrs_object.get("user") auth_app_name = mrs_object.get("auth_app_name") comments = mrs_object.get("comments") target = f"`{user_name}`@`{auth_app_name}`" with lib.core.MrsDbTransaction(self.session): try: role = lib.roles.get_role( caption=role_name, session=self.session, ) if not role: raise Exception(f"Role `{role_name}` was not found.") user = lib.users.get_user( self.session, user_name=user_name, auth_app_name=auth_app_name, ) if not user: raise Exception( f'User "{user_name}"@"{auth_app_name}" was not found.' ) lib.users.add_user_role( session=self.session, role_id=role.get("id"), user_id=user.get("id"), comments=comments, ) self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "message": f"GRANT ROLE to {target} added successfully.", "operation": self.current_operation, "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Failed to grant REST role `{role_name}` to {target}. {e}", "operation": self.current_operation, } ) raise def revokeRestRole(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") role_name = mrs_object.get("role") user_name = mrs_object.get("user") auth_app_name = mrs_object.get("auth_app_name") target = f"`{user_name}`@`{auth_app_name}`" with lib.core.MrsDbTransaction(self.session): try: role = lib.roles.get_role( caption=role_name, session=self.session) if not role: raise Exception(f"Role `{role_name}` was not found.") user = lib.users.get_user( self.session, user_name=user_name, auth_app_name=auth_app_name, ) if user is None: raise Exception( f"The given user `{user_name}` was not found.") lib.users.delete_user_roles( session=self.session, user_id=user.get("id"), role_id=role.get("id"), ) self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "message": f"REVOKE ROLE from {target} executed successfully.", "operation": self.current_operation, "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Failed to REVOKE REST role `{role_name}` from {target}. {e}", "operation": self.current_operation, } ) raise def revokeRestPrivilege(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") role_name = mrs_object.get("role") privileges = mrs_object.get("privileges") schema_request_path = mrs_object.get("schema_request_path") if schema_request_path is None: schema_request_path = "*" object_request_path = mrs_object.get("object_request_path") if object_request_path is None: object_request_path = "*" full_service_path = self.get_given_or_current_full_service_path( mrs_object ) with lib.core.MrsDbTransaction(self.session): try: role = lib.roles.get_role( caption=role_name, session=self.session) if not role: raise Exception(f"Role `{role_name}` was not found.") result = lib.roles.delete_role_privilege( session=self.session, role_id=role.get("id"), privileges=privileges, service_path=full_service_path, schema_path=schema_request_path, object_path=object_request_path, ) if not result: raise Exception( f"There is no such grant for role {role_name}") self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "message": f"REVOKE from `{role_name}` executed successfully.", "operation": self.current_operation, "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Failed to revoke privileges for REST role `{role_name}`. {e}", "operation": self.current_operation, } ) raise def use(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") try: url_context_root = mrs_object.get("url_context_root") if url_context_root is not None: service = lib.services.get_service( url_context_root=url_context_root, url_host_name=mrs_object.get("url_host_name", ""), developer_list=( mrs_object.get("in_development").get("developers", []) if "in_development" in mrs_object.keys() else None ), session=self.session, ) if service is not None: self.current_service_id = service.get("id") self.current_service = service.get("url_context_root") self.current_service_host = mrs_object.get( "url_host_name", "") self.current_in_development = service.get("in_development") else: raise Exception( f"A REST SERVICE with the request path {url_context_root} could not be found." ) developers = ( self.get_service_sorted_developers( self.current_in_development.get("developers") ) if self.current_in_development is not None else "" ) if mrs_object.get("schema_request_path") is not None: if self.current_service_id is None: raise Exception(f"No current REST SERVICE specified.") request_path = mrs_object.get("schema_request_path") schema = lib.schemas.get_schema( service_id=self.current_service_id, request_path=request_path, session=self.session, ) if schema is not None: self.current_schema_id = schema.get("id") self.current_schema = schema.get("request_path") else: raise Exception( f"A REST SCHEMA with the request path {request_path} could not be found." ) self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "message": ( f"Now using REST SCHEMA `{self.current_schema}` " f"on REST SERVICE `{developers}{self.current_service_host}{self.current_service}`." ), "operation": self.current_operation, "id": lib.core.convert_id_to_string(self.current_schema_id), "executionTime": timer.elapsed(), } ) else: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "message": f"Now using REST SERVICE `{developers}{self.current_service_host}{self.current_service}`.", "operation": self.current_operation, "id": lib.core.convert_id_to_string(self.current_service_id), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Cannot USE the specified REST object. {e}", "operation": self.current_operation, } ) raise def showRestMetadataStatus(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") try: result = [lib.general.get_status(session=self.session)] self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "operation": self.current_operation, "result": result, "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Cannot SHOW the REST services. {e}", "operation": self.current_operation, } ) raise def showRestServices(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") try: services = lib.services.get_services(session=self.session) result = [] for service in services: result.append( { "REST SERVICE Path": service.get("full_service_path"), "enabled": lib.core.get_enabled_status_caption(service.get("enabled")), "current": "YES" if (service.get("id") == self.current_service_id) else "NO", "auth_apps": service.get("auth_apps", "") } ) self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "operation": self.current_operation, "result": result, "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Cannot SHOW the REST services. {e}", "operation": self.current_operation, } ) raise def showRestSchemas(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") try: service_id = self.get_given_or_current_service_id(mrs_object) schemas = lib.schemas.get_schemas( session=self.session, service_id=service_id ) result = [] for schema in schemas: result.append( { "REST schema path": schema.get("request_path"), "enabled": lib.core.get_enabled_status_caption(schema.get("enabled")), } ) self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "operation": self.current_operation, "result": result, "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Cannot SHOW the REST schemas. {e}", "operation": self.current_operation, } ) raise def showRestDbObjects(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") object_types = mrs_object.pop("object_types") try: schema_id = self.get_given_or_current_schema_id(mrs_object) items = lib.db_objects.get_db_objects( session=self.session, schema_id=schema_id, object_types=object_types ) result = [] for item in items: result.append( { "REST DB Object": item.get("request_path"), "enabled": lib.core.get_enabled_status_caption(item.get("enabled")), } ) self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "operation": self.current_operation, "result": result, "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Cannot SHOW the REST db objects. {e}", "operation": self.current_operation, } ) raise def showRestContentSets(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") try: service_id = self.get_given_or_current_service_id(mrs_object) content_sets = lib.content_sets.get_content_sets( session=self.session, service_id=service_id ) result = [] for content_set in content_sets: result.append( { "REST CONTENT SET path": content_set.get("request_path"), "enabled": lib.core.get_enabled_status_caption(content_set.get("enabled")), } ) self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "message": f'OK, {len(content_sets)} record{"s" if len(content_sets) > 1 else ""} received.', "operation": self.current_operation, "result": result, "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Cannot SHOW the REST CONTENT SETs. {e}", "operation": self.current_operation, } ) raise def showRestContentFiles(self, mrs_object: dict): raise NotImplementedError() def showRestAuthApps(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") try: service_id = self.get_given_or_current_service_id(mrs_object) auth_apps = lib.auth_apps.get_auth_apps( session=self.session, service_id=service_id ) result = [] for auth_app in auth_apps: result.append( { "REST AUTH APP name": auth_app.get("name"), "vendor": auth_app.get("auth_vendor"), "comments": auth_app.get("description"), "enabled": lib.core.get_enabled_status_caption(auth_app.get("enabled")), } ) self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "message": f'OK, {len(auth_apps)} record{"s" if len(auth_apps) > 1 else ""} received.', "operation": self.current_operation, "result": result, "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Cannot SHOW the REST CONTENT SETs. {e}", "operation": self.current_operation, } ) raise def showRestRoles(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") user_name = mrs_object.get("user_name") auth_app_name = mrs_object.get("auth_app_name") any_service = mrs_object.get("any_service", False) show_users = False show_services = False try: service_id = self.get_given_or_current_service_id( mrs_object, allow_not_set=True) if not service_id: any_service = True if user_name is not None and auth_app_name is not None and not any_service: user = lib.users.get_user( self.session, user_name=user_name, auth_app_name=auth_app_name, service_id=service_id, ) if not user: raise Exception(f"User {lib.core.quote_user(user_name)}@{lib.core.quote_auth_app(auth_app_name)} not found") roles = lib.users.get_user_roles( session=self.session, user_id=user.get("id") ) elif user_name or auth_app_name: show_users = True show_services = True roles = lib.roles.get_granted_roles( session=self.session, specific_to_service_id=service_id if not any_service else None, user_name=user_name, auth_app_name=auth_app_name, include_users=show_users, ) else: show_services = True roles = lib.roles.get_roles( session=self.session, specific_to_service_id=service_id if not any_service else None, include_global=True ) result = [] if user_name is not None and auth_app_name is not None: target = lib.core.quote_user(user_name) + "@" + lib.core.quote_auth_app(auth_app_name) column_names = [f"REST roles for {target}"] for role in roles: result.append( { column_names[0]: role.get("caption"), "comments": role.get("comments") or "", "derived_from_role": role.get("derived_from_role_caption") or "", "description": role.get("description") or "", "options": role.get("options"), } ) if show_services: result[-1]["specific_to_service"] = ( role.get("specific_to_service_request_path") or "" ) else: target = None if user_name: column_names = [f"REST roles for {user_name}"] elif auth_app_name: column_names = [f"REST roles for @{auth_app_name}"] else: column_names = [f"REST role"] for role in roles: result.append( { column_names[0]: role.get("caption"), "derived_from_role": role.get("derived_from_role_caption") or "", "description": role.get("description") or "", "options": role.get("options"), } ) if show_services: result[-1]["specific_to_service"] = ( role.get("specific_to_service_request_path") or "" ) if show_users: result[-1]["users"] = role.get("users") or "" column_names += [ "derived_from_role", "description", "options", ] if show_services: column_names.append("specific_to_service") if target: column_names.append("comments") if show_users: column_names.append("users") self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "message": f'OK, {len(roles)} record{"s" if len(roles) > 1 else ""} received.', "operation": self.current_operation, "result": result, "columns": column_names, "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Cannot SHOW REST ROLES. {e}", "operation": self.current_operation, } ) raise def showRestGrants(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") role_name = mrs_object.get("role") try: role = lib.roles.get_role(session=self.session, caption=role_name) if not role: raise Exception(f"No such role {role_name}") grants = lib.roles.get_role_privileges( session=self.session, role_id=role["id"] ) result = [] for grant in grants: result.append( { f"REST grants for {role.get('caption')}": lib.roles.format_role_grant_statement( grant ) } ) self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "operation": self.current_operation, "result": result, "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Cannot SHOW REST GRANTs. {e}", "operation": self.current_operation, } ) raise def showRestUser(self, mrs_object: dict): raise NotImplementedError() def formatJsonSetting(self, setting_name, value: dict): if value is None or value == "": return "" js = json.dumps(value, indent=4) # Indent the json.dumps with 4 spaces js_indented = "" for ln in js.split("\n"): js_indented += f" {ln}\n" return f" {setting_name} {js_indented[4:-1]}\n" def showCreateRestService(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") include_database_endpoints = mrs_object.pop("include_database_endpoints", False) full_path = self.getFullServicePath(mrs_object=mrs_object) try: service_id = self.get_given_or_current_service_id(mrs_object) service = lib.services.get_service( session=self.session, service_id=service_id ) result = [{"CREATE REST SERVICE": lib.services.get_service_create_statement(self.session, service, include_database_endpoints=include_database_endpoints, include_static_endpoints=False, include_dynamic_endpoints=False)}] self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "operation": self.current_operation, "id": lib.core.convert_id_to_string(service_id), "result": result, "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Failed to get the REST SERVICE `{full_path}`. {e}", "operation": self.current_operation, } ) raise def showCreateRestSchema(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") include_all_objects = mrs_object.pop("include_all_objects", False) full_path = self.getFullSchemaPath(mrs_object=mrs_object) try: service_id = self.get_given_or_current_service_id(mrs_object) service = lib.services.get_service( session=self.session, service_id=service_id ) if service is None: raise Exception( f"The specified REST SERVICE {self.getFullServicePath(mrs_object=mrs_object)} could not be found." ) schema_id = self.get_given_or_current_schema_id(mrs_object) schema = lib.schemas.get_schema( schema_id=schema_id, session=self.session, ) if schema is None: raise Exception( f"The given REST SCHEMA `{full_path}` could not be found." ) if schema is None: raise Exception("The REST schema was not found.") result = [{"CREATE REST SCHEMA ": lib.schemas.get_schema_create_statement(self.session, schema, include_all_objects)}] self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "operation": self.current_operation, "id": lib.core.convert_id_to_string(schema.get("id")), "result": result, "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Failed to get the REST SCHEMA `{full_path}`. {e}", "operation": self.current_operation, } ) raise def showCreateRestDbObject(self, mrs_object: dict): timer = Timer() # Keep in sync with the function buildDataMappingViewSql implemented in # ../../frontend/src/modules/mrs/dialogs/MrsObjectFieldEditor.tsx self.current_operation = mrs_object.pop("current_operation") request_path = mrs_object.pop("request_path") rest_object_type = mrs_object.pop("type") full_path = self.getFullSchemaPath( mrs_object=mrs_object, request_path=request_path ) try: schema_id = self.get_given_or_current_schema_id(mrs_object) db_object = lib.db_objects.get_db_object( session=self.session, schema_id=schema_id, request_path=request_path ) if db_object is None: raise Exception( f"The given REST {rest_object_type} `{full_path}` could not be found." ) objects = lib.db_objects.get_objects( session=self.session, db_object_id=db_object.get("id") ) if len(objects) == 0: raise Exception( f"The given REST object `{full_path}` does not have a result definition defined." ) if ( rest_object_type == "PROCEDURE" and db_object.get("object_type") != "PROCEDURE" ): raise Exception( f"The given REST object `{full_path}` is not a REST PROCEDURE." ) if ( rest_object_type == "FUNCTION" and db_object.get("object_type") != "FUNCTION" ): raise Exception( f"The given REST object `{full_path}` is not a REST FUNCTION." ) if ( rest_object_type == "VIEW" and db_object.get("object_type") != "TABLE" and db_object.get("object_type") != "VIEW" ): raise Exception( f"The given REST object `{full_path}` is not a REST VIEW." ) result = [{f"CREATE REST {rest_object_type}": lib.db_objects.get_db_object_create_statement(self.session, db_object, objects)}] self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "operation": self.current_operation, "id": lib.core.convert_id_to_string(db_object["id"]), "result": result, "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Failed to get the REST {rest_object_type} `{full_path}`. {e}", "operation": self.current_operation, } ) raise def showCreateRestContentSet(self, mrs_object: dict): timer = Timer() try: result = [{"CREATE REST CONTENT SET": lib.content_sets.get_content_set_create_statement(self.session, mrs_object, False)}] self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "operation": self.current_operation, "id": lib.core.convert_id_to_string(mrs_object["id"]), "result": result, "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f'Failed to get the REST CONTENT SET `{mrs_object.get("request_path")}`. {e}', "operation": self.current_operation, } ) raise def showCreateRestContentFile(self, mrs_object: dict): timer = Timer() try: content_file = lib.content_files.get_content_file( self.session, content_set_id=mrs_object["content_set_id"], request_path=mrs_object["request_path"], include_file_content=True, ) result = [{"CREATE REST CONTENT FILE": lib.content_files.get_content_file_create_statement(self.session, content_file)}] self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "operation": self.current_operation, "id": lib.core.convert_id_to_string(content_file["id"]), "result": result, "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append({ "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f'Failed to get the REST CONTENT SET `{mrs_object.get("request_path")}`. {e}', "operation": self.current_operation, }) raise def showCreateRestAuthApp(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") name = mrs_object.get("name") include_all_objects = mrs_object.pop("include_all_objects", None) try: auth_app = lib.auth_apps.get_auth_app( name=name, session=self.session ) if auth_app is None: raise Exception( f"The given REST AUTH APP `{name}` could not be found." ) result = [{"CREATE REST AUTH APP ": lib.auth_apps.get_auth_app_create_statement(self.session, auth_app, include_all_objects)}] self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "operation": self.current_operation, "id": lib.core.convert_id_to_string(auth_app.get("id")), "result": result, "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Failed to get the REST AUTH APP `{name}`. {e}", "operation": self.current_operation, } ) raise def showCreateRestUser(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") user_name = mrs_object.get("user_name") auth_app_name = mrs_object.get("auth_app_name") full_user_name = f'{lib.core.quote_user(user_name)}@{lib.core.quote_auth_app(auth_app_name)}' include_all_objects = mrs_object["include_all_objects"] try: user = lib.users.get_user( self.session, user_id=mrs_object["user_id"], mask_password=False) result = [{"CREATE REST USER ": lib.users.get_user_create_statement(self.session, user, include_all_objects)}] self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "operation": self.current_operation, "id": lib.core.convert_id_to_string(user.get("id")), "result": result, "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Failed to get the REST USER `{full_user_name}`. {e}", "operation": self.current_operation, } ) raise def showCreateRestRole(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") caption = mrs_object.get("caption", "") try: result = [{"CREATE REST ROLE ": lib.roles.get_role_create_statement(self.session, mrs_object)}] self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "operation": self.current_operation, "id": lib.core.convert_id_to_string(mrs_object.get("id")), "result": result, "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Failed to get the REST ROLE `{caption}`. {e}", "operation": self.current_operation, } ) raise def dumpRestService(self, mrs_object: dict): timer = Timer() self.current_operation = mrs_object.pop("current_operation") include_database_endpoints = mrs_object["include_database_endpoints"] include_static_endpoints = mrs_object["include_static_endpoints"] include_dynamic_endpoints = mrs_object["include_dynamic_endpoints"] try: service = lib.services.get_service(self.session, url_context_root=mrs_object["url_context_root"]) lib.services.store_service_create_statement(self.session, service, mrs_object.get("destination_path"), mrs_object.get("zip", False), include_database_endpoints, include_static_endpoints, include_dynamic_endpoints) result = [{"DUMP REST SERVICE ": f"Result stored in '{mrs_object["destination_path"]}'"}] self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "success", "operation": self.current_operation, "id": lib.core.convert_id_to_string(service.get("id")), "result": result, "executionTime": timer.elapsed(), } ) except Exception as e: self.results.append( { "statementIndex": len(self.results) + 1, "line": mrs_object.get("line"), "type": "error", "message": f"Failed to execute DUMP REST SERVICE `{service.get("name", "unknown")}`. {e}", "operation": self.current_operation, } ) raise