mrs_plugin/lib/database.py (312 lines of code) (raw):
# Copyright (c) 2021, 2025, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0,
# as published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms, as
# designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an additional
# permission to link the program and your derivative works with the
# separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
"""
Module that deals with the "real" database schema instead of the MRS objects
"""
import json
from mrs_plugin.lib import core
import traceback
def quote_identifier(identifier):
return f"`{identifier.replace('`', '``')}`"
def get_schemas(session, ignore_system_schemas=True):
ignore = ['.%', 'mysql_%', 'mysql'] if ignore_system_schemas else []
wheres = ["SCHEMA_NAME NOT LIKE ?", "SCHEMA_NAME NOT LIKE ?",
"SCHEMA_NAME <> ?"] if ignore_system_schemas else None
return core.select(table="INFORMATION_SCHEMA.SCHEMATA",
cols="SCHEMA_NAME",
where=wheres
).exec(session, params=ignore).items
def get_schema(session, schema_name):
return core.select(table="INFORMATION_SCHEMA.SCHEMATA",
cols="SCHEMA_NAME",
where=["SCHEMA_NAME = ?"]
).exec(session, params=[schema_name]).first
def get_db_objects(session, schema_name, db_object_type):
if db_object_type == "TABLE":
return core.select(table="INFORMATION_SCHEMA.TABLES", cols="TABLE_NAME AS OBJECT_NAME",
where=["TABLE_SCHEMA=? /*=sakila*/",
"TABLE_TYPE = 'BASE TABLE'"],
order="TABLE_NAME"
).exec(session, [schema_name]).items
elif db_object_type == "VIEW":
return core.select(table="INFORMATION_SCHEMA.TABLES", cols="TABLE_NAME AS OBJECT_NAME",
where=["TABLE_SCHEMA=? /*=sakila*/",
"(TABLE_TYPE='VIEW' OR TABLE_TYPE='SYSTEM VIEW')"],
order="TABLE_NAME"
).exec(session, [schema_name]).items
elif db_object_type == "PROCEDURE":
return core.select(table="INFORMATION_SCHEMA.ROUTINES", cols="ROUTINE_NAME AS OBJECT_NAME",
where=["ROUTINE_SCHEMA=? /*=sakila*/",
"ROUTINE_TYPE='PROCEDURE'"],
order="ROUTINE_NAME"
).exec(session, [schema_name]).items
elif db_object_type == "FUNCTION":
return core.select(table="INFORMATION_SCHEMA.ROUTINES", cols="ROUTINE_NAME AS OBJECT_NAME",
where=["ROUTINE_SCHEMA=? /*=sakila*/",
"ROUTINE_TYPE='FUNCTION'"],
order="ROUTINE_NAME"
).exec(session, [schema_name]).items
raise ValueError('Invalid db_object_type. Only valid types are '
'TABLE, VIEW, PROCEDURE, FUNCTION and SCHEMA.')
def get_db_object(session, schema_name, db_object_name, db_object_type):
if db_object_type == "TABLE":
return core.select(table="INFORMATION_SCHEMA.TABLES", cols="TABLE_NAME AS OBJECT_NAME",
where=["TABLE_SCHEMA=?",
"TABLE_TYPE='BASE TABLE'", "TABLE_NAME=?"]
).exec(session, [schema_name, db_object_name]).first
elif db_object_type == "VIEW":
return core.select(table="INFORMATION_SCHEMA.TABLES", cols="TABLE_NAME AS OBJECT_NAME",
where=[
"TABLE_SCHEMA=?", "(TABLE_TYPE='VIEW' OR TABLE_TYPE='SYSTEM VIEW')", "TABLE_NAME=?"]
).exec(session, [schema_name, db_object_name]).first
elif db_object_type == "PROCEDURE":
return core.select(table="INFORMATION_SCHEMA.ROUTINES", cols="ROUTINE_NAME AS OBJECT_NAME",
where=["ROUTINE_SCHEMA=?",
"ROUTINE_TYPE='PROCEDURE'", "ROUTINE_NAME=?"]
).exec(session, [schema_name, db_object_name]).first
elif db_object_type == "FUNCTION":
return core.select(table="INFORMATION_SCHEMA.ROUTINES", cols="ROUTINE_NAME AS OBJECT_NAME",
where=["ROUTINE_SCHEMA=?",
"ROUTINE_TYPE='FUNCTION'", "ROUTINE_NAME=?"]
).exec(session, [schema_name, db_object_name]).first
raise ValueError('Invalid db_object_type. Only valid types are '
'TABLE, VIEW, PROCEDURE, FUNCTION and SCHEMA.')
def get_object_type_count(session, schema_name, db_object_type):
if db_object_type == "TABLE":
query = core.select(table="INFORMATION_SCHEMA.TABLES",
cols="COUNT(*) AS object_count",
where=["TABLE_SCHEMA=?", "TABLE_TYPE='BASE TABLE'"]
)
if db_object_type == "VIEW":
query = core.select(table="INFORMATION_SCHEMA.TABLES",
cols="COUNT(*) AS object_count",
where=[
"TABLE_SCHEMA=?", "(TABLE_TYPE = 'VIEW' OR TABLE_TYPE = 'SYSTEM VIEW')"]
)
elif db_object_type == "PROCEDURE":
query = core.select(table="INFORMATION_SCHEMA.ROUTINES",
cols="COUNT(*) AS object_count",
where=["ROUTINE_SCHEMA=?",
"ROUTINE_TYPE='PROCEDURE'"]
)
elif db_object_type == "FUNCTION":
query = core.select(table="INFORMATION_SCHEMA.ROUTINES",
cols="COUNT(*) AS object_count",
where=["ROUTINE_SCHEMA=?",
"ROUTINE_TYPE='FUNCTION'"]
)
else:
raise ValueError('Invalid db_object_type. Only valid types are '
'TABLE, VIEW, PROCEDURE and FUNCTION.')
row = query.exec(session, [schema_name]).first
return int(row["object_count"]) if row else 0
def get_db_object_parameters(session, db_schema_name, db_object_name, db_type):
sql = """
SELECT @id:=@id-1 AS id, @id * -1 AS position,
PARAMETER_NAME AS name,
PARAMETER_MODE AS mode,
DTD_IDENTIFIER AS datatype,
CHARACTER_SET_NAME as charset,
COLLATION_NAME as collation
FROM `INFORMATION_SCHEMA`.`PARAMETERS`, (SELECT @id:=0) as init
WHERE SPECIFIC_SCHEMA = ?
AND SPECIFIC_NAME = ?
AND ROUTINE_TYPE = ?
AND NOT ISNULL(PARAMETER_MODE)
ORDER BY ORDINAL_POSITION
"""
return core.MrsDbExec(sql).exec(session, [db_schema_name, db_object_name, db_type]).items
def get_db_function_return_type(session, db_schema_name, db_object_name):
sql = """
SELECT DATA_TYPE
FROM `INFORMATION_SCHEMA`.`ROUTINES`
WHERE ROUTINE_SCHEMA = ?
AND ROUTINE_NAME = ?
"""
row = core.MrsDbExec(sql).exec(
session, [db_schema_name, db_object_name]).first
return row["DATA_TYPE"] if row else None
def db_schema_object_is_table(session, db_schema_name, db_object_name):
sql = """
SELECT TABLE_TYPE FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?
"""
row = core.MrsDbExec(sql).exec(
session, [db_schema_name, db_object_name]).first
return row and "TABLE" in row["TABLE_TYPE"]
def stored_object_executes_as_invoker(session, schema_name, db_object_name):
sql = """
SELECT SECURITY_TYPE FROM INFORMATION_SCHEMA.VIEWS
WHERE TABLE_SCHEMA = ? and TABLE_NAME = ?
UNION
SELECT SECURITY_TYPE FROM INFORMATION_SCHEMA.ROUTINES
WHERE ROUTINE_SCHEMA = ? and ROUTINE_NAME = ?
"""
row = (
core.MrsDbExec(sql)
.exec(session, [schema_name, db_object_name, schema_name, db_object_name])
.first
)
return row and "INVOKER" in row["SECURITY_TYPE"]
def get_tables_used_in_view_including_required_grants(
session, schema_name, db_object_name
):
sql = """
SELECT TABLE_NAME AS OBJ_NAME FROM INFORMATION_SCHEMA.VIEW_TABLE_USAGE
WHERE VIEW_SCHEMA = ? AND VIEW_NAME = ?
"""
rows = core.MrsDbExec(sql).exec(
session, [schema_name, db_object_name]).items
return [
(row["OBJ_NAME"], "TABLE", ["SELECT", "INSERT", "UPDATE", "DELETE"])
for row in rows
]
def get_routines_used_in_view_including_required_grants(
session, schema_name, db_object_name
):
sql = """
SELECT DISTINCT t1.SPECIFIC_NAME AS OBJ_NAME, t2.ROUTINE_TYPE OBJ_TYPE
FROM INFORMATION_SCHEMA.VIEW_ROUTINE_USAGE t1
JOIN INFORMATION_SCHEMA.ROUTINES t2
ON t1.SPECIFIC_NAME = t2.ROUTINE_NAME
WHERE t1.TABLE_SCHEMA = ? AND t1.TABLE_NAME = ?;
"""
rows = core.MrsDbExec(sql).exec(
session, [schema_name, db_object_name]).items
return [(row["OBJ_NAME"], row["OBJ_TYPE"], ["EXECUTE"]) for row in rows]
def get_objects_used_in_view_including_required_grants(
session, schema_name, db_object_name
):
objects_with_grants = get_tables_used_in_view_including_required_grants(
session, schema_name, db_object_name
)
objects_with_grants.extend(
get_routines_used_in_view_including_required_grants(
session, schema_name, db_object_name
)
)
return objects_with_grants
def verify_privilege(privilege):
valid_privileges = [
'ALTER', 'ALTER ROUTINE', 'CREATE', 'CREATE ROUTINE',
'CREATE TEMPORARY TABLES', 'CREATE VIEW', 'DELETE', 'DROP', 'EVENT', 'EXECUTE', 'INDEX', 'INSERT',
'LOCK TABLES', 'REFERENCES', 'SELECT', 'SHOW DATABASES', 'SHOW VIEW', 'TRIGGER', 'UPDATE', 'USAGE']
if privilege.upper() not in valid_privileges:
raise ValueError(
f'Invalid privilege {privilege} specified. Valid privileges are {", ".join(valid_privileges)}.')
return privilege
def get_normalized_grant_privileges(privileges):
norm_privileges = []
if isinstance(privileges, str):
norm_privileges = [{
"privilege": verify_privilege(privileges),
}]
elif isinstance(privileges, list):
for priv in privileges:
if isinstance(priv, str):
norm_privileges.append({
"privilege": verify_privilege(priv),
})
elif isinstance(priv, dict):
norm_privileges.append({
"privilege": verify_privilege(priv.get("privilege")),
"columnList": priv.get("columnList")
})
return norm_privileges
def get_grant_statements_for_explicit_grants(grants):
if grants is None:
return []
# Normalize privileges
norm_grants = []
if isinstance(grants, dict):
norm_grants = [{
"object": grants.get("object"),
"schema": grants.get("schema"),
"objectType": grants.get("objectType", None),
"privileges": get_normalized_grant_privileges(grants.get("privileges"))
}]
elif isinstance(grants, list):
norm_grants = []
for grant in grants:
norm_grants.append({
"object": grant.get("object"),
"schema": grant.get("schema"),
"objectType": grant.get("objectType", None),
"privileges": get_normalized_grant_privileges(grant.get("privileges"))
})
# Build grant statements
grant_statements = []
for grant in norm_grants:
privileges = []
for privilege in grant["privileges"]:
if privilege.get("columnList", None) is not None:
privileges.append(privilege.get(
"privilege") + " (" + ", ".join(privilege.get("columnList")) + ")")
else:
privileges.append(privilege.get("privilege"))
privileges = ", ".join(privileges)
object_type = grant.get("objectType", None)
if object_type not in ["PROCEDURE", "FUNCTION"]:
object_type = ""
grant_statements.append(
f"GRANT {privileges} ON {object_type}"
f"{quote_identifier(grant['schema'])}.{quote_identifier(grant['object'])} " +
"TO 'mysql_rest_service_data_provider'@'%'")
return grant_statements
def get_grant_statements(
session, schema_name, db_object_name, grant_privileges, objects, db_object_type=None, explicit_grants=None,
disable_automatic_grants=False):
# We can not grant/revoke the information_schema
if schema_name.lower() == "information_schema":
return []
if grant_privileges and not disable_automatic_grants:
# We can only grant select on the performance_schema
if schema_name.lower() == "performance_schema":
grant_privileges = ["SELECT"]
if db_object_type == "PROCEDURE" or db_object_type == "FUNCTION":
grant_privileges = ["EXECUTE"]
db_objects = [(db_object_name, db_object_type, grant_privileges)]
# A view that executes in invoker security context can perform only operations for which the invoker has
# privileges. This means the MRS user needs the additional grants for the underlying tables.
if db_object_type == "VIEW" and stored_object_executes_as_invoker(
session, schema_name, db_object_name
):
db_objects.extend(
[
(obj_name, obj_type, obj_grants)
for obj_name, obj_type, obj_grants in get_objects_used_in_view_including_required_grants(
session, schema_name, db_object_name
)
]
)
grants = [
f"""GRANT {','.join(obj_grants)}
ON {obj_type if obj_type == "PROCEDURE" or obj_type == "FUNCTION" else ''}
{quote_identifier(schema_name)}.{quote_identifier(obj_name)}
TO 'mysql_rest_service_data_provider'@'%'"""
for obj_name, obj_type, obj_grants in db_objects
]
# If the object is not a procedure, also add all referenced tables and views
if db_object_type != "PROCEDURE" and db_object_type != "FUNCTION" and objects is not None:
for obj in objects:
for field in obj.get("fields"):
if (field.get("object_reference") and
(field["object_reference"].get("unnest") or field["enabled"])):
ref_table = (f'{field["object_reference"]["reference_mapping"]["referenced_schema"]}' +
f'.{field["object_reference"]["reference_mapping"]["referenced_table"]}')
grants.append(f"""GRANT {','.join(grant_privileges)}
ON {ref_table}
TO 'mysql_rest_service_data_provider'@'%'""")
else:
grants = []
if explicit_grants is not None:
grants.extend(
get_grant_statements_for_explicit_grants(explicit_grants))
return grants
def grant_db_object(session, schema_name, db_object_name, grant_privileges, objects=None, db_object_type=None,
explicit_grants=None, disable_automatic_grants=False):
grants = get_grant_statements(
session, schema_name, db_object_name, grant_privileges, objects, db_object_type,
explicit_grants, disable_automatic_grants)
for grant in grants:
session.run_sql(grant)
def revoke_all_from_db_object(session, schema_name, db_object_name, db_object_type):
# We can not grant/revoke the information_schema
if schema_name.lower() == "information_schema":
return
# We can not REVOKE ALL when dealing with the performance_schema
if schema_name.lower() in ["performance_schema"]:
sql = f"""
REVOKE IF EXISTS SELECT ON
{schema_name}.{db_object_name}
FROM 'mysql_rest_service_data_provider'@'%'
"""
else:
if db_object_type == "PROCEDURE":
revoke = "EXECUTE ON PROCEDURE"
elif db_object_type == "FUNCTION":
revoke = "EXECUTE ON FUNCTION"
else:
revoke = "ALL PRIVILEGES ON"
sql = f"""
REVOKE IF EXISTS
{revoke} {schema_name}.{db_object_name}
FROM 'mysql_rest_service_data_provider'@'%'
"""
session.run_sql(sql)
def get_table_columns_with_references(session, schema_name, db_object_name, db_object_type):
sql = """
CALL `mysql_rest_service_metadata`.`table_columns_with_references`(?, ?)
"""
return core.MrsDbExec(sql).exec(session, [schema_name, db_object_name]).items
def get_objects(session, db_object_id):
sql = """
SELECT *
FROM `mysql_rest_service_metadata`.`object`
WHERE db_object_id = ?
ORDER BY position
"""
return core.MrsDbExec(sql).exec(session, [db_object_id]).items
def get_object_via_absolute_request_path(session, absolute_request_path, ignore_case=True):
sql = """
SELECT o.id
FROM `mysql_rest_service_metadata`.`db_object` AS o
JOIN `mysql_rest_service_metadata`.`db_schema` AS s
ON s.id = o.db_schema_id
JOIN `mysql_rest_service_metadata`.`service` AS se
ON se.id = s.service_id
"""
if ignore_case:
sql += """
WHERE LOWER(CONCAT(se.url_context_root, s.request_path, o.request_path)) = LOWER(?)
"""
else:
sql = """
WHERE CONCAT(se.url_context_root, s.request_path, o.request_path) = ?
"""
return core.MrsDbExec(sql).exec(session, [absolute_request_path]).items
def get_object_fields_with_references(session, object_id, binary_formatter=None):
sql = """
SELECT *
FROM `mysql_rest_service_metadata`.`object_fields_with_references`
WHERE object_id = ?
"""
return core.MrsDbExec(sql, binary_formatter=binary_formatter).exec(session, [object_id]).items
def crud_mapping(crud_operations):
crud_to_grant_mapping = {
'CREATE': 'INSERT',
'READ': 'SELECT',
'UPDATE': 'UPDATE',
'DELETE': 'DELETE'
}
grant_privileges = []
for crud_operation in crud_operations:
grant_privileges.append(crud_to_grant_mapping[crud_operation])
return grant_privileges
def get_sdk_service_data(session, service_id, binary_formatter=None):
sql = """
SELECT COUNT(*) > 0 AS available
FROM information_schema.routines
WHERE routine_name = 'sdk_service_data'
AND routine_type = 'PROCEDURE'
AND routine_schema = 'mysql_rest_service_metadata'
"""
if core.MrsDbExec(sql).exec(session).first["available"]:
sql = """
CALL `mysql_rest_service_metadata`.`sdk_service_data`(?)
"""
return (
core.MrsDbExec(sql, binary_formatter=binary_formatter)
.exec(session, [service_id])
.first
)
else:
return None