mrs_plugin/db_objects.py (502 lines of code) (raw):

# Copyright (c) 2021, 2025, Oracle and/or its affiliates. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License, version 2.0, # as published by the Free Software Foundation. # # This program is designed to work with certain software (including # but not limited to OpenSSL) that is licensed under separate terms, as # designated in a particular file or component or in included license # documentation. The authors of MySQL hereby grant you an additional # permission to link the program and your derivative works with the # separately licensed software that they have either included with # the program or referenced in the documentation. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See # the GNU General Public License, version 2.0, for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA """Sub-Module for managing MRS schemas""" # cSpell:ignore mysqlsh, mrs, privs import os from pathlib import Path from mysqlsh.plugin_manager import plugin_function import mrs_plugin.lib as lib from .interactive import resolve_schema, resolve_service, resolve_db_object, resolve_file_path, resolve_overwrite_file def resolve_db_object_ids(db_object_name=None, schema_id=None, service_id=None, request_path=None, **kwargs): session = kwargs.get("session") db_object_id = kwargs.pop("db_object_id", None) kwargs["db_object_ids"] = [ db_object_id] if db_object_id is not None else [] if not db_object_id: schema = resolve_schema(session, schema_id, service_id) schema_id = schema.get("id") if db_object_name: # Lookup the db_object name rows = lib.core.select(table="db_object", cols="id", where=["name=?", "db_schema_id=?"] ).exec(session, [db_object_name, schema_id]).items for row in rows: kwargs["db_object_ids"].append(row["id"]) elif lib.core.get_interactive_default(): db_objects = lib.db_objects.get_db_objects( session=session, schema_id=schema_id, include_enable_state=None ) caption = ("Please select a db_object index, type " "the request_path or type '*' " "to select all: ") selection = lib.core.prompt_for_list_item( item_list=db_objects, prompt_caption=caption, item_name_property="request_path", given_value=None, print_list=True, allow_multi_select=True) if not selection: raise ValueError("Operation cancelled.") kwargs["db_object_ids"] = [item["id"] for item in selection] return kwargs def generate_create_statement(**kwargs) -> str: lib.core.convert_ids_to_binary(["service_id", "schema_id", "db_object_id"], kwargs) db_object_id = kwargs.get("db_object_id") schema_id = kwargs.get("schema_id") service_id = kwargs.get("service_id") with lib.core.MrsDbSession(exception_handler=lib.core.print_exception, **kwargs) as session: db_object = resolve_db_object(session, db_object_id, schema_id, service_id) objects = get_objects( session=session, db_object_id=db_object.get("id") ) if len(objects) == 0: raise Exception( f"The given REST object `{db_object.get("qualified_name")}` does not have a result definition defined." ) return lib.db_objects.get_db_object_create_statement(session, db_object, objects) @plugin_function('mrs.add.dbObject', shell=True, cli=True, web=True) def add_db_object(**kwargs): """Add a db_object to the given MRS schema Args: **kwargs: Additional options Keyword Args: db_object_name (str): The name of the schema object add db_object_type (str): Either TABLE, VIEW or PROCEDURE schema_id (str): The id of the schema the object should be added to schema_name (str): The name of the schema auto_add_schema (bool): If the schema should be added as well if it does not exist yet request_path (str): The request_path enabled (int): Whether the db object is enabled crud_operation_format (str): The format to use for the CRUD operation requires_auth (bool): Whether authentication is required to access the schema items_per_page (int): The number of items returned per page row_user_ownership_enforced (bool): Enable row ownership enforcement row_user_ownership_column (str): The column for row ownership enforcement comments (str): Comments for the schema media_type (str): The media_type of the db object auto_detect_media_type (bool): Whether to automatically detect the media type auth_stored_procedure (str): The stored procedure that implements the authentication check for this db object options (dict): The options of this db object metadata (dict): The metadata of this db object objects (list): The result/parameters objects definition in JSON format session (object): The database session to use. Returns: None """ lib.core.convert_ids_to_binary(["schema_id"], kwargs) if kwargs.get("request_path") is not None: lib.core.Validations.request_path(kwargs.get("request_path")) db_object_name = kwargs.get("db_object_name") db_object_type = kwargs.get("db_object_type") schema_id = kwargs.get("schema_id") schema_name = kwargs.get("schema_name") auto_add_schema = kwargs.get("auto_add_schema", False) request_path = kwargs.get("request_path") enabled = kwargs.get("enabled", True) crud_operation_format = kwargs.get("crud_operation_format") requires_auth = kwargs.get("requires_auth") items_per_page = kwargs.get("items_per_page") row_user_ownership_enforced = kwargs.get( "row_user_ownership_enforced", None) row_user_ownership_column = kwargs.get("row_user_ownership_column", None) comments = kwargs.get("comments") media_type = kwargs.get("media_type") auto_detect_media_type = kwargs.get("auto_detect_media_type", True) auth_stored_procedure = kwargs.get("auth_stored_procedure") options = kwargs.get("options") metadata = kwargs.get("metadata") objects = kwargs.get("objects") session = kwargs.get("session") interactive = lib.core.get_interactive_default() with lib.core.MrsDbSession(exception_handler=lib.core.print_exception, **kwargs) as session: kwargs["session"] = session with lib.core.MrsDbTransaction(session): schema = None # If auto_add_schema is set, check if the schema is registered already. # If not, create it if auto_add_schema and schema_name and not schema_id: try: schema = lib.schemas.get_schema( schema_name=schema_name, session=session) schema_id = schema.get("id") except: # If the service does not exist, it should be handled in the scope # of this particular operation. service = resolve_service(session=session, required=False) if not service: raise RuntimeError( "Operation cancelled. The service was not found.") schema_id = lib.schemas.add_schema(schema_name=schema_name, service_id=service["id"], request_path=f"/{schema_name}", requires_auth=True if requires_auth else False, session=session) schema = resolve_schema(session, schema_query=schema_id) if not db_object_type and interactive: # Get object counts per type table_count = lib.database.get_object_type_count( session, schema.get("name"), "TABLE") view_count = lib.database.get_object_type_count( session, schema.get("name"), "VIEW") proc_count = lib.database.get_object_type_count( session, schema.get("name"), "PROCEDURE") db_object_types = [] if table_count > 0: db_object_types.append("TABLE") if view_count > 0: db_object_types.append("VIEW") if proc_count > 0: db_object_types.append("PROCEDURE") if len(db_object_types) == 0: raise ValueError("No database objects in the database schema " f"{schema.get('name')}") caption = ( "Please enter the name or index of a database object type" f"{' [TABLE]: ' if table_count > 0 else ': '}") db_object_type = lib.core.prompt_for_list_item( item_list=db_object_types, prompt_caption=caption, prompt_default_value="TABLE" if table_count > 0 else None, print_list=True) if not db_object_type: raise ValueError('Operation cancelled.') if not db_object_name and interactive: db_objects = lib.database.get_db_objects( session, schema.get("name"), db_object_type) if len(db_objects) == 0: raise ValueError( f"No database objects of type {db_object_type} in the " f"database schema {schema.get('name')}") db_object_name = lib.core.prompt_for_list_item( item_list=[db_object["OBJECT_NAME"] for db_object in db_objects], prompt_caption=("Please enter the name or index of a " "database object: "), print_list=True) if not db_object_name: raise ValueError('Operation cancelled.') # If a db_object name has been provided, check if that db_object exists # in that schema elif db_object_name and db_object_type: db_object = lib.database.get_db_object( session, schema.get("name"), db_object_name, db_object_type) if not db_object: raise ValueError( f"The {db_object_type} named '{db_object_name}' " f"does not exists in database schema {schema.get('name')}.") # Get request_path if not request_path: if interactive: request_path = lib.core.prompt( "Please enter the request path for this object [" f"/{db_object_name}]: ", {'defaultValue': '/' + db_object_name}).strip() else: request_path = '/' + db_object_name if not request_path.startswith('/'): raise Exception("The request_path has to start with '/'.") if not crud_operation_format and interactive: crud_operation_format_options = [ 'FEED', 'ITEM', 'MEDIA'] crud_operation_format = lib.core.prompt_for_list_item( item_list=crud_operation_format_options, prompt_caption=("Please select the CRUD operation format " "[FEED]: "), prompt_default_value="FEED", print_list=True) if not crud_operation_format: raise ValueError("No CRUD operation format specified." "Operation cancelled.") # Get requires_auth if requires_auth is None: if interactive: requires_auth = lib.core.prompt( "Should the db_object require authentication? [y/N]: ", {'defaultValue': 'n'}).strip().lower() == 'y' else: requires_auth = False # Get items_per_page if items_per_page is None: if interactive: items_per_page = lib.core.prompt( "How many items should be listed per page? " "[Schema Default]: ", {'defaultValue': "25"}).strip() if items_per_page: if items_per_page != 'NULL': try: items_per_page = int(items_per_page) except: raise ValueError("No valid value given." "Operation cancelled.") else: items_per_page = None # Get comments if comments is None: if interactive: comments = lib.core.prompt( "Comments: ").strip() else: comments = "" db_object_id, grants = lib.db_objects.add_db_object( session=session, schema_id=schema.get("id"), db_object_name=db_object_name, request_path=request_path, enabled=enabled, db_object_type=db_object_type, items_per_page=items_per_page, requires_auth=requires_auth, row_user_ownership_enforced=row_user_ownership_enforced, row_user_ownership_column=row_user_ownership_column, crud_operation_format=crud_operation_format, comments=comments, media_type=media_type, auto_detect_media_type=auto_detect_media_type, auth_stored_procedure=auth_stored_procedure, options=options, metadata=metadata, objects=objects) for grant in grants: lib.core.MrsDbExec(grant).exec(session) if lib.core.get_interactive_result(): return "\n" + "Object added successfully." return db_object_id @plugin_function('mrs.get.dbObject', shell=True, cli=True, web=True) def get_db_object(request_path=None, db_object_name=None, **kwargs): """Gets a specific MRS db_object Args: request_path (str): The request_path of the schema db_object_name (str): The name of the schema **kwargs: Additional options Keyword Args: db_object_id (str): The id of the db_object schema_id (str): The id of the schema schema_name (str): The name of the schema absolute_request_path (str): The absolute request_path to the db_object session (object): The database session to use. Returns: The db_object as dict """ lib.core.convert_ids_to_binary(["schema_id", "db_object_id"], kwargs) if request_path is not None: lib.core.Validations.request_path(request_path) db_object_id = kwargs.get("db_object_id") schema_id = kwargs.get("schema_id") schema_name = kwargs.get("schema_name") absolute_request_path = kwargs.get("absolute_request_path") interactive = lib.core.get_interactive_default() schema = None with lib.core.MrsDbSession(exception_handler=lib.core.print_exception, **kwargs) as session: if db_object_id: return lib.db_objects.get_db_object(session=session, db_object_id=db_object_id) if absolute_request_path: return lib.db_objects.get_db_object(session=session, absolute_request_path=absolute_request_path) if schema_id: schema = lib.schemas.get_schema( schema_id=schema_id, session=session) elif schema_name: schema = lib.schemas.get_schema( schema_name=schema_name, session=session) elif interactive: rows = lib.database.get_schemas(session) schemas = [row["SCHEMA_NAME"] for row in rows] if schemas is None or len(schemas) == 0: raise ValueError('No database schemas available.') schema_name = lib.core.prompt_for_list_item( item_list=schemas, prompt_caption='Please enter the name or index of a schema: ', print_list=True) schema = lib.schemas.get_schema( schema_name=schema_name, session=session) if not schema: raise ValueError("Unable to find the schema.") if request_path: return lib.db_objects.get_db_object(session=session, schema_id=schema.get("id"), request_path=request_path) if db_object_name: return lib.db_objects.get_db_object(session=session, schema_id=schema.get("id"), db_object_name=db_object_name) if interactive: schema_id = schema.get("id") db_objects = lib.db_objects.get_db_objects( session=session, schema_id=schema.get("id")) print(f"DB Object Listing for Schema " f"{schema.get('host_ctx')}" f"{schema.get('request_path')}") item = lib.core.prompt_for_list_item( item_list=db_objects, prompt_caption=("Please select a db_object index or type " "the request_path: "), item_name_property="request_path", print_list=True) if not item: raise ValueError("Operation cancelled.") if not request_path and not db_object_name: raise ValueError("Unable to search DB object.") return lib.db_objects.get_db_object(session=session, schema_id=schema.get("id"), request_path=request_path) @plugin_function('mrs.list.dbObjects', shell=True, cli=True, web=True) def get_db_objects(**kwargs): """Returns all db_objects for the given schema Args: **kwargs: Additional options Keyword Args: schema_id (str): The id of the schema to list the db_objects from include_enable_state (bool): Only include db_objects with the given enabled state session (object): The database session to use Returns: A list of dicts representing the db_objects of the schema """ lib.core.convert_ids_to_binary(["schema_id"], kwargs) schema_id = kwargs.get("schema_id") include_enable_state = kwargs.get("include_enable_state") with lib.core.MrsDbSession(exception_handler=lib.core.print_exception, **kwargs) as session: db_objects = lib.db_objects.get_db_objects(session=session, schema_id=schema_id, include_enable_state=include_enable_state) if lib.core.get_interactive_result(): return lib.db_objects.format_db_object_listing(db_objects, print_header=True) else: return db_objects @plugin_function('mrs.get.dbObjectParameters', shell=True, cli=True, web=True) def get_db_object_parameters(request_path=None, **kwargs): """Gets the list of available parameters given db_object representing a STORED PROCEDURE or FUNCTION Args: request_path (str): The request path of the db_object **kwargs: Additional options Keyword Args: db_object_id (str): The id of the db_object db_schema_name (str): The name of the db_schema db_object_name (str): The name of the db_object db_type (str): The type of the db_object, either PROCEDURE or FUNCTION session (object): The database session to use. Returns: The list of available db object fields """ if request_path is not None: lib.core.Validations.request_path(request_path) db_object_id = kwargs.get("db_object_id") if db_object_id is not None: db_object_id = lib.core.id_to_binary(db_object_id, "db_object_id") db_schema_name = kwargs.get("db_schema_name") db_object_name = kwargs.get("db_object_name") db_type = kwargs.get("db_type", "PROCEDURE") with lib.core.MrsDbSession(exception_handler=lib.core.print_exception, **kwargs) as session: if db_object_id: return lib.db_objects.get_db_object_parameters(session, db_object_id) if not db_schema_name: raise Exception("You must supply the schema name.") if not db_object_name: raise Exception("You must supply the DB Object name.") return lib.db_objects.get_db_object_parameters(session, db_schema_name=db_schema_name, db_object_name=db_object_name, db_type=db_type) @plugin_function('mrs.get.dbFunctionReturnType', shell=True, cli=True, web=True) def get_db_function_return_type(db_schema_name, db_object_name, **kwargs): """Gets the return data type of the FUNCTION Args: db_schema_name (str): The name of the db_schema db_object_name (str): The name of the db_object **kwargs: Additional options Keyword Args: session (object): The database session to use. Returns: The datatype as string """ with lib.core.MrsDbSession(exception_handler=lib.core.print_exception, **kwargs) as session: return lib.db_objects.get_db_function_return_type( session, db_schema_name=db_schema_name, db_object_name=db_object_name) @plugin_function('mrs.set.dbObject.requestPath', shell=True, cli=True, web=True) def set_request_path(db_object_id=None, request_path=None, **kwargs): """Sets the request_path of the given db_object Args: db_object_id (str): The id of the schema to list the db_objects from request_path (str): The request_path that should be set **kwargs: Additional options Keyword Args: session (object): The database session to use Returns: None """ if db_object_id is not None: db_object_id = lib.core.id_to_binary(db_object_id, "db_object_id") if request_path is not None: lib.core.Validations.request_path(request_path) interactive = lib.core.get_interactive_default() with lib.core.MrsDbSession(exception_handler=lib.core.print_exception, **kwargs) as session: kwargs["session"] = session # Get the object with the given id or let the user select it db_object = lib.db_objects.get_db_object( session=session, db_object_id=db_object_id) if not db_object: return if not request_path and interactive: request_path = lib.core.prompt( "Please enter a new request_path " f"for the db_object {db_object.get('name')} " f"[{db_object.get('request_path')}]: ", {"defaultValue": db_object.get('request_path')}) if request_path == db_object.get('request_path'): if interactive: print("The request_path was left unchanged.") return res = lib.core.update(table="db_object", sets="request_path=?", where="id=?" ).exec(session, [request_path, db_object.get("id")]) if not res.success: raise Exception("Could not update the db_object.") if lib.core.get_interactive_result(): print(f"The db_object {db_object.get('name')} was updated " "successfully.") return True return False @plugin_function('mrs.enable.dbObject', shell=True, cli=True, web=True) def enable_db_object(db_object_name=None, schema_id=None, **kwargs): """Enables a db_object of the given schema Args: db_object_name (str): The name of the db_object schema_id (str): The id of the schema **kwargs: Additional options Keyword Args: db_object_id (str): The id of the db_object session (object): The database session to use. Returns: The result message as string """ lib.core.convert_ids_to_binary(["schema_id", "db_object_id"], kwargs) kwargs["value"] = True with lib.core.MrsDbSession(exception_handler=lib.core.print_exception, **kwargs) as session: kwargs["session"] = session kwargs = resolve_db_object_ids(db_object_name, schema_id, **kwargs) with lib.core.MrsDbTransaction(session): lib.db_objects.enable_db_object(**kwargs) if lib.core.get_interactive_result(): if len(kwargs["db_object_ids"]) == 1: return f"The db_object has been enabled." return f"The db_objects have been enabled." return True return False @plugin_function('mrs.disable.dbObject', shell=True, cli=True, web=True) def disable_db_object(db_object_name=None, schema_id=None, **kwargs): """Disables a db_object of the given schema Args: db_object_name (str): The name of the db_object schema_id (str): The id of the schema **kwargs: Additional options Keyword Args: db_object_id (str): The id of the db_object session (object): The database session to use. Returns: The result message as string """ lib.core.convert_ids_to_binary(["db_object_id"], kwargs) if schema_id is not None: schema_id = lib.core.id_to_binary(schema_id, "schema_id") kwargs["value"] = False with lib.core.MrsDbSession(exception_handler=lib.core.print_exception, **kwargs) as session: kwargs["session"] = session kwargs = resolve_db_object_ids(db_object_name, schema_id, **kwargs) with lib.core.MrsDbTransaction(session): lib.db_objects.enable_db_object(**kwargs) if lib.core.get_interactive_result(): if len(kwargs["db_object_ids"]) == 1: return f"The db_object has been disabled." return f"The db_objects have been disabled." return True return False @plugin_function('mrs.delete.dbObject', shell=True, cli=True, web=True) def delete_db_object(db_object_name=None, schema_id=None, **kwargs): """Deletes a db_object of the given schema Args: db_object_name (str): The name of the db_object schema_id (str): The id of the schema **kwargs: Additional options Keyword Args: db_object_id (str): The id of the db_object session (object): The database session to use. Returns: True if the object was deleted. """ lib.core.convert_ids_to_binary(["db_object_id"], kwargs) if schema_id is not None: schema_id = lib.core.id_to_binary(schema_id, "schema_id") with lib.core.MrsDbSession(exception_handler=lib.core.print_exception, **kwargs) as session: kwargs["session"] = session db_object_id = kwargs.get("db_object_id") with lib.core.MrsDbTransaction(session): if db_object_id is not None: lib.db_objects.delete_db_object(session, db_object_id) else: kwargs = resolve_db_object_ids( db_object_name, schema_id, **kwargs) lib.db_objects.delete_db_objects(**kwargs) if lib.core.get_interactive_result(): return f"The db_object has been deleted." return True return False @plugin_function('mrs.update.dbObject', shell=True, cli=True, web=True) def update_db_object(**kwargs): """Update a db_object Args: **kwargs: Additional options Keyword Args: db_object_id (str): The id of the database object db_object_ids (list): A list of database object ids to update db_object_name (str): The name of the database object to update schema_id (str): The id of the schema that contains the database object to be updated request_path (str): The request_path value (dict): The values to update session (object): The database session to use. Allowed options for value: name (str): The new name to apply to the database object db_schema_id (str): The id of the schema to update in the database object enabled (int): If the database object is enabled or not crud_operation_format (str): The format to use for the CRUD operation requires_auth (bool): Whether authentication is required to access the database object items_per_page (int): The number of items returned per page request_path (str): The request_path auto_detect_media_type (bool): Whether the media type should be detected automatically comments (str): Comments for the database object media_type (str): The media_type of the db object auth_stored_procedure (str): The stored procedure that implements the authentication check for this db object options (dict): The options of this db object metadata (dict): The metadata settings of the db object objects (list): The result/parameters objects definition in JSON format Returns: The result message as string """ # convert to python native types or default kwargs["value"] to {} of "values" is not supplied kwargs["value"] = lib.core.convert_json(kwargs.get("value", {})) lib.core.convert_ids_to_binary(["db_schema_id"], kwargs["value"]) if kwargs.get("request_path") is not None: lib.core.Validations.request_path(kwargs["request_path"]) lib.core.convert_ids_to_binary(["schema_id", "db_object_id"], kwargs) schema_name = kwargs["value"].pop("schema_name", None) with lib.core.MrsDbSession(exception_handler=lib.core.print_exception, **kwargs) as session: kwargs["session"] = session if not kwargs.get("db_object_id"): schema = resolve_schema(session, schema_query=kwargs["schema_id"]) kwargs["schema_id"] = schema.get("id") kwargs = resolve_db_object_ids(**kwargs) # verify the objects exist in the schema for object_id in kwargs["db_object_ids"]: db_object = lib.db_objects.get_db_object( session=session, db_object_id=object_id) target_name = kwargs["value"].get("name") or db_object["name"] # get the target schema if kwargs["value"].get("db_schema_id") or schema_name: target_schema = lib.schemas.get_schema(session, schema_id=kwargs["value"].get( "db_schema_id"), schema_name=schema_name) else: target_schema = lib.schemas.get_schema( session, db_object["db_schema_id"]) if not target_schema: raise ValueError("The target schema does not exist.") # check if the target object exists in the target schema if not lib.database.get_db_object(session, target_schema["name"], target_name, db_object["object_type"]): raise ValueError( f"The {db_object.get('object_type')} named '{target_name}' " f"does not exists in database schema '{target_schema['name']}'.") # check if the MRS db_object already exists in the target MRS schema if db_object["db_schema_id"] != target_schema["id"]: if lib.db_objects.get_db_object(session, schema_id=target_schema["id"], db_object_name=target_name): raise ValueError( "The object already exists in the target schema.") kwargs["value"]["db_schema_id"] = target_schema["id"] with lib.core.MrsDbTransaction(session): lib.db_objects.update_db_objects(session=session, db_object_ids=kwargs["db_object_ids"], value=kwargs["value"]) if lib.core.get_interactive_result(): if len(kwargs['db_object_ids']) == 1: return f"The database object has been updated." return f"The database objects have been updated." return True return False @plugin_function('mrs.get.tableColumnsWithReferences', shell=True, cli=True, web=True) def get_table_columns_with_references(db_object_id=None, schema_id=None, request_path=None, db_object_name=None, **kwargs): """Gets the list of table columns and references Args: db_object_id (str): The id of the db_object schema_id (str): The id of the schema request_path (str): The request_path of the schema db_object_name (str): The name of the db_object **kwargs: Additional options Keyword Args: schema_name (str): The name of the schema db_object_type (str): The type of the db_object (TABLE, VIEW, PROCEDURE) session (object): The database session to use. Returns: The list of table columns and references """ if db_object_id is not None: db_object_id = lib.core.id_to_binary(db_object_id, "db_object_id") if schema_id is not None: schema_id = lib.core.id_to_binary(schema_id, "schema_id") if request_path is not None: lib.core.Validations.request_path(request_path) schema_name = kwargs.get("schema_name") db_object_type = kwargs.get("db_object_type") # Guarantee we have upper case type if db_object_type: db_object_type = db_object_type.upper() with lib.core.MrsDbSession(exception_handler=lib.core.print_exception, **kwargs) as session: if db_object_id: return lib.db_objects.get_table_columns_with_references(session, db_object_id) if schema_id: schema = lib.schemas.get_schema(session, schema_id=schema_id) schema_name = schema.get("name") else: if not schema_name: raise Exception("You must supply the schema name.") return lib.db_objects.get_table_columns_with_references(session, schema_name=schema_name, db_object_name=db_object_name, db_object_type=db_object_type) @plugin_function('mrs.get.objects', shell=True, cli=True, web=True) def get_objects(db_object_id=None, **kwargs): """Gets the list of objects for the given db_object Args: db_object_id (str): The id of the db_object **kwargs: Additional options Keyword Args: session (object): The database session to use. Returns: The list of objects of the given db_object """ if not db_object_id: raise Exception("You must supply the db_object_id.") db_object_id = lib.core.id_to_binary(db_object_id, "db_object_id") with lib.core.MrsDbSession(exception_handler=lib.core.print_exception, **kwargs) as session: return lib.db_objects.get_objects(session, db_object_id=db_object_id) @plugin_function('mrs.get.objectFieldsWithReferences', shell=True, cli=True, web=True) def get_object_fields_with_references(object_id=None, **kwargs): """Gets the list of object fields and references Args: object_id (str): The id of the db_object.object **kwargs: Additional options Keyword Args: session (object): The database session to use. Returns: The list of object fields and references """ if not object_id: raise Exception("You must supply the object_id.") object_id = lib.core.id_to_binary(object_id, "object_id") with lib.core.MrsDbSession(exception_handler=lib.core.print_exception, **kwargs) as session: return lib.db_objects.get_object_fields_with_references(session, object_id=object_id) @plugin_function('mrs.get.dbObjectCreateStatement', shell=True, cli=True, web=True) def get_db_object_create_statement(**kwargs): """Returns the corresponding CREATE REST <DB OBJECT> SQL statement of the given MRS service object. Args: **kwargs: Options to determine what should be generated. Keyword Args: service_id (str): The ID of the service where the db_object belongs. schema_id (str): The ID of the schema where the db_object belongs. db_object_id (str): The ID of the db_object to generate. session (object): The database session to use. Returns: The SQL that represents the create statement for the MRS DB OBJECT or False if it fails. """ return generate_create_statement(**kwargs) @plugin_function('mrs.dump.dbObjectCreateStatement', shell=True, cli=True, web=True) def store_db_object_create_statement(**kwargs): """Stores the corresponding CREATE REST <DB OBJECT> SQL statement of the given MRS schema object into a file. Args: **kwargs: Options to determine what should be generated. Keyword Args: service_id (str): The ID of the service where the db_object belongs. schema_id (str): The ID of the schema where the db_object belongs. db_object_id (str): The ID of the db_object to dump. file_path (str): The path where to store the file. overwrite (bool): Overwrite the file, if already exists. session (object): The database session to use. Returns: True if the file was saved. """ file_path = kwargs.get("file_path") overwrite = kwargs.get("overwrite") file_path = resolve_file_path(file_path) resolve_overwrite_file(file_path, overwrite) sql = generate_create_statement(**kwargs) with open(file_path, "w") as f: f.write(sql) if lib.core.get_interactive_result(): return f"File created in {file_path}." return True