mrs_plugin/schemas.py (297 lines of code) (raw):
# Copyright (c) 2021, 2025, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0,
# as published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms, as
# designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an additional
# permission to link the program and your derivative works with the
# separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
"""Sub-Module for managing MRS schemas"""
# cSpell:ignore mysqlsh, mrs
from mysqlsh.plugin_manager import plugin_function
import mrs_plugin.lib as lib
from .interactive import resolve_service, resolve_schema, \
resolve_file_path, resolve_overwrite_file, schema_query_selection, service_query_selection
def verify_value_keys(**kwargs):
for key in kwargs["value"].keys():
if key not in ["name", "request_path", "requires_auth",
"enabled", "items_per_page", "comments",
"service_id", "options", "metadata"] and key != "delete":
raise Exception(f"Attempting to change an invalid schema value.")
def resolve_schemas(**kwargs):
session = kwargs.get("session")
service_id = kwargs.pop("service_id", None)
schema_id = kwargs.pop("schema_id", None)
schema_name = kwargs.pop("schema_name", None)
request_path = kwargs.pop("request_path", None)
allow_multi_select = kwargs.pop("allow_multi_select", True)
kwargs["schemas"] = {}
# if there's g proper schema_id, then use it
if schema_id is not None:
schema = lib.schemas.get_schema(session=session, schema_id=schema_id)
kwargs["schemas"][schema_id] = schema.get("host_ctx")
return kwargs
# Check if given service_id exists or use the current service
service = lib.services.get_service(
service_id=service_id, session=session)
if not service:
raise Exception("Unable to get a service.")
service_id = service.get("id")
rows = lib.core.select(table="db_schema",
cols=["id"],
where="service_id=?"
).exec(session, [service_id]).items
if not rows:
raise ValueError("No schemas available. Use mrs.add.schema() "
"to add a schema.")
if len(rows) == 1:
schema = lib.schemas.get_schema(
session=session, schema_id=rows[0]["id"])
kwargs["schemas"][rows[0]["id"]] = schema.get("host_ctx")
return kwargs
if schema_name is not None:
# Lookup the schema name
row = lib.core.select(table="db_schema",
cols="id",
where=["name=?", "service_id=?"]
).exec(session, [schema_name, service.get("id")]).first
schema = lib.schemas.get_schema(
session=session, schema_id=rows[0]["id"])
kwargs["schemas"][rows[0]["id"]] = schema.get("host_ctx")
return kwargs
if request_path is not None:
# Lookup the request path
row = lib.core.select(table="db_schema",
cols="id",
where=["request_path=?", "service_id=?"]
).exec(session, [request_path, service.get("id")]).first
schema = lib.schemas.get_schema(session=session, schema_id=row["id"])
kwargs["schemas"][row["id"]] = schema.get("host_ctx")
return kwargs
schema = lib.schemas.get_current_schema(session=session)
if schema is not None:
kwargs["schemas"][schema.get("id")] = schema.get("host_ctx")
return kwargs
if not lib.core.get_interactive_default():
raise ValueError("Operation cancelled.")
# this is the code for interactive mode
schemas = lib.schemas.get_schemas(
service_id=service.get("id"),
include_enable_state=None,
session=session)
caption = ("Please select a schema index, type "
"the request_path or type '*' "
"to select all: ")
selection = lib.core.prompt_for_list_item(
item_list=schemas,
prompt_caption=caption,
item_name_property="request_path",
given_value=None,
print_list=True,
allow_multi_select=allow_multi_select)
if not selection:
raise ValueError("Operation cancelled.")
for current_schema in selection:
schema = lib.schemas.get_schema(request_path=request_path, schema_id=current_schema.get("id"),
session=session)
kwargs["schemas"][current_schema.get("id")] = schema.get("host_ctx")
return kwargs
def resolve_requires_auth(**kwargs):
value = kwargs.get("value")
if not lib.core.get_interactive_default():
return kwargs
if value is None or "requires_auth" in value and value["requires_auth"] is None:
kwargs["value"]["requires_auth"] = lib.core.prompt_for_requires_auth()
return kwargs
def resolve_items_per_page(**kwargs):
value = kwargs.get("value")
if not lib.core.get_interactive_default():
return kwargs
if value is None or "items_per_page" in value and value["items_per_page"] is None:
kwargs["value"]["items_per_page"] = lib.core.prompt_for_items_per_page()
return kwargs
def resolve_comments(**kwargs):
value = kwargs.get("value")
if not lib.core.get_interactive_default():
return kwargs
if value is None or "comments" in value and value["comments"] is None:
kwargs["value"]["comments"] = lib.core.core.prompt_for_comments()
return kwargs
def call_update_schema(**kwargs):
text_update = kwargs.pop("text_update", "updated")
lib_func = kwargs.pop("lib_function", lib.schemas.update_schema)
with lib.core.MrsDbSession(exception_handler=lib.core.print_exception, **kwargs) as session:
kwargs["session"] = session
if not kwargs.get("schema_id"):
service = resolve_service(session, kwargs.get("service_id"))
kwargs["service_id"] = service.get("id")
kwargs = resolve_schemas(**kwargs)
with lib.core.MrsDbTransaction(session):
lib_func(**kwargs)
if lib.core.get_interactive_result():
if len(kwargs['schemas']) == 1:
return f"The schema has been {text_update}."
return f"The schemas have been {text_update}."
return True
return False
def generate_create_statement(**kwargs) -> str:
lib.core.convert_ids_to_binary(["service_id", "schema_id"], kwargs)
lib.core.try_convert_ids_to_binary(["service", "schema"], kwargs)
include_database_endpoints = kwargs.get("include_database_endpoints", False)
service_query = service_query_selection(**kwargs)
schema_query = schema_query_selection(**kwargs)
with lib.core.MrsDbSession(exception_handler=lib.core.print_exception, **kwargs) as session:
schema = resolve_schema(session, schema_query=schema_query, service_query=service_query)
return lib.schemas.get_schema_create_statement(session, schema, include_database_endpoints=include_database_endpoints)
@plugin_function('mrs.add.schema', shell=True, cli=True, web=True)
def add_schema(**kwargs):
"""Add a schema to the given MRS service
Args:
**kwargs: Additional options
Keyword Args:
service_id (str): The id of the service the schema should be added to
schema_name (str): The name of the schema to add
request_path (str): The request_path
requires_auth (bool): Whether authentication is required to access
the schema
enabled (int): The enabled state
items_per_page (int): The number of items returned per page
comments (str): Comments for the schema
options (dict): The options for the schema
metadata (dict): The metadata settings of the schema
schema_type (str): Either 'DATABASE_SCHEMA' or 'SCRIPT_MODULE'
internal (bool): Whether the schema is for internal usage
session (object): The database session to use.
Returns:
The schema_id of the created schema when not in interactive mode
"""
lib.core.convert_ids_to_binary(["service_id"], kwargs)
schema_name = kwargs.get("schema_name")
request_path = kwargs.get("request_path")
requires_auth = kwargs.get("requires_auth")
enabled = kwargs.get("enabled", 1)
items_per_page = kwargs.get("items_per_page")
comments = kwargs.get("comments")
options = kwargs.get("options")
metadata = kwargs.get("metadata")
schema_type = kwargs.get("schema_type")
internal = kwargs.get("internal", False)
interactive = lib.core.get_interactive_default()
if schema_type is None:
schema_type = "DATABASE_SCHEMA"
with lib.core.MrsDbSession(exception_handler=lib.core.print_exception, **kwargs) as session:
service = resolve_service(session, kwargs.get("service_id"))
if not service:
raise RuntimeError(
"Operation cancelled. The service was not found.")
if not schema_name and interactive:
rows = lib.database.get_schemas(session)
schemas = [row["SCHEMA_NAME"] for row in rows]
if schemas is None or len(schemas) == 0:
raise ValueError('No database schemas available.')
schema_name = lib.core.prompt_for_list_item(
item_list=schemas,
prompt_caption='Please enter the name or index of a schema: ',
print_list=True)
if not schema_name:
raise ValueError('Operation cancelled.')
# Get request_path
if not request_path:
if interactive:
request_path = lib.schemas.prompt_for_request_path(schema_name)
else:
request_path = f"/{schema_name}"
# Get requires_auth
if requires_auth is None and interactive:
requires_auth = lib.schemas.prompt_for_requires_auth()
# Get items_per_page
if items_per_page is None and interactive:
items_per_page = lib.schemas.prompt_for_items_per_page()
# Get comments
if comments is None and interactive:
comments = lib.core.prompt(
"Comments: ").strip()
if options is None and interactive:
options = lib.core.prompt("Options: ").strip()
with lib.core.MrsDbTransaction(session):
id = lib.schemas.add_schema(schema_name=schema_name, service_id=service["id"],
request_path=request_path, requires_auth=requires_auth, enabled=enabled,
items_per_page=items_per_page, comments=comments, options=options,
metadata=metadata, schema_type=schema_type, internal=internal,
session=session)
schema = lib.schemas.get_schema(session=session, schema_id=id)
if lib.core.get_interactive_result():
return f"\nService with path {schema['request_path']} created successfully."
return id
@plugin_function('mrs.get.schema', shell=True, cli=True, web=True)
def get_schema(**kwargs):
"""Gets a specific MRS schema
Args:
**kwargs: Additional options
Keyword Args:
service_id (str): The id of the service
request_path (str): The request_path of the schema
schema_name (str): The name of the schema
schema_id (str): The id of the schema
auto_select_single (bool): If there is a single service only, use that
session (object): The database session to use.
Returns:
The schema as dict or None on error in interactive mode
"""
lib.core.convert_ids_to_binary(["service_id", "schema_id"], kwargs)
if kwargs.get("request_path") is not None:
lib.core.Validations.request_path(kwargs["request_path"])
with lib.core.MrsDbSession(exception_handler=lib.core.print_exception, **kwargs) as session:
kwargs["session"] = session
kwargs["allow_multi_select"] = False
service = resolve_service(session, kwargs.get("service_id"))
if not service:
raise RuntimeError(
"Operation cancelled. The service was not found.")
kwargs["service_id"] = service["id"]
kwargs = resolve_schemas(**kwargs)
schema_id = list(kwargs["schemas"].keys())[0]
schema = lib.schemas.get_schema(session, schema_id=schema_id)
if lib.core.get_interactive_result():
return lib.schemas.format_schema_listing([schema], True)
return schema
@plugin_function('mrs.list.schemas', shell=True, cli=True, web=True)
def get_schemas(service_id=None, **kwargs):
"""Returns all schemas for the given MRS service
Args:
service_id (str): The id of the service to list the schemas from
**kwargs: Additional options
Keyword Args:
include_enable_state (bool): Only include schemas with the given
enabled state
session (object): The database session to use.
Returns:
Either a string listing the schemas when interactive is set or list
of dicts representing the schemas
"""
if service_id is not None:
service_id = lib.core.id_to_binary(service_id, "service_id")
include_enable_state = kwargs.get("include_enable_state")
with lib.core.MrsDbSession(exception_handler=lib.core.print_exception, **kwargs) as session:
service = resolve_service(session, service_id, False)
if service:
service_id = service["id"]
schemas = lib.schemas.get_schemas(service_id=service_id,
include_enable_state=include_enable_state,
session=session)
if lib.core.get_interactive_result():
return lib.schemas.format_schema_listing(schemas=schemas, print_header=True)
return schemas
@plugin_function('mrs.enable.schema', shell=True, cli=True, web=True)
def enable_schema(**kwargs):
"""Enables a schema of the given service
Args:
**kwargs: Additional options
Keyword Args:
schema_id (str): The id of the schema
service_id (str): The id of the service
schema_name (str): The name of the schema
session (object): The database session to use.
Returns:
The result message as string
"""
lib.core.convert_ids_to_binary(["service_id", "schema_id"], kwargs)
kwargs["value"] = {"enabled": True}
return call_update_schema(text_update="enabled", **kwargs)
@plugin_function('mrs.disable.schema', shell=True, cli=True, web=True)
def disable_schema(**kwargs):
"""Disables a schema of the given service
Args:
**kwargs: Additional options
Keyword Args:
schema_id (str): The id of the schema
service_id (str): The id of the service
schema_name (str): The name of the schema
session (object): The database session to use.
Returns:
The result message as string
"""
lib.core.convert_ids_to_binary(["service_id", "schema_id"], kwargs)
kwargs["value"] = {"enabled": False}
return call_update_schema(text_update="disabled", **kwargs)
@plugin_function('mrs.delete.schema', shell=True, cli=True, web=True)
def delete_schema(**kwargs):
"""Deletes a schema of the given service
Args:
**kwargs: Additional options
Keyword Args:
schema_id (str): The id of the schema
service_id (str): The id of the service
schema_name (str): The name of the schema
session (object): The database session to use.
Returns:
The result message as string
"""
lib.core.convert_ids_to_binary(["service_id", "schema_id"], kwargs)
schema_id = kwargs.get("schema_id")
if schema_id:
with lib.core.MrsDbSession(exception_handler=lib.core.print_exception, **kwargs) as session:
return lib.schemas.delete_schema(schema_id=schema_id, session=session)
# TODO: the result message is not properly returned in this case
else:
return call_update_schema(text_update="deleted", lib_function=lib.schemas.delete_schemas, **kwargs)
@plugin_function('mrs.set.schema.name', shell=True, cli=True, web=True)
def set_name(**kwargs):
"""Sets the name of a given schema
Args:
**kwargs: Additional options
Keyword Args:
schema_id (str): The id of the schema
service_id (str): The id of the service
schema_name (str): The name of the schema
value (str): The value
session (object): The database session to use.
Returns:
The result message as string
"""
lib.core.convert_ids_to_binary(["service_id", "schema_id"], kwargs)
kwargs["value"] = {"name": kwargs.get("value")}
return call_update_schema(**kwargs)
@plugin_function('mrs.set.schema.requestPath', shell=True, cli=True, web=True)
def set_request_path(**kwargs):
"""Sets the request_path of a given schema
Args:
**kwargs: Additional options
Keyword Args:
schema_id (str): The id of the schema
service_id (str): The id of the service
schema_name (str): The name of the schema
value (str): The value
session (object): The database session to use.
Returns:
The result message as string
"""
lib.core.convert_ids_to_binary(["service_id", "schema_id"], kwargs)
kwargs["value"] = {"request_path": kwargs.get("value")}
return call_update_schema(**kwargs)
@plugin_function('mrs.set.schema.requiresAuth', shell=True, cli=True, web=True)
def set_require_auth(**kwargs):
"""Sets the requires_auth flag of the given schema
Args:
**kwargs: Additional options
Keyword Args:
schema_id (str): The id of the schema
service_id (str): The id of the service
schema_name (str): The name of the schema
value (bool): The value
session (object): The database session to use.
Returns:
The result message as string
"""
lib.core.convert_ids_to_binary(["service_id", "schema_id"], kwargs)
kwargs["value"] = {"requires_auth": kwargs.get("value", True)}
kwargs = resolve_requires_auth(**kwargs)
return call_update_schema(**kwargs)
@plugin_function('mrs.set.schema.itemsPerPage', shell=True, cli=True, web=True)
def set_items_per_page(**kwargs):
"""Sets the items_per_page of a given schema
Args:
**kwargs: Additional options
Keyword Args:
schema_id (str): The id of the schema
service_id (str): The id of the service
schema_name (str): The name of the schema
value (int): The value
session (object): The database session to use.
Returns:
The result message as string
"""
lib.core.convert_ids_to_binary(["service_id", "schema_id"], kwargs)
kwargs["value"] = {"items_per_page": kwargs.get("value", 25)}
kwargs = resolve_items_per_page(**kwargs)
return call_update_schema(**kwargs)
@plugin_function('mrs.set.schema.comments', shell=True, cli=True, web=True)
def set_comments(**kwargs):
"""Sets the comments of a given schema
Args:
**kwargs: Additional options
Keyword Args:
schema_id (str): The id of the schema
service_id (str): The id of the service
schema_name (str): The name of the schema
value (str): The value
session (object): The database session to use.
Returns:
The result message as string
"""
lib.core.convert_ids_to_binary(["service_id", "schema_id"], kwargs)
kwargs["value"] = {"comments": kwargs.get("value")}
kwargs = resolve_comments(**kwargs)
return call_update_schema(**kwargs)
@plugin_function('mrs.update.schema', shell=True, cli=True, web=True)
def update_schema(**kwargs):
"""Updates the given schema
Args:
**kwargs: Additional options
Keyword Args:
schema_id (str): The id of the schema
service_id (str): The id of the service
schema_name (str): (required) The name of the schema
value (dict): The values as dict #TODO: check why dicts cannot be passed
session (object): The database session to use.
Allowed options for value:
service_id (str): The new service id for the schema
schema_name (str): The name of the schema
request_path (str): The request_path
requires_auth (bool): Whether authentication is required to access
the schema
enabled (int): The enabled state
items_per_page (int): The number of items returned per page
comments (str): Comments for the schema
options (dict): The options for the schema
metadata (dict): The metadata settings of the schema
Returns:
The result message as string
"""
lib.core.convert_ids_to_binary(["service_id", "schema_id"], kwargs)
kwargs["value"] = lib.core.convert_json(kwargs["value"])
lib.core.convert_ids_to_binary(["service_id"], kwargs["value"])
if kwargs.get("request_path") is not None:
lib.core.Validations.request_path(kwargs["request_path"])
if "schema_name" in kwargs["value"]:
kwargs["value"]["name"] = kwargs["value"]["schema_name"]
del kwargs["value"]["schema_name"]
verify_value_keys(**kwargs)
kwargs = resolve_requires_auth(**kwargs)
kwargs = resolve_items_per_page(**kwargs)
kwargs = resolve_comments(**kwargs)
return call_update_schema(**kwargs)
@plugin_function('mrs.get.schemaCreateStatement', shell=True, cli=True, web=True)
def get_schema_create_statement(**kwargs):
"""Returns the corresponding CREATE REST SCHEMA SQL statement of the given MRS schema object.
When using the 'schema' parameter, you can choose either of these formats:
- '0x11EF8496143CFDEC969C7413EA499D96' - Hexadecimal string ID
- 'Ee+ElhQ8/eyWnHQT6kmdlg==' - Base64 string ID
- 'localhost/myService/mySchema' - Human readable string ID
Args:
**kwargs: Options to determine what should be generated.
Keyword Args:
service_id (str): The ID of the service where the schema belongs.
schema_id (str): The ID of the schema to generate.
schema (str): The identifier of the schema.
include_database_endpoints (bool): Include database objects that belong to the schema.
session (object): The database session to use.
Returns:
The SQL that represents the create statement for the MRS schema
"""
return generate_create_statement(**kwargs)
@plugin_function('mrs.dump.schemaCreateStatement', shell=True, cli=True, web=True)
def store_schema_create_statement(**kwargs):
"""Stores the corresponding CREATE REST schema SQL statement of the given MRS schema
object into a file.
When using the 'schema' parameter, you can choose either of these formats:
- '0x11EF8496143CFDEC969C7413EA499D96' - Hexadecimal string ID
- 'Ee+ElhQ8/eyWnHQT6kmdlg==' - Base64 string ID
- 'localhost/myService/mySchema' - Human readable string ID
Args:
**kwargs: Options to determine what should be generated.
Keyword Args:
service_id (str): The ID of the service where the schema belongs.
schema_id (str): The ID of the schema to dump.
schema (str): The identifier of the schema.
file_path (str): The path where to store the file.
overwrite (bool): Overwrite the file, if already exists.
include_database_endpoints (bool): Include database objects that belong to the schema.
session (object): The database session to use.
Returns:
True if the file was saved.
"""
file_path = kwargs.get("file_path")
overwrite = kwargs.get("overwrite")
file_path = resolve_file_path(file_path)
resolve_overwrite_file(file_path, overwrite)
sql = generate_create_statement(**kwargs)
with open(file_path, "w") as f:
f.write(sql)
if lib.core.get_interactive_result():
return f"File created in {file_path}."
return True