mrs_plugin/lib/schemas.py (250 lines of code) (raw):

# Copyright (c) 2021, 2025, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0,
# as published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms, as
# designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an additional
# permission to link the program and your derivative works with the
# separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

from mrs_plugin.lib import core, database, db_objects


def format_schema_listing(schemas, print_header=False):
    """Formats the listing of schemas

    Args:
        schemas (list): A list of schemas as dicts
        print_header (bool): If set to true, a header is printed

    Returns:
        The formatted list of schemas
    """
    if not schemas:
        return "No items available."

    header = ""
    if print_header:
        header = (
            f"{'ID':>3} {'PATH':38} {'SCHEMA NAME':30} {'ENABLED':8} "
            f"{'AUTH':9}\n"
        )

    rows = []
    for i, item in enumerate(schemas, start=1):
        # host_ctx is computed with CONCAT over LEFT JOINed tables in
        # query_schemas(), so it can be NULL/None; treat that as empty.
        url = (item["host_ctx"] or "") + item["request_path"]
        rows.append(
            f"{i:>3} {url[:37]:38} "
            f"{item['name'][:29]:30} "
            f"{'Yes' if item['enabled'] else '-':8} "
            f"{'Yes' if item['requires_auth'] else '-':5}"
        )

    # Rows are separated by newlines; there is no trailing newline.
    return header + "\n".join(rows)


def prompt_for_request_path(schema_name) -> str:
    """Prompts the user for the request path of a schema

    Args:
        schema_name (str): The name of the schema, used to build the
            default value "/<schema_name>"

    Returns:
        The entered request path with surrounding whitespace removed
    """
    return core.prompt(
        "Please enter the request path for this schema [" f"/{schema_name}]: ",
        {"defaultValue": "/" + schema_name},
    ).strip()


def prompt_for_requires_auth() -> bool:
    """Prompts the user whether the schema should require authentication

    Returns:
        True if the user answered "y" (case-insensitive), False otherwise
    """
    return (
        core.prompt(
            "Should the schema require authentication? [y/N]: ", {"defaultValue": "n"}
        )
        .strip()
        .lower()
        == "y"
    )


def prompt_for_items_per_page() -> int:
    """Prompts the user for the number of items per page

    The prompt is repeated until the input can be converted to an integer.

    Returns:
        The entered number of items per page
    """
    while True:
        result = core.prompt(
            "How many items should be listed per page? [Schema Default]: ",
            {"defaultValue": "25"},
        ).strip()

        try:
            return int(result)
        except ValueError:
            # Only a failed conversion triggers a retry; anything else
            # (e.g. KeyboardInterrupt) must propagate to the caller.
            print("Required an integer value.")


def delete_schema(session, schema_id):
    """Deletes the schema with the given id

    Args:
        session (object): The database session to use.
        schema_id: The id of the schema to delete

    Raises:
        ValueError: If no schema_id is given
        Exception: If the delete operation did not report success
    """
    if not schema_id:
        raise ValueError("No schema_id given.")

    result = core.delete(table="db_schema", where=["id=?"]).exec(
        session, params=[schema_id]
    )

    if not result.success:
        raise Exception(
            f"The specified schema with id {core.convert_id_to_string(schema_id)} was not found."
        )


def delete_schemas(session, schemas: list):
    """Deletes all schemas with the given ids

    Args:
        session (object): The database session to use.
        schemas (list): The ids of the schemas to delete

    Raises:
        ValueError: If schemas is empty
    """
    if not schemas:
        raise ValueError("The specified schema was not found.")

    for schema_id in schemas:
        delete_schema(session, schema_id)


def update_schema(session, schemas: list, value: dict, merge_options=False):
    """Updates the given schemas with the given values

    Args:
        session (object): The database session to use.
        schemas: The schemas to update. Only the schema ids are used, so
            this may be a dict keyed by schema id or a plain list of ids.
        value (dict): The column values to set. This dict is not modified.
        merge_options (bool): If True and "options" is part of value, the
            given options are merged into the already stored options with
            JSON_MERGE_PATCH instead of replacing them. If the stored
            options are NULL, the given options are set directly.

    Raises:
        ValueError: If schemas is empty
        Exception: If one of the schemas cannot be found
    """
    if not schemas:
        raise ValueError("The specified schema was not found.")

    # metadata column was only added in 3.0.0
    current_version = core.get_mrs_schema_version(session)
    supports_metadata = current_version[0] > 2

    # Update all given schemas
    for schema_id in schemas:
        schema = get_schema(session=session, schema_id=schema_id)
        if not schema:
            raise Exception(
                f"The specified schema with id {core.convert_id_to_string(schema_id)} was not "
                "found."
            )

        # Work on a per-schema copy so that the decisions taken for one
        # schema (e.g. removing "options" in order to merge them) neither
        # leak into the next iteration nor modify the caller's dict.
        sets = dict(value)
        if not supports_metadata:
            sets.pop("metadata", None)

        # Prepare the merge of options, if requested
        options_to_merge = None
        if merge_options:
            options = sets.get("options", None)
            # Check if there are options set already, if so, merge the options
            if options is not None:
                row = core.MrsDbExec("""
                    SELECT options IS NULL AS options_is_null
                    FROM `mysql_rest_service_metadata`.`db_schema`
                    WHERE id = ?""", [schema_id]).exec(session).first
                if not (row and row["options_is_null"] == 1):
                    # Options exist already: exclude them from the regular
                    # update and merge them in a separate statement below.
                    sets.pop("options")
                    options_to_merge = options

        if sets:
            core.update(table="db_schema", sets=sets, where=["id=?"]).exec(
                session, [schema_id]
            )

        # Merge options if requested
        if options_to_merge is not None:
            core.MrsDbExec("""
                UPDATE `mysql_rest_service_metadata`.`db_schema`
                SET options = JSON_MERGE_PATCH(options, ?)
                WHERE id = ?
                """, [options_to_merge, schema_id]).exec(session)


