mrs_plugin/lib/db_objects.py (624 lines of code) (raw):
# Copyright (c) 2022, 2025, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0,
# as published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms, as
# designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an additional
# permission to link the program and your derivative works with the
# separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
from mrs_plugin.lib import core, schemas, database
import json
def format_db_object_listing(db_objects, print_header=False):
"""Formats the listing of db_objects
Args:
db_objects (list): A list of db_objects as dicts
print_header (bool): If set to true, a header is printed
Returns:
The formatted list of services
"""
if print_header:
output = (f"{'ID':>3} {'PATH':35} {'OBJECT NAME':30} {'CRUD':4} "
f"{'TYPE':10} {'ENABLED':7} {'AUTH':4} "
f"{'LAST META CHANGE':16}\n")
else:
output = ""
for i, item in enumerate(db_objects, start=1):
path = (item['host_ctx'] + item['schema_request_path'] +
item['request_path'])
if len(path) > 35:
path = f"{path[:32]}..."
# Shorten the list of CRUD operation names to just the first characters
crud = ''.join([o[0] for o in item['crud_operations']]) \
if item['crud_operations'] else ""
changed_at = str(item['changed_at']) if item['changed_at'] else ""
output += (f"{i:>3} {path[:35]:35} "
f"{item['name'][:30]:30} {crud:4} "
f"{item['object_type'][:9]:10} "
f"{'Yes' if item['enabled'] else '-':7} "
f"{'Yes' if item['requires_auth'] else '-':4} "
f"{changed_at[:16]:16}")
if i < len(db_objects):
output += "\n"
return output
def map_crud_operations(crud_operations):
grant_privileges = []
for crud_operation in crud_operations:
if crud_operation == "CREATE" or crud_operation == "1":
grant_privileges.append("INSERT")
elif crud_operation == "READ" or crud_operation == "2":
grant_privileges.append("SELECT")
elif crud_operation == "UPDATE" or crud_operation == "3":
grant_privileges.append("UPDATE")
elif crud_operation == "DELETE" or crud_operation == "4":
grant_privileges.append("DELETE")
else:
raise ValueError(f"The given CRUD operation {crud_operation} "
"does not exist.")
return grant_privileges
def validate_value(value, value_name):
if not value and not isinstance(value, bool):
raise ValueError(f"The '{value_name}' field was not set.")
return value
def add_where_clause(where, new):
return f"{where} {'AND' if where else 'WHERE'} {new}"
def delete_db_object(session, db_object_id):
# The db_object was not set
if db_object_id is None:
raise ValueError("The specified db_object was not found.")
# Revoke grants for this db_object
db_object = get_db_object(session, db_object_id)
db_schema = schemas.get_schema(session, db_object["db_schema_id"])
database.revoke_all_from_db_object(
session, db_schema["name"], db_object["name"], db_object["object_type"])
# remove the db_object
core.delete(table="db_object",
where="id=?"
).exec(session, [db_object_id]).success
def delete_db_objects(session, db_object_ids: list):
# The list of db_objects to be changed
if not db_object_ids:
raise ValueError("The specified db_object was not found.")
# Update all given services
for db_object_id in db_object_ids:
delete_db_object(session, db_object_id)
def enable_db_object(session, value: bool, db_object_ids: list):
# The list of db_objects to be changed
if not db_object_ids:
raise ValueError("The specified db_object was not found.")
# Update all given services
for db_object_id in db_object_ids:
result = core.update(table="db_object",
sets="enabled=?",
where="id=?"
).exec(session, [value, db_object_id]).success
if not result:
raise Exception(
f"The specified db_object with id {db_object_id} was not "
"found.")
def query_db_objects(session, db_object_id=None, schema_id=None, request_path=None,
db_object_name=None, include_enable_state=None, object_types=None):
# Build SQL based on which input has been provided
# metadata column was only added in 3.0.0, row_user_ownership_enforced and row_user_ownership_column removed
current_version = core.get_mrs_schema_version(session)
if current_version[0] <= 2:
sql = """
SELECT o.id, o.db_schema_id, o.name, o.request_path,
o.requires_auth, o.enabled, o.object_type,
o.items_per_page, o.comments,
sc.request_path AS schema_request_path,
CONCAT(h.name, se.url_context_root) AS host_ctx,
o.crud_operations, o.format as crud_operation_format,
o.media_type, o.auto_detect_media_type,
o.auth_stored_procedure, o.options,
MAX(al.changed_at) as changed_at,
CONCAT(sc.name, '.', o.name) AS qualified_name,
se.id AS service_id, sc.name AS schema_name
FROM mysql_rest_service_metadata.db_object o
LEFT OUTER JOIN mysql_rest_service_metadata.db_schema sc
ON sc.id = o.db_schema_id
LEFT OUTER JOIN mysql_rest_service_metadata.service se
ON se.id = sc.service_id
LEFT JOIN mysql_rest_service_metadata.url_host h
ON se.url_host_id = h.id
LEFT OUTER JOIN (
SELECT new_row_id AS id, MAX(changed_at) as changed_at
FROM mysql_rest_service_metadata.audit_log
WHERE table_name = 'db_object'
GROUP BY new_row_id) al
ON al.id = o.id
"""
else:
sql = """
SELECT o.id, o.db_schema_id, o.name, o.request_path,
o.requires_auth, o.enabled, o.object_type,
o.items_per_page, o.comments,
sc.request_path AS schema_request_path,
CONCAT(h.name, se.url_context_root) AS host_ctx,
o.crud_operations, o.format as crud_operation_format,
o.media_type, o.auto_detect_media_type,
o.auth_stored_procedure, o.options,
o.metadata,
MAX(al.changed_at) as changed_at,
CONCAT(sc.name, '.', o.name) AS qualified_name,
se.id AS service_id, sc.name AS schema_name
FROM mysql_rest_service_metadata.db_object o
LEFT OUTER JOIN mysql_rest_service_metadata.db_schema sc
ON sc.id = o.db_schema_id
LEFT OUTER JOIN mysql_rest_service_metadata.service se
ON se.id = sc.service_id
LEFT JOIN mysql_rest_service_metadata.url_host h
ON se.url_host_id = h.id
LEFT OUTER JOIN (
SELECT new_row_id AS id, MAX(changed_at) as changed_at
FROM mysql_rest_service_metadata.audit_log
WHERE table_name = 'db_object'
GROUP BY new_row_id) al
ON al.id = o.id
"""
params = []
wheres = []
if db_object_id is not None:
wheres.append("o.id = ?")
params.append(db_object_id)
else:
if schema_id is not None:
wheres.append("o.db_schema_id = ?")
params.append(schema_id)
if request_path is not None:
wheres.append("o.request_path = ?")
params.append(request_path)
if db_object_name is not None:
wheres.append("o.name = ?")
params.append(db_object_name)
if object_types is not None:
if len(object_types) > 1:
s = "(" + ("o.object_type = ? OR " * len(object_types))
wheres.append(s[0:-4] + ")")
for t in object_types:
params.append(t)
elif len(object_types) == 1:
wheres.append("o.object_type = ?")
params.append(object_types[0])
if include_enable_state is not None:
wheres.append("o.enabled = ?")
params.append("1" if include_enable_state else "0")
sql += core._generate_where(wheres)
sql += " GROUP BY o.id ORDER BY o.request_path"
return core.MrsDbExec(sql, params).exec(session).items
def get_db_object(session, db_object_id: bytes = None, schema_id: bytes = None, request_path=None, db_object_name=None,
absolute_request_path: str = None):
"""Gets a specific MRS db_object
Args:
session (object): The database session to use.
db_object_id: The id of the db_object
schema_id: The id of the schema
request_path (str): The request_path of the schema
db_object_name (str): The name of the schema
absolute_request_path (str): The absolute request_path to the db_object
Returns:
The db_object as dict
"""
result = None
if db_object_id is not None:
result = query_db_objects(session=session, db_object_id=db_object_id)
else:
if request_path:
result = query_db_objects(
session=session, schema_id=schema_id, request_path=request_path)
elif db_object_name:
result = query_db_objects(
session=session, schema_id=schema_id, db_object_name=db_object_name)
elif absolute_request_path:
result = database.get_object_via_absolute_request_path(
session=session, absolute_request_path=absolute_request_path,
ignore_case=True)
return result[0] if result else None
def get_db_objects(session, schema_id: bytes, include_enable_state=None, object_types=None):
"""Returns all db_objects for the given schema
Args:
schema_id: The id of the schema to list the db_objects from
include_enable_state (bool): Only include db_objects with the given
enabled state
session (object): The database session to use
Returns:
A list of dicts representing the db_objects of the schema
"""
return query_db_objects(
session=session, schema_id=schema_id,
include_enable_state=include_enable_state, object_types=object_types)
def add_db_object(session, schema_id, db_object_name, request_path, db_object_type,
enabled, items_per_page, requires_auth, crud_operation_format,
comments, media_type, auto_detect_media_type, auth_stored_procedure,
options, objects, metadata=None, internal=False, db_object_id=None, reuse_ids=False,
row_user_ownership_enforced=None, row_user_ownership_column=None):
if not isinstance(db_object_name, str):
raise Exception('Invalid object name.')
if db_object_type not in ["TABLE", "VIEW", "PROCEDURE", "FUNCTION", "SCRIPT"]:
raise ValueError(
'Invalid db_object_type. Only valid types are TABLE, VIEW, PROCEDURE and FUNCTION.')
if not crud_operation_format:
raise ValueError("No CRUD operation format specified."
"Operation cancelled.")
if row_user_ownership_enforced is None:
row_user_ownership_enforced = False
if auto_detect_media_type is None:
auto_detect_media_type = False
current_version = core.get_mrs_schema_version(session=session)
if current_version[0] <= 2:
if row_user_ownership_enforced and not row_user_ownership_column:
raise ValueError('Operation cancelled.')
if not comments:
comments = ""
schema = schemas.get_schema(session=session,
schema_id=schema_id, auto_select_single=True)
if db_object_id is None:
db_object_id = core.get_sequence_id(session)
crud_operations = calculate_crud_operations(
db_object_type=db_object_type, objects=objects, options=options)
values = {
"id": db_object_id,
"db_schema_id": schema_id,
"name": db_object_name,
"request_path": request_path,
"object_type": db_object_type,
"enabled": enabled,
"items_per_page": items_per_page,
"requires_auth": int(requires_auth),
"row_user_ownership_enforced": int(row_user_ownership_enforced),
"row_user_ownership_column": row_user_ownership_column,
"crud_operations": crud_operations,
"format": crud_operation_format,
"comments": comments,
"media_type": media_type,
"metadata": metadata,
"auto_detect_media_type": int(auto_detect_media_type),
"auth_stored_procedure": auth_stored_procedure,
"options": options,
"internal": internal,
}
if current_version[0] >= 3:
# Remove row_user_ownership_enforced and row_user_ownership_column from db_object values as they are now
# passed in object and object_reference directly
values.pop("row_user_ownership_enforced", None)
values.pop("row_user_ownership_column", None)
# Update object.row_ownership_field_id when the old parameters are still used
if row_user_ownership_enforced and row_user_ownership_column:
for obj in objects:
fields = obj.get("fields", [])
for field in fields:
db_column = field.get("db_column", None)
if db_column is not None:
if db_column.get("name") == row_user_ownership_column:
obj["row_ownership_field_id"] = field.get("id")
else:
values.pop("metadata", None)
values.pop("internal", None)
core.insert(table="db_object", values=values).exec(session)
set_objects(session, db_object_id, objects)
if db_object_type == "PROCEDURE" or db_object_type == "FUNCTION":
grant_privileges = ["EXECUTE"]
else:
grant_privileges = map_crud_operations(crud_operations)
if not grant_privileges:
raise ValueError("No valid CRUD Operation specified")
# Ensure that the explicit grants lookup with get does not fail
if options is None:
options = {}
if db_object_type == "SCRIPT":
return db_object_id, database.get_grant_statements_for_explicit_grants(
options.get("grants", None)
)
else:
return db_object_id, database.get_grant_statements(
session=session,
schema_name=schema["name"],
db_object_name=db_object_name,
grant_privileges=grant_privileges,
objects=objects,
db_object_type=db_object_type,
explicit_grants=options.get("grants", None),
disable_automatic_grants=options.get(
"disableAutomaticGrants", False),
)
def get_crud_operations(session, db_object_id: bytes):
result = core.select("db_object",
cols="crud_operations, format",
where=["id = ?"]).exec(session, [db_object_id]).first
return result["crud_operations"], result["format"]
def get_available_db_object_row_ownership_fields(session, schema_name, db_object_name, db_object_type):
if db_object_type == "PROCEDURE":
sql = core.select(table="`INFORMATION_SCHEMA`.`PARAMETERS`",
cols="PARAMETER_NAME as name",
where=["SPECIFIC_SCHEMA = ?",
"SPECIFIC_NAME = ?", "PARAMETER_MODE = 'IN'"],
order="ORDINAL_POSITION")
else:
sql = core.select(table="`INFORMATION_SCHEMA`.`COLUMNS`",
cols="COLUMN_NAME as name",
where=["TABLE_SCHEMA = ?", "TABLE_NAME = ?",
"GENERATION_EXPRESSION = ''"],
order="ORDINAL_POSITION")
return [record["name"] for record in sql.exec(session, [schema_name, db_object_name]).items]
def update_db_objects(session, db_object_ids, value, merge_options=False):
if "crud_operation_format" in value:
value["format"] = value.pop("crud_operation_format")
for db_object_id in db_object_ids:
db_object = get_db_object(session=session, db_object_id=db_object_id)
objects = value.pop("objects", None)
if objects is not None:
core.check_mrs_object_names(
session=session, db_schema_id=value["db_schema_id"], objects=objects)
value["crud_operations"] = calculate_crud_operations(
db_object_type=db_object.get("object_type"), objects=objects,
options=db_object.get("options", None))
# Prepare the merge of options, if requested
if merge_options:
options = value.get("options", None)
# Check if there are options set already, if so, merge the options
if options is not None:
row = core.MrsDbExec("""
SELECT options IS NULL AS options_is_null
FROM `mysql_rest_service_metadata`.`db_object`
WHERE id = ?""", [db_object_id]).exec(session).first
if row and row["options_is_null"] == 1:
merge_options = False
else:
value.pop("options")
if value:
core.update("db_object",
sets=value,
where=["id=?"]).exec(session, [db_object_id])
# Merge options if requested
if merge_options and options is not None:
core.MrsDbExec("""
UPDATE `mysql_rest_service_metadata`.`db_object`
SET options = JSON_MERGE_PATCH(options, ?)
WHERE id = ?
""", [options, db_object_id]).exec(session)
grant_privileges = map_crud_operations(
value.get("crud_operations", []))
db_object = get_db_object(session, db_object_id)
schema = schemas.get_schema(session, db_object["db_schema_id"])
# Revoke all grants before granting the necessary ones
database.revoke_all_from_db_object(
session, schema["name"], db_object["name"], db_object["object_type"])
options = value.get("options", {})
# Ensure that the explicit grants lookup with get does not fail, when options actually stores None
if options is None:
options = {}
# Grant privilege to the 'mysql_rest_service_data_provider' role
database.grant_db_object(
session, schema.get(
"name"), db_object['name'], grant_privileges, objects, db_object["object_type"],
explicit_grants=options.get("grants", None),
disable_automatic_grants=options.get("disableAutomaticGrants", False))
if objects is not None:
set_objects(session, db_object_id, objects)
def db_schema_object_is_table(session, db_schema_name, db_object_name):
return database.db_schema_object_is_table(
session=session,
db_schema_name=db_schema_name,
db_object_name=db_object_name)
def get_db_object_parameters(session, db_object_id=None,
db_schema_name=None, db_object_name=None,
db_type="PROCEDURE"):
if db_object_id:
db_object = get_db_object(session, db_object_id)
if not db_object:
raise ValueError(
"The database object must be identified via schema_name and db_object_name "
"or db_object_id.")
db_schema_name = db_object["schema_name"]
db_object_name = db_object["name"]
db_type = db_object["object_type"]
if db_object["object_type"] != "PROCEDURE" or db_object["object_type"] != "FUNCTION":
raise ValueError(
"This function can only be called for PROCEDUREs and FUNCTIONs.")
return database.get_db_object_parameters(
session=session, db_schema_name=db_schema_name, db_object_name=db_object_name, db_type=db_type)
def get_db_function_return_type(session, db_schema_name, db_object_name):
return database.get_db_function_return_type(
session=session, db_schema_name=db_schema_name, db_object_name=db_object_name)
def get_table_columns_with_references(session, db_object_id=None,
schema_name=None, db_object_name=None, db_object_type=None):
if db_object_id:
db_object = get_db_object(session, db_object_id)
if not db_object:
raise ValueError(
"The database object must be identified via schema_name, db_object_name and db_object_type "
"or via the request_path and db_object_name.")
schema_name = db_object["schema_name"]
db_object_name = db_object["name"]
db_object_type = db_object["object_type"]
if db_object_type and db_object_type not in ["TABLE", "VIEW"]:
raise ValueError(
"The object_type must be either set to TABLE or VIEW.")
return database.get_table_columns_with_references(session, schema_name, db_object_name, db_object_type)
def get_objects(session, db_object_id):
return database.get_objects(session, db_object_id)
def get_object_fields_with_references(session, object_id, binary_formatter=None):
return database.get_object_fields_with_references(session, object_id, binary_formatter=binary_formatter)
def set_objects(session, db_object_id, objects):
if objects is None:
objects = []
sql = "DELETE FROM mysql_rest_service_metadata.object WHERE db_object_id = ?"
core.MrsDbExec(sql).exec(session, [core.id_to_binary(
db_object_id, "db_object_id")]).items
for obj in objects:
set_object_fields_with_references(session, db_object_id, obj)
def set_object_fields_with_references(session, db_object_id, obj):
values = {
"id": core.id_to_binary(obj.get("id"), "object.id"),
"db_object_id": core.id_to_binary(db_object_id, "db_object_id"),
"name": obj.get("name"),
"kind": obj.get("kind", "RESULT"),
"position": obj.get("position"),
"sdk_options": core.convert_dict_to_json_string(obj.get("sdk_options")),
"comments": obj.get("comments"),
}
current_version = core.get_mrs_schema_version(session=session)
if current_version[0] >= 3:
options = obj.get("options", None)
# To be backwards compatible, duplicate the options using the old key names
if current_version[0] >= 1 and options is not None:
options["duality_view_insert"] = options.get(
"dataMappingViewInsert", None)
options["duality_view_update"] = options.get(
"dataMappingViewUpdate", None)
options["duality_view_delete"] = options.get(
"dataMappingViewDelete", None)
if options.get("dataMappingViewNoCheck", None) is not None:
options["duality_view_no_check"] = options.get(
"dataMappingViewNoCheck", None)
values["options"] = options
row_ownership_field_id = obj.get("row_ownership_field_id", None)
if row_ownership_field_id is not None:
values["row_ownership_field_id"] = core.id_to_binary(
row_ownership_field_id, "row_ownership_field_id")
core.insert(table="object", values=values).exec(session)
fields = obj.get("fields", [])
# Insert object_references first
inserted_object_references_ids = []
for field in fields:
obj_ref = field.get("object_reference")
if (obj_ref is not None and
(not (obj_ref.get("id") in inserted_object_references_ids))):
inserted_object_references_ids.append(obj_ref.get("id"))
# make sure to covert the sub Dict with dict()
ref_map = obj_ref.get("reference_mapping")
ref_map_json = None
if ref_map is not None:
ref_map = dict(ref_map)
# Convert column_mapping, which is a list of dict
converted_col_mapping = []
for cm in ref_map["column_mapping"]:
cm_dict = dict(cm)
# Check if column_mapping already follows new format
if "base" in cm_dict and "ref" in cm_dict:
converted_col_mapping.append(cm_dict)
else:
# If not, convert to new format that uses "base" and "ref" keys
for key in cm_dict.keys():
converted_col_mapping.append(
{"base": key, "ref": cm_dict.get(key)})
ref_map["column_mapping"] = converted_col_mapping
ref_map_json = json.dumps(ref_map)
if not ref_map_json:
raise Exception(
f'reference_mapping not defined for field {field.get("name")}')
values = {
"id": core.id_to_binary(obj_ref.get("id"), "objectReference.id"),
"reduce_to_value_of_field_id": core.id_to_binary(
obj_ref.get("reduce_to_value_of_field_id"),
"objectReference.reduce_to_value_of_field_id", True),
"reference_mapping": ref_map_json,
"unnest": obj_ref.get("unnest"),
"sdk_options": core.convert_dict_to_json_string(obj_ref.get("sdk_options")),
"comments": obj_ref.get("comments"),
}
if current_version[0] >= 3:
options = obj.get("options", None)
# To be backwards compatible, duplicate the options using the old key names
if current_version[0] >= 1 and options is not None:
options["duality_view_insert"] = options.get(
"dataMappingViewInsert", None)
options["duality_view_update"] = options.get(
"dataMappingViewUpdate", None)
options["duality_view_delete"] = options.get(
"dataMappingViewDelete", None)
if options.get("dataMappingViewNoCheck", None) is not None:
options["duality_view_no_check"] = options.get(
"dataMappingViewNoCheck", None)
values["options"] = options
row_ownership_field_id = obj_ref.get(
"row_ownership_field_id", None)
if row_ownership_field_id is not None:
values["row_ownership_field_id"] = core.id_to_binary(row_ownership_field_id,
"objectReference.row_ownership_field_id")
core.insert(table="object_reference", values=values).exec(session)
# Then insert object_fields
inserted_field_ids = []
for field in fields:
obj_ref = field.get("object_reference")
if (not (field.get("id") in inserted_field_ids)):
inserted_field_ids.append(field.get("id"))
values = {
"id": core.id_to_binary(field.get("id"), "field.id"),
"object_id": core.id_to_binary(field.get("object_id"), "field.object_id"),
"parent_reference_id": core.id_to_binary(
field.get("parent_reference_id"), "field.parent_reference_id", True),
"represents_reference_id": core.id_to_binary(
field.get("represents_reference_id"), "field.represents_reference_id", True),
"name": field.get("name"),
"position": field.get("position"),
"db_column": core.convert_dict_to_json_string(field.get("db_column")),
"enabled": field.get("enabled"),
"allow_filtering": field.get("allow_filtering"),
"allow_sorting": field.get("allow_sorting", 0),
"no_check": field.get("no_check"),
"no_update": field.get("no_update"),
"sdk_options": core.convert_dict_to_json_string(field.get("sdk_options")),
"comments": field.get("comments"),
}
if current_version[0] >= 3:
values["options"] = field.get("options", None)
values["json_schema"] = core.convert_dict_to_json_string(
field.get("json_schema", None))
core.insert(table="object_field", values=values).exec(session)
def calculate_crud_operations(db_object_type, objects=None, options=None):
if db_object_type == "SCRIPT":
return ["CREATE", "READ", "UPDATE"]
if db_object_type == "PROCEDURE" or db_object_type == "FUNCTION":
if options is not None and options.get("mysqlTask", None) is not None:
return ["CREATE", "READ", "UPDATE", "DELETE"]
else:
return ["CREATE"]
if objects is None:
return ["READ"]
if len(objects) == 0:
raise Exception("No object result definition present.")
obj = objects[0]
options = obj.get("options", {})
if options is None:
options = {}
crudOps = ["READ"]
if options.get("dataMappingViewInsert", False) is True:
crudOps.append("CREATE")
if options.get("dataMappingViewUpdate", False) is True:
crudOps.append("UPDATE")
if options.get("dataMappingViewDelete", False) is True:
crudOps.append("DELETE")
# Loop over all fields and check if an object reference has a CRUD operation set. If so, the mrsObject
# needs to be updatable as well
if "UPDATE" not in crudOps:
for field in obj.get("fields"):
options = field.get("options", None)
if options is not None and len(crudOps) < 4:
if "UPDATE" not in crudOps and \
(options.get("dataMappingViewInsert", False) is True
or options.get("dataMappingViewUpdate", False) is True
or options.get("dataMappingViewDelete", False) is True):
crudOps.append("UPDATE")
return crudOps
def walk(fields, parent_id=None, level=1, add_data_type=False, current_object=None):
result = ""
filtered_fields = list(
filter(
lambda f: f.get("reduceToValueOfFieldId", {}).get(
"reduce_to_value_of_field_id"
),
fields,
)
)
reduce_to_field_ids = [
f.get("reduceToValueOfFieldId", {}).get("reduce_to_value_of_field_id", "")
for f in filtered_fields
]
indent = " " * level * 4
for field in fields:
if field.get("parent_reference_id") != parent_id:
continue
if not field.get("object_reference") and (
field["enabled"] or field["id"] in reduce_to_field_ids
):
attributes = []
inout = f'@{"IN" if field["db_column"].get("in") else ""}{"OUT" if field["db_column"].get("out") else ""}'
inout != "@" and attributes.append(inout)
field.get("db_column", {}).get("is_primary", False) and attributes.append("@KEY")
field.get("no_check") and attributes.append("@NOCHECK")
field.get("no_update") and attributes.append("@NOUPDATE")
field.get("allow_sorting") and attributes.append("@SORTABLE")
not field.get("allow_filtering") and attributes.append("@NOFILTERING")
add_data_type and field["db_column"] and field["db_column"][
"datatype"
] and attributes.append(f'@DATATYPE("{field["db_column"]["datatype"]}")')
if field["id"] == current_object.get("row_ownership_field_id", None):
attributes.append("@ROWOWNERSHIP")
result += f"{indent}{field['name']}: {field['db_column']['name']}"
if attributes:
result = f"{result} {' '.join(attributes)}"
result = f"{result},\n"
elif (
field.get("object_reference")
and field["object_reference"].get("unnest")
or field["enabled"]
):
ref_table = f'{field["object_reference"]["reference_mapping"]["referenced_schema"]}.{field["object_reference"]["reference_mapping"]["referenced_table"]}'
attributes = []
options = field["object_reference"].get("options", {})
if options is None:
options = {}
if options.get("dataMappingViewInsert", False):
attributes.append("@INSERT")
if options.get("dataMappingViewUpdate", False):
attributes.append("@UPDATE")
if options.get("dataMappingViewDelete", False):
attributes.append("@DELETE")
if options.get("dataMappingViewNoCheck", False):
attributes.append("@NOCHECK")
if field["object_reference"]["unnest"] or field["object_reference"].get(
"reduce_to_value_of_field_id"
):
attributes.append("@UNNEST")
children = core.cut_last_comma(
walk(
fields=fields,
parent_id=field["represents_reference_id"],
level=level + 1,
add_data_type=add_data_type,
current_object=field["object_reference"],
)
)
result = f'{result}{indent}{field["name"]}: {ref_table}'
if attributes:
result = f'{result} {" ".join(attributes)}'
if children:
result = f"{result} {{\n{children}\n{indent}}},\n"
return result
def get_db_object_create_statement(session, db_object, objects) -> str:
object_type = "VIEW" if db_object.get("object_type") == "TABLE" else db_object.get("object_type")
output = [
f'CREATE OR REPLACE REST {object_type} {db_object.get("request_path")}',
f' ON SERVICE {db_object.get("host_ctx")} SCHEMA {db_object.get("schema_request_path")}',
f' AS {db_object.get("qualified_name")}'
]
if object_type != "PROCEDURE" and object_type != "FUNCTION":
class_header = f"CLASS {objects[0]["name"]}"
options = objects[0].get("options", {})
# can exist and be None
if options is not None:
if options.get("dataMappingViewInsert", False):
class_header += " @INSERT"
if options.get("dataMappingViewUpdate", False):
class_header += " @UPDATE"
if options.get("dataMappingViewDelete", False):
class_header += " @DELETE"
if options.get("dataMappingViewNoCheck", False):
class_header += " @NOCHECK"
fields = []
if len(objects) > 0:
fields = get_object_fields_with_references(
session=session, object_id=objects[0]["id"]
)
class_header += " {"
output.append(f"{output.pop()} {class_header}")
output.append(core.cut_last_comma(walk(fields=fields, level=2, current_object=objects[0])))
output.append(" }")
else:
#output.append("\n")
for object in objects:
fields = get_object_fields_with_references(
session=session, object_id=object["id"]
)
children = core.cut_last_comma(
walk(
fields=fields,
level=2,
add_data_type=object["kind"] == "RESULT",
current_object=object,
)
)
output.append(f' {object["kind"]} {object["name"]}')
if children:
output.append(f" {{\n{children}\n }}")
if db_object["enabled"] == 2:
output.append(" PRIVATE")
elif db_object["enabled"] is False or db_object["enabled"] == 0:
output.append(" DISABLED")
output.append(" AUTHENTICATION REQUIRED" if db_object["requires_auth"] in [True, 1] \
else " AUTHENTICATION NOT REQUIRED")
# 25 is the default value
if (
db_object["items_per_page"] is not None
and db_object["items_per_page"] != 25
):
output.append(f' ITEMS PER PAGE {db_object["items_per_page"]}')
if db_object["comments"]: # ignore either None or empty
output.append(f" COMMENT {core.squote_str(db_object["comments"])}")
if db_object["media_type"] is not None:
output.append(f" MEDIA TYPE {core.squote_str(db_object["media_type"])}")
if db_object["crud_operation_format"] != "FEED":
output.append(f' FORMAT {db_object["crud_operation_format"]}')
if db_object["auth_stored_procedure"]: # ignore either None or empty
output.append(f' AUTHENTICATION PROCEDURE {db_object["auth_stored_procedure"]}')
if db_object.get("options"):
output.append(core.format_json_entry("OPTIONS", db_object.get("options")))
if db_object.get("metadata"):
output.append(core.format_json_entry("METADATA", db_object.get("metadata")))
# Build CREATE statement
return "\n".join(output) + ";"