in mrs_plugin/db_objects.py [0:0]
def add_db_object(**kwargs):
    """Add a db_object to the given MRS schema

    Args:
        **kwargs: Additional options

    Keyword Args:
        db_object_name (str): The name of the schema object to add
        db_object_type (str): Either TABLE, VIEW or PROCEDURE
        schema_id (str): The id of the schema the object should be added to
        schema_name (str): The name of the schema
        auto_add_schema (bool): If the schema should be added as well if it
            does not exist yet
        request_path (str): The request_path
        enabled (int): Whether the db object is enabled
        crud_operation_format (str): The format to use for the CRUD operation
        requires_auth (bool): Whether authentication is required to access
            the db object
        items_per_page (int): The number of items returned per page
        row_user_ownership_enforced (bool): Enable row ownership enforcement
        row_user_ownership_column (str): The column for row ownership enforcement
        comments (str): Comments for the db object
        media_type (str): The media_type of the db object
        auto_detect_media_type (bool): Whether to automatically detect the media type
        auth_stored_procedure (str): The stored procedure that implements the authentication check for this db object
        options (dict): The options of this db object
        metadata (dict): The metadata of this db object
        objects (list): The result/parameters objects definition in JSON format
        session (object): The database session to use.

    Returns:
        The id of the added db_object, or a confirmation message when an
        interactive result is requested
    """
    # NOTE(review): the docstring keeps its original section layout and
    # "name (type): text" entries in case tooling parses it - confirm before
    # adding further sections. Exceptions raised directly by this function:
    #   RuntimeError: auto_add_schema is set, the schema lookup failed and no
    #       service could be resolved to create the schema in.
    #   ValueError:   nothing is available to choose from, a prompt was
    #       cancelled, the object does not exist, the object name needed to
    #       derive the request path is missing, or the entered page size is
    #       not a number.
    #   Exception:    the derived/entered request path does not start with '/'.
    lib.core.convert_ids_to_binary(["schema_id"], kwargs)

    # An explicitly passed request path is validated up front.
    if kwargs.get("request_path") is not None:
        lib.core.Validations.request_path(kwargs.get("request_path"))

    db_object_name = kwargs.get("db_object_name")
    db_object_type = kwargs.get("db_object_type")
    schema_id = kwargs.get("schema_id")
    schema_name = kwargs.get("schema_name")
    auto_add_schema = kwargs.get("auto_add_schema", False)
    request_path = kwargs.get("request_path")
    enabled = kwargs.get("enabled", True)
    crud_operation_format = kwargs.get("crud_operation_format")
    requires_auth = kwargs.get("requires_auth")
    items_per_page = kwargs.get("items_per_page")
    row_user_ownership_enforced = kwargs.get(
        "row_user_ownership_enforced", None)
    row_user_ownership_column = kwargs.get("row_user_ownership_column", None)
    comments = kwargs.get("comments")
    media_type = kwargs.get("media_type")
    auto_detect_media_type = kwargs.get("auto_detect_media_type", True)
    auth_stored_procedure = kwargs.get("auth_stored_procedure")
    options = kwargs.get("options")
    metadata = kwargs.get("metadata")
    objects = kwargs.get("objects")

    interactive = lib.core.get_interactive_default()

    with lib.core.MrsDbSession(
            exception_handler=lib.core.print_exception, **kwargs) as session:
        kwargs["session"] = session

        with lib.core.MrsDbTransaction(session):
            # If auto_add_schema is set, check if the schema is registered
            # already. If not, create it
            if auto_add_schema and schema_name and not schema_id:
                try:
                    schema_id = lib.schemas.get_schema(
                        schema_name=schema_name,
                        session=session).get("id")
                except Exception:
                    # Any failure of the lookup above is treated as "schema
                    # not registered yet". Exception (rather than a bare
                    # except) keeps KeyboardInterrupt/SystemExit propagating.
                    #
                    # If the service does not exist, it should be handled in
                    # the scope of this particular operation.
                    service = resolve_service(session=session, required=False)

                    if not service:
                        raise RuntimeError(
                            "Operation cancelled. The service was not found.")

                    schema_id = lib.schemas.add_schema(
                        schema_name=schema_name,
                        service_id=service["id"],
                        request_path=f"/{schema_name}",
                        requires_auth=bool(requires_auth),
                        session=session)

            schema = resolve_schema(session, schema_query=schema_id)
            # Name of the resolved schema, used for all database lookups and
            # messages below.
            db_schema_name = schema.get("name")

            if not db_object_type and interactive:
                # Get object counts per type
                table_count = lib.database.get_object_type_count(
                    session, db_schema_name, "TABLE")
                view_count = lib.database.get_object_type_count(
                    session, db_schema_name, "VIEW")
                proc_count = lib.database.get_object_type_count(
                    session, db_schema_name, "PROCEDURE")

                # Only offer the types that actually have objects
                db_object_types = []
                if table_count > 0:
                    db_object_types.append("TABLE")
                if view_count > 0:
                    db_object_types.append("VIEW")
                if proc_count > 0:
                    db_object_types.append("PROCEDURE")

                if not db_object_types:
                    raise ValueError("No database objects in the database schema "
                                     f"{db_schema_name}")

                caption = (
                    "Please enter the name or index of a database object type"
                    f"{' [TABLE]: ' if table_count > 0 else ': '}")
                db_object_type = lib.core.prompt_for_list_item(
                    item_list=db_object_types,
                    prompt_caption=caption,
                    prompt_default_value="TABLE" if table_count > 0 else None,
                    print_list=True)

                if not db_object_type:
                    raise ValueError('Operation cancelled.')

            if not db_object_name and interactive:
                db_objects = lib.database.get_db_objects(
                    session, db_schema_name, db_object_type)

                if not db_objects:
                    raise ValueError(
                        f"No database objects of type {db_object_type} in the "
                        f"database schema {db_schema_name}")

                db_object_name = lib.core.prompt_for_list_item(
                    item_list=[db_object["OBJECT_NAME"]
                               for db_object in db_objects],
                    prompt_caption=("Please enter the name or index of a "
                                    "database object: "),
                    print_list=True)

                if not db_object_name:
                    raise ValueError('Operation cancelled.')
            # If a db_object name has been provided, check if that db_object
            # exists in that schema
            elif db_object_name and db_object_type:
                db_object = lib.database.get_db_object(
                    session, db_schema_name, db_object_name, db_object_type)

                if not db_object:
                    raise ValueError(
                        f"The {db_object_type} named '{db_object_name}' "
                        f"does not exists in database schema {db_schema_name}.")

            # Get request_path
            if not request_path:
                if interactive:
                    request_path = lib.core.prompt(
                        "Please enter the request path for this object ["
                        f"/{db_object_name}]: ",
                        {'defaultValue': '/' + db_object_name}).strip()
                else:
                    # The default path is derived from the object name, so
                    # fail clearly instead of with a str + None TypeError.
                    if not db_object_name:
                        raise ValueError(
                            "No db_object_name given. Operation cancelled.")
                    request_path = '/' + db_object_name

                if not request_path.startswith('/'):
                    raise Exception("The request_path has to start with '/'.")

            if not crud_operation_format and interactive:
                crud_operation_format_options = [
                    'FEED',
                    'ITEM',
                    'MEDIA']

                crud_operation_format = lib.core.prompt_for_list_item(
                    item_list=crud_operation_format_options,
                    prompt_caption=("Please select the CRUD operation format "
                                    "[FEED]: "),
                    prompt_default_value="FEED",
                    print_list=True)

                if not crud_operation_format:
                    raise ValueError("No CRUD operation format specified. "
                                     "Operation cancelled.")

            # Get requires_auth
            if requires_auth is None:
                if interactive:
                    requires_auth = lib.core.prompt(
                        "Should the db_object require authentication? [y/N]: ",
                        {'defaultValue': 'n'}).strip().lower() == 'y'
                else:
                    requires_auth = False

            # Get items_per_page
            if items_per_page is None:
                if interactive:
                    # NOTE(review): the caption advertises "[Schema Default]"
                    # while the default value handed to the prompt is "25" -
                    # confirm which one is intended.
                    items_per_page = lib.core.prompt(
                        "How many items should be listed per page? "
                        "[Schema Default]: ",
                        {'defaultValue': "25"}).strip()

                    if items_per_page:
                        if items_per_page != 'NULL':
                            try:
                                items_per_page = int(items_per_page)
                            except ValueError as e:
                                raise ValueError(
                                    "No valid value given. "
                                    "Operation cancelled.") from e
                        else:
                            # A literal NULL answer means no explicit value
                            items_per_page = None

            # Get comments
            if comments is None:
                if interactive:
                    comments = lib.core.prompt(
                        "Comments: ").strip()
                else:
                    comments = ""

            db_object_id, grants = lib.db_objects.add_db_object(
                session=session, schema_id=schema.get("id"),
                db_object_name=db_object_name, request_path=request_path,
                enabled=enabled,
                db_object_type=db_object_type,
                items_per_page=items_per_page, requires_auth=requires_auth,
                row_user_ownership_enforced=row_user_ownership_enforced,
                row_user_ownership_column=row_user_ownership_column,
                crud_operation_format=crud_operation_format,
                comments=comments, media_type=media_type,
                auto_detect_media_type=auto_detect_media_type,
                auth_stored_procedure=auth_stored_procedure, options=options,
                metadata=metadata,
                objects=objects)

            # Execute the grant statements returned for the new db_object
            # within the same transaction
            for grant in grants:
                lib.core.MrsDbExec(grant).exec(session)

        if lib.core.get_interactive_result():
            return "\n" + "Object added successfully."

        return db_object_id