def query_schemas(
    session,
    schema_id=None,
    service_id=None,
    schema_name=None,
    request_path=None,
    include_enable_state=None,
    auto_select_single=False,
):
    """Queries the MRS schemas matching the given filters

    If a schema_id is given (or auto-selected), it is the only filter that
    is applied. Otherwise service_id, request_path and schema_name are
    combined. include_enable_state is applied in both cases.

    Args:
        session (object): The database session to use.
        schema_id: The id of the schema
        service_id: The id of the service
        schema_name (str): The name of the schema
        request_path (str): The request_path of the schema
        include_enable_state (bool): Only include schemas with the given
            enabled state
        auto_select_single (bool): If True and no schema_id is given, the
            schema is selected automatically when exactly one schema exists

    Returns:
        The matching schemas, ordered by request_path

    Raises:
        ValueError: If the session is invalid
        Exception: If the request_path does not start with '/'
    """
    if not session:
        raise ValueError("The session is invalid.")

    if schema_id is None and auto_select_single:
        # The aggregate needs an explicit alias, otherwise the result
        # column is named "min(id)" and cannot be read as "id".
        record = (
            core.select(
                table="db_schema",
                cols=["count(*) as service_count", "min(id) AS id"],
            )
            .exec(session)
            .first
        )
        if record["service_count"] == 1:
            schema_id = record["id"]

    if request_path and not request_path.startswith("/"):
        raise Exception("The request_path has to start with '/'.")

    # Build SQL based on which input has been provided

    # metadata column was only added in 3.0.0
    current_version = core.get_mrs_schema_version(session)
    extra_cols = "" if current_version[0] <= 2 else ", sc.metadata, sc.schema_type"

    sql = f"""
        SELECT sc.id, sc.name, sc.service_id, sc.request_path,
            sc.requires_auth, sc.enabled, sc.items_per_page, sc.comments,
            se.url_host_id,
            CONCAT(h.name, se.url_context_root) AS host_ctx,
            sc.options{extra_cols}
        FROM `mysql_rest_service_metadata`.db_schema sc
            LEFT OUTER JOIN `mysql_rest_service_metadata`.service se
                ON se.id = sc.service_id
            LEFT JOIN `mysql_rest_service_metadata`.url_host h
                ON se.url_host_id = h.id
        """

    params = []
    wheres = []
    if schema_id:
        wheres.append("sc.id = ?")
        params.append(schema_id)
    else:
        if service_id:
            wheres.append("sc.service_id = ?")
            params.append(service_id)
        if request_path:
            wheres.append("sc.request_path = ?")
            params.append(request_path)
        if schema_name:
            wheres.append("sc.name = ?")
            params.append(schema_name)

    if include_enable_state is not None:
        wheres.append("sc.enabled = ?")
        params.append(bool(include_enable_state))

    sql += core._generate_where(wheres)
    sql += " ORDER BY sc.request_path"

    return core.MrsDbExec(sql, params).exec(session).items


def get_schemas(session, service_id: bytes = None, include_enable_state=None):
    """Returns all schemas for the given MRS service

    Args:
        session (object): The database session to use.
        service_id: The id of the service to list the schemas from
        include_enable_state (bool): Only include schemas with the given
            enabled state

    Returns:
        List of dicts representing the schemas
    """
    return query_schemas(
        session, service_id=service_id, include_enable_state=include_enable_state
    )


def get_schema(
    session,
    schema_id: bytes = None,
    service_id: bytes = None,
    schema_name=None,
    request_path=None,
    auto_select_single=False,
):
    """Gets a specific MRS schema

    Args:
        session (object): The database session to use.
        schema_id: The id of the schema
        service_id: The id of the service
        schema_name (str): The name of the schema
        request_path (str): The request_path of the schema
        auto_select_single (bool): Currently not forwarded to the query,
            see the note in the function body.

    Returns:
        The first matching schema as dict or None if no schema matches
    """
    # NOTE(review): auto_select_single is accepted but deliberately not
    # forwarded. Forwarding it would let query_schemas() replace the given
    # service_id/schema_name/request_path filters by the auto-selected id
    # whenever exactly one schema exists. Confirm the intended behavior
    # before changing this.
    result = query_schemas(
        session,
        schema_id=schema_id,
        service_id=service_id,
        schema_name=schema_name,
        request_path=request_path,
        auto_select_single=False,
    )
    return result[0] if result else None


