gui/backend/gui_plugin/core/GuiBackendDbManager.py (343 lines of code) (raw):

# Copyright (c) 2020, 2025, Oracle and/or its affiliates. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License, version 2.0, # as published by the Free Software Foundation. # # This program is designed to work with certain software (including # but not limited to OpenSSL) that is licensed under separate terms, as # designated in a particular file or component or in included license # documentation. The authors of MySQL hereby grant you an additional # permission to link the program and your derivative works with the # separately licensed software that they have either included with # the program or referenced in the documentation. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See # the GNU General Public License, version 2.0, for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA import contextlib import os import pathlib import re import sqlite3 import stat from datetime import date from os import chdir, getcwd, listdir, makedirs, path, remove, rename import mysqlsh import gui_plugin.core.Logger as logger from gui_plugin.core import Error from gui_plugin.core.dbms import DbSessionFactory from gui_plugin.core.lib.Version import Version # Refers to the schema version supported by this version of the code CURRENT_DB_VERSION = Version((0, 0, 20)) # Do not change it, it was dropped in 0.0.16 and that is valid value DROPPED_VERSION_IN_NAME_DB_VERSION = Version((0, 0, 16)) OLDEST_SUPPORTED_DB_VERSION = Version((0, 0, 11)) DEFAULT_CONFIG = { "log_rotation_period": 7 } class BackendDbManager(): """ Handles the maintenance tasks on the backend database, including: - Deploying - Upgrading - Log Rotation Subclasses of this class handle the specific implementation details """ def __init__(self, log_rotation=False, session_uuid=None, connection_options=None): self._session_uuid = session_uuid self._connection_options = connection_options self._config = DEFAULT_CONFIG self.ensure_database_exists() # Log rotation verification should be enabled by the caller # only at specific locations db = self.open_database() if log_rotation and self.check_if_logs_need_rotation(db): self.backup_logs(db) db.close() def ensure_database_exists(self): if not self.current_database_exist(): self.initialize_db() else: self.check_for_previous_version_and_upgrade() def check_if_logs_need_rotation(self, db): sql_message = "SELECT COUNT(sent) FROM `gui_log`.`message` WHERE date(sent) < date('now');" sql_log = "SELECT COUNT(event_time) FROM `gui_log`.`log` WHERE date(event_time) < date('now');" old_rows_count = 0 try: res = db.execute(sql_message).fetch_one() old_rows_count += int(res[0]) res = db.execute(sql_log).fetch_one() old_rows_count += int(res[0]) except Exception as e: # pragma: no cover # TODO(rennox): Is this the right way to set the last error? db.set_last_error(e) return old_rows_count > 0 def open_database(self): # pragma: no cover raise NotImplementedError() def current_database_exist(self): # pragma: no cover raise NotImplementedError() def check_for_previous_version_and_upgrade(self): # pragma: no cover raise NotImplementedError() def initialize_db(self): # pragma: no cover raise NotImplementedError() def backup_logs(self, db): # pragma: no cover raise NotImplementedError() class BackendSqliteDbManager(BackendDbManager): """ Implementation details for the backend database in Sqlite """ def __init__(self, log_rotation=False, session_uuid=None, connection_options=None): if connection_options and "db_dir" in connection_options: self.db_dir = connection_options["db_dir"] else: # use ~/.mysqlsh/plugin_data/gui_plugin/mysqlsh_gui_backend_{CURRENT_DB_VERSION}.sqlite3 self.db_dir = mysqlsh.plugin_manager.general.get_shell_user_dir( # pylint: disable=no-member 'plugin_data', 'gui_plugin') self.current_dir = getcwd() db_log_file = path.join(self.db_dir, 'mysqlsh_gui_backend_log.sqlite3') super().__init__(log_rotation=log_rotation, session_uuid=session_uuid, connection_options=connection_options if connection_options is not None else { "database_name": "main", "db_file": path.join(self.db_dir, 'mysqlsh_gui_backend.sqlite3'), "attach": [ { "database_name": "gui_log", "db_file": db_log_file }] }) def open_database(self): session_id = "BackendDB-" + \ "anonymous" if self._session_uuid is None else self._session_uuid return DbSessionFactory.create("Sqlite", session_id, False, self._connection_options, None, True, None, None, None, None, None) def current_database_exist(self): return path.isfile(self._connection_options["db_file"]) def check_for_previous_version_and_upgrade(self): logger.debug2( f"Checking for previous version and upgrade\n\tdb_dir: {self.db_dir}") makedirs(self.db_dir, exist_ok=True) final_db_file = "mysqlsh_gui_backend.sqlite3" final_db_log_file = "mysqlsh_gui_backend_log.sqlite3" installed_db_log_file = None installed_db_file = self.find_installed_db_file(self.db_dir) logger.debug2(f"Found installed database file: {installed_db_file}") installed_version = Version() if installed_db_file is not None: installed_version = self.get_db_version( path.join(self.db_dir, installed_db_file)) logger.debug2(f"Installed database version: {installed_version}") # if no earlier version is found, return with False if installed_version == (0, 0, 0): return False if installed_version == CURRENT_DB_VERSION: return True if installed_version > CURRENT_DB_VERSION: raise Exception(f'Cannot downgrade database from ' f'schema {installed_version} to {CURRENT_DB_VERSION}') script_dir = path.join(path.dirname(__file__), 'db_schema') upgrade_steps = self.get_upgrade_steps(script_dir) previous_version = Version() for version in upgrade_steps: if version[1] == installed_version: previous_version = version[0] break upgrade_scripts = self.find_upgrade_scripts( script_dir, installed_version, CURRENT_DB_VERSION) try: # Because we don't know the full path in the script, # we have to change working directory so that new 'mysqlsh_gui_backend_log` # is created in the appropriate location. chdir(self.db_dir) logger.info( f"Renaming {installed_db_file} to {installed_db_file}.backup") if path.exists(f'{installed_db_file}.backup'): self.rename_db_file( f'{installed_db_file}.backup', f'{installed_db_file}.backup.old') self.backup_db(installed_db_file, f'{installed_db_file}.backup') if final_db_file != installed_db_file: logger.info( f"Copying {installed_db_file}.backup to {final_db_file}") self.rename_db_file(f'{installed_db_file}', final_db_file) if installed_version < DROPPED_VERSION_IN_NAME_DB_VERSION: installed_db_log_file = f'mysqlsh_gui_backend_log_{installed_version}.sqlite3' else: installed_db_log_file = final_db_log_file logger.info( f"Found installed log database file: {installed_db_log_file}") logger.info( f"Renaming {installed_db_log_file} to {installed_db_log_file}.backup") if path.exists(f'{installed_db_log_file}.backup'): self.rename_db_file( f'{installed_db_log_file}.backup', f'{installed_db_log_file}.backup.old') self.backup_db(installed_db_log_file, f'{installed_db_log_file}.backup') if final_db_log_file != installed_db_log_file: logger.info( f"Copying {installed_db_log_file}.backup to {final_db_log_file}") self.rename_db_file(f'{installed_db_log_file}', final_db_log_file) try: logger.debug2("Start upgrading database") conn = sqlite3.connect(final_db_file) conn.execute("VACUUM") cursor = conn.cursor() for script in upgrade_scripts: with open(path.join(script_dir, script), 'r', encoding='utf-8') as sql_file: sql = sql_file.read() logger.info(f"Executing upgrade script: {script}") logger.debug3(f"SQL: {sql}") cursor.executescript(sql) conn.commit() conn.close() logger.info("Database successfully upgraded") except Exception as e: logger.error( "Error occurred during database upgrade, rolling back database") logger.exception(e) conn.rollback() conn.close() # move the files back self.remove_db_file(final_db_file) logger.info( f"Renaming file: {installed_db_file}.backup to {installed_db_file}") rename(f'{installed_db_file}.backup', installed_db_file) if path.exists(f'{installed_db_log_file}.backup.old'): self.rename_db_file( f'{installed_db_log_file}.backup.old', f'{installed_db_log_file}.backup') # move the log files back self.remove_db_file(final_db_log_file) logger.info( f"Renaming file: {installed_db_log_file}.backup to {installed_db_log_file}") rename(f'{installed_db_log_file}.backup', installed_db_log_file) if path.exists(f'{installed_db_log_file}.backup.old'): self.rename_db_file( f'{installed_db_log_file}.backup.old', f'{installed_db_log_file}.backup') raise e if installed_version < DROPPED_VERSION_IN_NAME_DB_VERSION: self.remove_db_file(installed_db_log_file) self.remove_db_file(installed_db_file) # Removing previous backup and old log files if previous_version < DROPPED_VERSION_IN_NAME_DB_VERSION: self.cleanup(previous_version) with contextlib.suppress(FileNotFoundError): remove(f'{installed_db_log_file}.backup.old') with contextlib.suppress(FileNotFoundError): remove(f'{installed_db_file}.backup.old') finally: chdir(self.current_dir) return True def initialize_db(self): sql_file_path = path.join(path.dirname(__file__), 'db_schema', 'mysqlsh_gui_backend.sqlite.sql') logger.debug2( f"Starting initializing database:\n\tdatabase file:{self._connection_options['db_file']}\n\tsql_file:{sql_file_path}") try: makedirs(self.db_dir, exist_ok=True) db_file = self._connection_options["db_file"] conn = sqlite3.connect(db_file) os.chmod(db_file, stat.S_IRUSR | stat.S_IWUSR) cursor = conn.cursor() # Do a fresh initialization of the database with open(sql_file_path, 'r', encoding='UTF-8') as sql_file: sql = sql_file.read() # Because we don't know the full path in the script, # we have to change working directory so that new 'mysqlsh_gui_backend_log` # is created in the appropriate location. chdir(self.db_dir) cursor.executescript(sql) conn.commit() version = self.get_db_version(self._connection_options['db_file']) logger.debug2( f"Database successfully initialized\n\tDatabase version: {version}") except Exception as e: # pragma: no cover conn.rollback() logger.error(f'Cannot initialize database. {e}') raise e from None finally: conn.close() chdir(self.current_dir) def backup_logs(self, db): # create new backup file # attach it to db as backup new_filename = pathlib.Path(self.db_dir, f"mysqlsh_gui_backend_log_{date.today().strftime('%Y.%m.%d')}.sqlite3") # The backup was done for today already if new_filename.exists(): return # check files, remove oldest if count > 6 backup_files = [] for f in listdir(self.db_dir): m = re.match( r'mysqlsh_gui_backend_log_\d+\.\d+\.\d+\.sqlite3', f) if m: backup_files.append(f) while len(backup_files) > self._config['log_rotation_period']: file_to_remove = sorted(backup_files)[0] remove(path.join(self.db_dir, file_to_remove)) backup_files.remove(file_to_remove) try: db.execute( f"ATTACH DATABASE '{new_filename}' as 'backup';") db.start_transaction() # create tables and copy data # from log and message to backup db copy_sql = """CREATE TABLE `backup`.`log` AS SELECT * FROM `gui_log`.`log` WHERE date(`event_time`) < date('now')""" db.execute(copy_sql) copy_sql = """CREATE TABLE `backup`.`message` AS SELECT * FROM `gui_log`.`message` WHERE date(`sent`) < date('now')""" db.execute(copy_sql) # delete old logs and messages delete_sql = """DELETE FROM `gui_log`.`log` WHERE date(`event_time`) < date('now')""" db.execute(delete_sql) delete_sql = """DELETE FROM `gui_log`.`message` WHERE date(`sent`) < date('now')""" db.execute(delete_sql) db.commit() # detach backup db. can not detach inside a transaction. db.execute(f"DETACH DATABASE 'backup';") except Exception as e: # pragma: no cover logger.error(f"Exception caught during log backup: {e}") # TODO(rennox): Is this the right way to set the last error? db.set_last_error(e) db.rollback() def remove_db_file(self, path): self.remove_wal_and_shm_files(path) with contextlib.suppress(FileNotFoundError): remove(path) def rename_db_file(self, src, dst): rename(src, dst) self.remove_wal_and_shm_files(src) def cleanup(self, file_version): with contextlib.suppress(FileNotFoundError): remove(path.join(self.db_dir, f'mysqlsh_gui_backend_{file_version}.sqlite3.backup')) with contextlib.suppress(FileNotFoundError): remove(path.join( self.db_dir, f'mysqlsh_gui_backend_log_{file_version}.sqlite3.backup')) files_to_remove = [] for f in listdir(self.db_dir): m = re.match( r'\d+\.\d+\.\d+_mysqlsh_gui_backend_log_(\d+)\.(\d+)\.(\d+)\.sqlite3', f) if m: g = m.groups() version = Version(g[0]) if file_version == version: files_to_remove.append(f) for file in files_to_remove: remove(file) def get_db_version(self, db_path): version = Version() with sqlite3.connect(db_path) as conn: cursor = conn.cursor() try: row = cursor.execute( "SELECT major, minor, patch FROM schema_version;").fetchone() version = Version(row) except Exception as e: logger.exception(e) return version def find_installed_db_file(self, db_dir): installed_version = Version() # find the latest version of the database file available db_file = "mysqlsh_gui_backend.sqlite3" found = False if not path.exists(path.join(db_dir, db_file)): db_file = None for f in listdir(db_dir): m = re.match( r'mysqlsh_gui_backend_(\d+)\.(\d+)\.(\d+)\.sqlite3', f) if m: g = Version(m.groups()) if g > installed_version or installed_version == (0, 0, 0): found = True installed_version = g if found: if installed_version < OLDEST_SUPPORTED_DB_VERSION: raise Error.MSGException( Error.DB_UNSUPPORTED_FILE_VERSION, "Database file to upgrade have to be at least in 0.0.11 version.") db_file = f"mysqlsh_gui_backend_{installed_version}.sqlite3" return db_file def find_upgrade_scripts(self, script_dir, from_version, to_version): upgrade_scripts = [] version_to_upgrade = from_version all_scripts = self.get_upgrade_steps(script_dir) for version in all_scripts: if version[0] == version_to_upgrade: upgrade_scripts.append( f"mysqlsh_gui_backend_{version[0]}_to_{version[1]}.sqlite.sql") if version[1] == to_version: upgrade_file_found = True break else: version_to_upgrade = version[1] if version_to_upgrade != to_version and not upgrade_file_found: raise Exception(f'No upgrade file found to go from database ' f'schema {version_to_upgrade} to {to_version}') return upgrade_scripts def backup_db(self, src, dst): with sqlite3.connect(src) as source, sqlite3.connect(dst) as dest: os.chmod(dst, stat.S_IRUSR | stat.S_IWUSR) source.backup(dest) def remove_wal_and_shm_files(self, file): with contextlib.suppress(FileNotFoundError): remove(f'{file}-shm') with contextlib.suppress(FileNotFoundError): remove(f'{file}-wal') def get_upgrade_steps(self, script_dir): upgrade_scripts = [] for f in listdir(script_dir): m = re.match( r'mysqlsh_gui_backend_(\d+\.\d+\.\d+)_to_(\d+\.\d+\.\d+)\.sqlite.sql', f) if not m: continue g = m.groups() update_from_version = Version(g[0]) upgrade_to_version = Version(g[1]) upgrade_scripts.append((update_from_version, upgrade_to_version)) return sorted(upgrade_scripts)