gui/backend/gui_plugin/core/Db.py (290 lines of code) (raw):
# Copyright (c) 2020, 2024, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0,
# as published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms, as
# designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an additional
# permission to link the program and your derivative works with the
# separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
from mysqlsh.plugin_manager import plugin_function # pylint: disable=no-name-in-module
import gui_plugin.core.Logger as logger
import re
import json
import datetime
import uuid
from os import path, listdir
from pathlib import Path
from .Protocols import Response
from .GuiBackendDbManager import BackendSqliteDbManager
from gui_plugin.core import Error
class BackendDatabase():
def __init__(self, be_session=None, log_rotation=False):
if be_session is not None and not isinstance(be_session, GuiBackendDb):
raise Exception("Parameter be_session must be an instance of GuiBackendDb")
self.db = be_session
self.needs_close = self.db is None
self.log_rotation = log_rotation
def __enter__(self):
if self.db is None:
self.db = GuiBackendDb(
log_rotation=self.log_rotation,
session_uuid=str(uuid.uuid1()))
return self.db
def __exit__(self, exc_type, exc_value, traceback):
if exc_type:
logger.exception(exc_value)
if self.needs_close:
self.db.close()
class BackendTransaction():
def __init__(self, db):
self._db = db
def __enter__(self):
self._db.start_transaction()
def __exit__(self, exc_type, exc_value, traceback):
if exc_type:
self._db.rollback()
else:
self._db.commit()
class GuiBackendDb():
"""
Interface to handle CRUD operations on the Backend Database
"""
def __init__(self, log_rotation=False, session_uuid=None):
# Creates the database manager which will do the automatic maintenance tasks:
# - Database Initialization
# - Log Rotation: should be enabled only on specific instances of the backend database
# such as the one created on the start.py file or when a web session
# is established
# TODO(rennox): Identify logic to determine whether this should be an SqliteDbManager
# or a MySQLDbManager
self._session_uuid = session_uuid
backend_db_manager = BackendSqliteDbManager(
log_rotation=log_rotation,
session_uuid=self._session_uuid)
# Opens the session to the backend database
self._db = backend_db_manager.open_database()
def execute(self, sql, params=None):
return self._db.execute(sql, params)
def get_last_row_id(self):
return self._db.get_last_row_id()
def start_transaction(self):
self._db.start_transaction()
def commit(self):
self._db.commit()
def rollback(self):
self._db.rollback()
def close(self):
self._db.close()
def commit_and_close(self):
self._db.commit()
self._db.close()
def rollback_and_close(self):
self.rollback()
self.close()
def insert(self, sql, params=None, commit=None, close=None):
last_id = None
status = None
try:
self.execute(sql, params)
last_id = self.get_last_row_id()
except Exception as e: # pragma: no cover
# TODO(rennox): Is this the right way to set the last error?
self._db.set_last_error(str(e))
status = self._db.get_last_status()
if commit:
self.commit()
if close:
self.close()
return Response.standard(status['type'], status['msg'], {"id": last_id})
def select(self, sql, params=None, close=None):
res = None
rows = []
try:
resultset = None
if params:
resultset = self.execute(sql, params)
else:
resultset = self.execute(sql)
res = resultset.fetch_all()
for row in res:
# check if one of the fields could be JSON, and if so, convert
# it to a dict if possible
row_dict = dict(row)
for key, val in row_dict.items():
if isinstance(val, str):
if val.startswith('{') and val.endswith('}'):
try:
row_dict[key] = json.loads(val)
except ValueError as e: # pragma: no cover
pass
rows.append(row_dict)
except Exception as e: # pragma: no cover
# TODO(rennox): Is this the right way to set the last error?
self._db.set_last_error(e)
raise
finally:
if close:
self.close()
return rows
def get_last_status(self):
return self._db.get_last_status()
def get_connection_details(self, db_connection_id):
"""
Returns a tuple with the type and options for the given connection_id
"""
res = self.execute('''SELECT db_type, options, settings
FROM db_connection
WHERE id = ?''',
(db_connection_id,)).fetch_one()
if not res:
raise Error.MSGException(Error.DB_INVALID_CONNECTION_ID,
f'There is no db_connection with the id {db_connection_id}.')
db_type = None
options = None
settings = None
try:
db_type = res['db_type']
options = json.loads(res['options'])
if "settings" in res:
settings = json.loads(res['settings'])
else:
settings = {}
except ValueError as e:
raise Error.MSGException(Error.DB_INVALID_OPTIONS,
f'The connection options are not valid JSON. {e}.')
return (db_type, options, settings)
@property
def rows_affected(self):
return self._db.rows_affected
def log(self, event_type, message):
# insert this message into the log table
self._db.execute('''INSERT INTO `gui_log`.`log`(event_time, event_type, message) VALUES(?, ?, ?)''',
(datetime.datetime.now(), event_type, message))
def message(self, session_id, is_response, message, request_id):
self._db.execute('''INSERT INTO `gui_log`.`message`(session_id, request_id, is_response,
message, sent) VALUES(?, ?, ?, ?, ?)''',
(session_id, request_id, is_response, message, datetime.datetime.now()))
def convert_workbench_sql_to_sqlite(sql):
sql = re.sub(r'(;\n;)', ';', sql)
sql = re.sub(r'(\n\s*ENGINE\s+=\s+InnoDB\s*)', '', sql)
sql = re.sub(r'(\sVISIBLE)', '', sql)
sql = re.sub(r'(\sINT\s)', ' INTEGER ', sql)
sql = re.sub(r'(START\s)', 'BEGIN ', sql)
sql = re.sub(r'(\sAFTER\s`\w*`)', '', sql)
sql = re.sub(r'(`default_schema`\.)', '', sql)
sql = re.sub(r'(DEFAULT\s+CHARACTER\s+SET\s*=\s*utf8)', '', sql)
sql = re.sub(r'(\s+DEFAULT\s+NULL)', '', sql)
sql = re.sub(r'(^SET\s\@OLD_FOREIGN_KEY_CHECKS.*\=0;$)',
'PRAGMA foreign_keys = OFF;', sql, flags=re.MULTILINE)
sql = re.sub(r'(^SET\sFOREIGN_KEY_CHECKS.*$)',
'PRAGMA foreign_keys = ON;', sql, flags=re.MULTILINE)
sql = re.sub(r'(^SET.*$)', '', sql, flags=re.MULTILINE)
sql = re.sub(r'(-- ATTACH DATABASE )', 'ATTACH DATABASE ', sql)
return sql
@plugin_function('gui.core.convertWorkbenchSqlFileToSqlite')
def convert_workbench_sql_file_to_sqlite(mysql_sql_file_path):
"""Converts a MySQL SQL file to Sqlite syntax.
Args:
mysql_sql_file_path (str): The MySQL SQL file
Returns:
Nothing
"""
try:
with open(mysql_sql_file_path, 'r') as mysql_sql_file:
sql = mysql_sql_file.read()
sql = convert_workbench_sql_to_sqlite(sql)
# Break apart multi ADD statements for ALTER TABLE
# NOTE: one must never remove something from a table, only add
prev_version_sql = ''
alter_tables = re.findall(
r'(ALTER\s+TABLE\s*(`[\w\d]*`)\s+((.|\n)*?);)', sql,
flags=re.MULTILINE)
for g in alter_tables:
old_alter_table_stmt = g[0]
new_alter_table_stmt = g[0]
alter_table_stmt = ''
table_name = g[1]
# Add a , so the last action can be matched till , as well
actions = f'{g[2]},'
# Deal with ADD COLUMN actions
actions_m = re.findall(
r'(ADD\s+COLUMN\s+([`\w_\d\s\(\)]*)),',
# r'(ADD\s+COLUMN\s+(([`\w_\d]+)[`\w_\d\s\(\)]*)),',
actions, flags=re.MULTILINE)
if actions_m:
for g in actions_m:
alter_table_stmt = f'{alter_table_stmt}' \
f'ALTER TABLE {table_name}\n' \
f' ADD COLUMN {g[1]};\n\n'
new_alter_table_stmt = re.sub(
re.escape(g[0]) + r'[,]*', '',
new_alter_table_stmt, flags=re.MULTILINE)
# Deal with ADD INDEX actions
actions_m = re.findall(
r'(ADD\s+INDEX\s+([`\w_\d]*)\s*([\(\)`\w_\d\s]+)+\s*),',
actions, flags=re.MULTILINE)
if actions_m:
for g in actions_m:
alter_table_stmt = f'{alter_table_stmt}' \
f'CREATE INDEX {g[1]} ON {table_name} {g[2]};\n\n'
new_alter_table_stmt = re.sub(
re.escape(g[0]) + r'[,]*', '',
new_alter_table_stmt, flags=re.MULTILINE)
# If there are ADD CONSTRAINT actions, keep the cleaned up
# ALTER TABLE for them
add_constraints = re.findall(
r'(ADD\s+(CONSTRAINT\s+(.|\n)*?),)',
actions, flags=re.MULTILINE)
if add_constraints:
# As soon as there is a single ADD CONSTRAINT that Sqlite cannot
# handle, the existing table has to be replaced by the new one
# Get the original CREATE TABLE statement
if prev_version_sql == '':
# If the previous SQL has not been fetched already, fetch
# it now
# file_dir = path.dirname(mysql_sql_file_path)
file_name = path.basename(mysql_sql_file_path)
filename_match = re.match(
r'(.*?)(\d+\.\d+\.\d+)_to_(\d+\.\d+\.\d+)((.|\.)*)',
file_name)
if not filename_match:
# cspell:ignore myschema
raise Exception(f'The file contains an ALTER TABLE '
f'statement but the filename {file_name} does '
f'not include the previous and the next version '
f'number, e.g. myschema_1.0.3_to_1.0.4.mysql.sql')
g = filename_match.groups()
previous_version_file_path = path.join(
path.dirname(mysql_sql_file_path),
g[0] + g[1] + g[3])
if not Path(previous_version_file_path).is_file():
raise Exception(f'The file contains an ALTER TABLE '
f'statement but the sql file for the previous '
f'version {previous_version_file_path} cannot '
f'be found.')
with open(previous_version_file_path, 'r') as prev_sql_file:
prev_version_sql = prev_sql_file.read()
original_create_table_m = re.findall(
r'(CREATE\s+TABLE.*' + re.escape(table_name) +
r'\s*\((.|\n)*?;)',
prev_version_sql, flags=re.MULTILINE)
if not original_create_table_m:
raise Exception(f'CREATE TABLE {table_name} statement not '
f'found in previous sql file '
f'{previous_version_file_path}.')
original_create_table = convert_workbench_sql_to_sqlite(
original_create_table_m[0][0])
tn = table_name[1:-1]
# rename table to {tablename}_tmp
tmp_create_table = re.sub(re.escape(f'`{tn}`'),
f'`{tn}__tmp__`', original_create_table,
flags=re.MULTILINE)
# Get all column names of previous table
columns = re.findall(r'^\s*(`[\w_\d]*`)',
original_create_table, flags=re.MULTILINE)
column_list = ', '.join(columns)
# add script for table upgrade, like outlined here:
# https://www.sqlite.org/lang_altertable.html
replacement_script = f'-- Add constraints to table {tn}\n' \
f'{tmp_create_table};\n' \
f'INSERT INTO `{tn}__tmp__`({column_list})' \
f' SELECT {column_list} FROM `{tn}`;\n' \
f'DROP TABLE `{tn}`;\n' \
f'ALTER TABLE `{tn}__tmp__` RENAME TO `{tn}`' \
f';\n\n'
sql = re.sub(re.escape(old_alter_table_stmt),
replacement_script, sql, flags=re.MULTILINE)
if alter_table_stmt != '':
sql = re.sub(re.escape(old_alter_table_stmt), alter_table_stmt,
sql, flags=re.MULTILINE)
# Break out CREATE INDEX statement from CREATE TABLE
# CREATE TABLE IF NOT EXISTS `user_group_has_user` (
# ...
# INDEX `fk_user_group_has_user_user1` (`user_id` ASC),
# ...
create_tables = re.findall(
r'(CREATE\s+TABLE\s+IF\s+NOT\s+EXISTS\s*(`\w*`'
r')\s*\(\s*((.|\n)*?);)',
sql, flags=re.MULTILINE)
for g in create_tables:
old_create_table_stmt = g[0]
new_create_table_stmt = g[0]
alter_index_stmt = ''
table_name = g[1]
actions = g[2]
# Deal with ADD INDEX actions
# INDEX `fk_user_group_has_user_user1` (`user_id` ASC),
actions_m = re.findall(
r'((UNIQUE\s+)*INDEX\s+([`\w_\d]*)\s+\s*(\('
r'[`\w_\d\s,]+\))\s*,\s*)',
actions, flags=re.MULTILINE)
if actions_m:
for g in actions_m:
index_line = g[0]
# If the table ends with __tmp__, remove that ending
# since it was only added for the ALTER TABLE workaround
if table_name.endswith('__tmp__'):
table_name = table_name[0:-7]
alter_index_stmt = f'{alter_index_stmt}' \
f'CREATE {g[1]}INDEX {g[2]} ON {table_name} {g[3]};\n\n'
# remove
new_create_table_stmt = re.sub(re.escape(index_line), '',
new_create_table_stmt, flags=re.MULTILINE)
if alter_index_stmt != '':
sql = re.sub(re.escape(old_create_table_stmt),
f'{new_create_table_stmt}\n\n{alter_index_stmt}\n',
sql, flags=re.MULTILINE)
sqlite_sql_file_path = re.sub(r'(\.mysql\.)', '.sqlite.',
mysql_sql_file_path)
with open(sqlite_sql_file_path, 'w') as sqlite_sql_file:
sqlite_sql_file.write(sql)
except Exception as e: # pragma: no cover
logger.error(f'Cannot convert file. {e}')
@plugin_function('gui.core.convertAllWorkbenchSqlFilesToSqlite')
def convert_all_workbench_sql_files_to_sqlite(directory=None):
"""Converts all MySQL SQL file of the gui module to Sqlite.
Args:
directory (str): The directory path
Returns:
Nothing
"""
if not directory:
directory = path.join(path.dirname(
path.abspath(__file__)), 'db_schema')
# loop over all *.mysql.sql files in ./db_schema
for f_name in listdir(directory):
if path.isfile(path.join(directory, f_name)) and \
f_name.endswith('.mysql.sql'):
logger.debug(f'Converting {f_name}...')
convert_workbench_sql_file_to_sqlite(path.join(directory, f_name))