def add_schema(
    session,
    schema_name,
    service_id: bytes = None,
    request_path=None,
    requires_auth=None,
    enabled=1,
    items_per_page=None,
    comments=None,
    options=None,
    metadata=None,
    schema_type="DATABASE_SCHEMA",
    internal=False,
    schema_id=None,
):
    """Add a schema to the given MRS service

    Args:
        session (object): The database session to use.
        schema_name (str): The name of the schema to add
        service_id: The id of the service the schema should be added to
        request_path (str): The request_path, defaults to "/<schema_name>"
        requires_auth (bool): Whether authentication is required to access
            the schema, defaults to False
        enabled (int): The enabled state
        items_per_page (int): The number of items returned per page,
            defaults to 25
        comments (str): Comments for the schema
        options (dict): The options for the schema
        metadata (dict): Metadata of the schema
        schema_type (str): Either "DATABASE_SCHEMA" or "SCRIPT_MODULE"
        internal (bool): Stored in the "internal" column of the schema
        schema_id: The id to use for the new schema. If not given, a new
            id is generated.

    Returns:
        The id of the inserted schema

    Raises:
        ValueError: If the database schema does not exist or no schema
            name is given
    """
    if schema_type == "DATABASE_SCHEMA":
        # Check that the database schema exists and take over the schema
        # name as reported by the database
        row = database.get_schema(session, schema_name)
        if row is None:
            raise ValueError(
                f"The given database schema name '{schema_name}' does not exists."
            )
        schema_name = row["SCHEMA_NAME"]
    elif schema_name is None:
        raise ValueError("No schema name given.")

    # Get request_path and default it to '/' + schema_name
    if request_path is None:
        request_path = "/" + schema_name

    core.Validations.request_path(request_path)

    # Get requires_auth
    if requires_auth is None:
        requires_auth = False

    # Get items_per_page
    if items_per_page is None:
        items_per_page = 25

    # Get comments
    if comments is None:
        comments = ""

    if schema_id is None:
        schema_id = core.get_sequence_id(session)

    values = {
        "id": schema_id,
        "service_id": service_id,
        "name": schema_name,
        "request_path": request_path,
        "requires_auth": int(requires_auth),
        "enabled": enabled,
        "items_per_page": items_per_page,
        "comments": comments,
        # Empty or missing options/metadata are stored as NULL
        "options": core.convert_json(options) if options else None,
        "metadata": core.convert_json(metadata) if metadata else None,
        "schema_type": schema_type,
        "internal": int(internal),
    }

    # The metadata, schema_type and internal columns are not written for
    # metadata schema versions up to 2.x
    current_version = core.get_mrs_schema_version(session)
    if current_version[0] <= 2:
        values.pop("metadata", None)
        values.pop("schema_type", None)
        values.pop("internal", None)

    core.insert(table="db_schema", values=values).exec(session)

    return schema_id


def get_current_schema(session):
    """Returns the current schema

    Args:
        session (object): The database session to use.

    Returns:
        The current schema or None if no current schema is set
    """
    # Get current_schema_id from the global mrs_config
    mrs_config = core.get_current_config()
    current_schema_id = mrs_config.get("current_schema_id")

    if not current_schema_id:
        return None

    return get_schema(session, schema_id=current_schema_id)


def get_schema_create_statement(session, schema, include_database_endpoints: bool = False) -> str:
    """Builds the CREATE OR REPLACE REST SCHEMA statement for a schema

    Args:
        session (object): The database session to use.
        schema (dict): The schema as returned by get_schema()
        include_database_endpoints (bool): If True, the create statements
            of the db objects of the schema are appended

    Returns:
        The statement(s) as string, separated by empty lines
    """
    output = [
        f'CREATE OR REPLACE REST SCHEMA {schema.get("request_path")} ON SERVICE {schema.get("host_ctx")}',
        f'    FROM `{schema.get("name")}`'
    ]

    # enabled == 1 is the default and needs no clause; 2 is rendered as
    # PRIVATE and every other value as DISABLED
    if schema.get("enabled") == 2:
        output.append("    PRIVATE")
    elif schema.get("enabled") != 1:
        output.append("    DISABLED")

    if schema.get("requires_auth") == 1:
        output.append("    AUTHENTICATION REQUIRED")
    else:
        output.append("    AUTHENTICATION NOT REQUIRED")

    if schema.get("options"):
        output.append(core.format_json_entry("OPTIONS", schema.get("options")))

    if schema.get("metadata"):
        output.append(core.format_json_entry("METADATA", schema.get("metadata")))

    result = ["\n".join(output) + ";"]

    if include_database_endpoints:
        schema_db_objects = db_objects.get_db_objects(session, schema["id"])

        for schema_db_object in schema_db_objects:
            objects = db_objects.get_objects(session, schema_db_object.get("id"))
            result.append(db_objects.get_db_object_create_statement(session, schema_db_object, objects))

    return "\n\n".join(result)