mrs_plugin/interactive.py (252 lines of code) (raw):
# Copyright (c) 2022, 2025, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0,
# as published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms, as
# designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an additional
# permission to link the program and your derivative works with the
# separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
import mrs_plugin.lib as lib
import json
import os
from pathlib import Path
def service_query_selection(**kwargs):
service = kwargs.get("service")
service_id = kwargs.get("service_id")
url_host_name = kwargs.get("url_host_name")
url_context_root = kwargs.get("url_context_root")
if service_id is not None:
return service_id
if service is not None:
return service
if url_context_root is not None:
return f"{url_context_root}"
return None
def schema_query_selection(**kwargs):
schema = kwargs.get("schema")
schema_id = kwargs.get("schema_id")
if schema_id is not None:
return schema_id
if schema is not None:
return schema
return None
def auth_app_query_selection(**kwargs):
auth_app = kwargs.get("auth_app")
auth_app_id = kwargs.get("auth_app_id")
if auth_app_id is not None:
return auth_app_id
if auth_app is not None:
return auth_app
return None
def resolve_service(session, service_query:str | bytes=None, required:bool=True, auto_select_single:bool=False):
service = None
if service_query:
if isinstance(service_query, bytes):
# Check if given service exists by searching its id
service = lib.services.get_service(
service_id=service_query, session=session)
else:
# Check if the service exists by host and context root
url_host_name, url_context_root = service_query.split("/", 1)
url_context_root = f"/{url_context_root}"
service = lib.services.get_service(
url_host_name=url_host_name, url_context_root=url_context_root, session=session)
if not service:
service = lib.services.get_current_service(session)
services = lib.services.get_services(session)
if len(services) == 1 and auto_select_single:
# If there only is one service and auto_select_single is True, take the single service
service = services[0]
if not service and lib.core.get_interactive_default():
print("MRS - Service Listing\n")
service = lib.core.prompt_for_list_item(
item_list=services,
prompt_caption=("Please select a service index or type "
"'hostname/root_context'"),
item_name_property="host_ctx",
given_value=None,
print_list=True)
if not service and required:
raise Exception("Operation cancelled. Unable to identify target service.")
return service
def resolve_schema(session, schema_query:str | bytes=None, service_query:str | bytes=None, required:bool=True):
schema = None
service = None
if schema_query is not None:
if isinstance(schema_query, bytes):
schema = lib.schemas.get_schema(session=session, schema_id=schema_query)
elif isinstance(schema_query, str):
url_host_name, url_context_root, request_path = schema_query.split("/")
service = lib.services.get_service(session, url_host_name=url_host_name, url_context_root=f"/{url_context_root}")
if service is not None:
schema = lib.schemas.get_schema(session, service_id=service["id"], request_path=f"/{request_path}")
if schema:
return schema
if lib.core.get_interactive_default():
service = resolve_service(session, service_query)
schemas = lib.schemas.get_schemas(session, service["id"])
schema = lib.core.prompt_for_list_item(
item_list=schemas,
prompt_caption='Please enter the name or index of a schema: ',
item_name_property="name",
print_list=True)
if not schema and required:
raise Exception("Cancelling operation. Could not determine the schema.")
return schema
def resolve_db_object(session, db_object_id:bytes=None, schema_id:bytes=None, service_id:bytes=None, required:bool=True):
db_object = None
if db_object_id:
db_object = lib.db_objects.get_db_object(session, db_object_id)
if db_object:
return db_object
if lib.core.get_interactive_default():
schema = resolve_schema(session, schema_query=schema_id, service_query=service_id)
db_objects = lib.db_objects.get_db_objects(session, schema["id"])
db_object = lib.core.prompt_for_list_item(
item_list=db_objects,
prompt_caption='Please enter the name or index of a db_object: ',
item_name_property="name",
print_list=True)
if not db_object and required:
raise Exception("Cancelling operation. Could not determine the db_object.")
return db_object
def resolve_content_set(session, content_set_id:bytes=None, service_id:bytes=None, required:bool=True):
content_set = None
if content_set_id:
content_set = lib.content_sets.get_content_set(session, content_set_id=content_set_id)
if content_set:
return content_set
if lib.core.get_interactive_default():
if not service_id:
service = resolve_service(session, service_id)
content_sets = lib.content_sets.get_content_sets(session, service["id"])
content_set = lib.core.prompt_for_list_item(
item_list=content_sets,
prompt_caption='Please enter the name or index of a content set: ',
item_name_property="request_path",
print_list=True)
if not content_set and required:
raise Exception("Cancelling operation. Could not determine the content set.")
return content_set
def resolve_content_file(session, content_file_id:bytes=None, content_set_id:bytes=None, service_id:bytes=None, required:bool=True):
content_file = None
if content_file_id:
content_file = lib.content_files.get_content_file(session, content_file_id)
if content_file:
return content_file
if lib.core.get_interactive_default():
content_set = resolve_content_set(session, content_set_id, service_id)
content_files = lib.content_files.get_content_files(session,
service_id=content_set["id"])
content_file = lib.core.prompt_for_list_item(
item_list=content_files,
prompt_caption='Please enter the name or index of a content file: ',
item_name_property="request_path",
print_list=True)
if not content_file and required:
raise Exception("Cancelling operation. Could not determine the content file.")
return content_file
def user_query_selection(**kwargs):
user_id = kwargs.get("user_id")
user = kwargs.get("user")
if user_id is not None:
return user_id
if user is not None:
return user
return None
def resolve_user(session, user_query:str | bytes):
if isinstance(user_query, bytes):
return lib.users.get_user(session, user_id=user_query)
if isinstance(user_query, str):
url_host_name, url_context_root, auth_app_name, user_name = user_query.split("/")
service = lib.services.get_service(session, url_host_name=url_host_name, url_context_root=f"/{url_context_root}")
if not service:
raise Exception("service not found")
auth_app = lib.auth_apps.get_auth_app(session, name=auth_app_name)
if not auth_app:
raise Exception("auth_app not found")
user = lib.users.get_user(session, service_id=service["id"], auth_app_id=auth_app["id"], user_name=user_name)
if not user:
raise Exception("user not found")
return user
return None
def role_query_selection(**kwargs):
role_id = kwargs.get("role_id")
role_name = kwargs.get("role_id")
role_query = kwargs.get("role")
if role_id is not None:
return role_id
if isinstance(role_query, bytes) or isinstance(role_query, str):
return role_query
if role_name is not None:
return role_name
return None
def resolve_role(session, role_query:str | bytes):
if isinstance(role_query, bytes):
return lib.roles.get_role(session, role_id=role_query)
role_query_list = role_query.split("/")
service_id = None
role_name = None
if len(role_query_list) == 3:
url_host_name = role_query_list[0]
url_context_root = role_query_list[1]
role_name = role_query_list[2]
service = lib.services.get_service(session, url_host_name=url_host_name, url_context_root=url_context_root)
service_id = service["id"]
elif len(role_query_list) == 1:
role_name = role_query_list[0]
roles = lib.roles.get_roles(session, service_id)
for role in roles:
if role["caption"] == role_name:
return role
return None
def resolve_options(options, default = None):
# it should be possible to override the default value with an empty dict
if options is not None:
return options
if lib.core.get_interactive_default():
entries = [
"Default Service Options for Development",
"No options",
"Custom options"
]
selection = lib.core.prompt_for_list_item(
item_list=entries,
prompt_caption=("Please select how to initialize the options [Default Service Options for Development]"),
prompt_default_value=entries[0],
given_value=None,
print_list=True)
if selection == "Default Service Options for Development":
return default
elif selection == "No options":
return None
elif selection == "Custom options":
return json.loads(lib.core.prompt("Options (in JSON format): "))
return None
def resolve_auth_app(session, auth_app_query:str | bytes=None, service_query:str | bytes=None, required:bool=True):
if isinstance(auth_app_query, bytes):
return lib.auth_apps.get_auth_app(session, auth_app_query)
service = resolve_service(session, service_query=service_query)
auth_app_list = lib.auth_apps.get_auth_apps(session, service["id"])
if len(auth_app_list) == 0:
raise Exception("No Authentication Apps found for this service.")
if len(auth_app_list) == 1:
return auth_app_list[0]
selection = lib.core.prompt_for_list_item(
item_list=auth_app_list,
prompt_caption='Please enter the name or index of an auth_app: ',
item_name_property="name",
print_list=True)
if not selection and required:
raise Exception("Cancelling operation. Could not determine the auth_app.")
return selection
def resolve_file_path(file_path:str=None, required:bool=True):
if file_path is None and lib.core.get_interactive_default():
file_path = lib.core.prompt("Please set the file path", {
"defaultValue": None,
})
if not file_path and required:
raise Exception("Cancelling operation. Could not determine the file path.")
if file_path.startswith("~"):
file_path = os.path.expanduser(file_path)
if not os.path.isabs(file_path):
return Path(Path.home() / file_path).as_posix()
return Path(file_path).as_posix()
def resolve_overwrite_file(file_path:str, overwrite:bool) -> None:
if not os.path.exists(file_path):
return
if os.path.isdir(file_path):
raise Exception("Cancelling operation. Path already exists and it's a directory.")
if overwrite is None and lib.core.get_interactive_default():
overwrite = lib.core.prompt(f"Overwrite {file_path}? [y/N]: ",
{'defaultValue': 'n'}).strip().lower() == "y"
if overwrite is False:
raise Exception(f"Cancelling operation. File '{file_path}' already exists.")