gui/backend/gui_plugin/core/dbms/DbSessionSetupTask.py (89 lines of code) (raw):
# Copyright (c) 2022, 2024, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0,
# as published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms, as
# designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an additional
# permission to link the program and your derivative works with the
# separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
from gui_plugin.core.dbms import DbSessionData as DbSessionData
from gui_plugin.core.dbms.DbSessionUtils import DbPingHandler
class DbSessionSetupTask:
    """
    This class provides a structured mechanism to implement MySQL session related
    tasks to be executed right before or after the MySQL session is established.

    A task may change the session connection options and the session data.
    Every change is tracked so that reset() is able to undo it:

    - input options/data: original values removed or replaced by the task.
    - output options/data: values defined by the task.
    """

    def __init__(self, session, progress_cb=None) -> None:
        """
        Creates the task.

        session: the session this task operates on. The task uses its
            connection_options, data, has_data() and execute_thread() members.
        progress_cb: optional callable receiving progress messages.
        """
        self._session = session
        self._progress_cb = progress_cb

        # Original options taken out of the connection options by this task
        self._input_options = {}
        # Options this task defined on the connection options
        self._output_options = {}
        # Original session data entries replaced by this task
        self._input_data = {}
        # Session data entries defined by this task
        self._output_data = {}

    def execute(self, sql, params=None):
        """
        Executes the given SQL through the session's execute_thread() and
        returns whatever it returns.
        """
        return self._session.execute_thread(sql, params)

    @property
    def session(self):
        """The session this task operates on."""
        return self._session

    @property
    def connection_options(self):
        """The connection options of the session (the live object, not a copy)."""
        return self._session.connection_options

    @property
    def input_options(self):
        """The options this task removed from the connection options."""
        return self._input_options

    @property
    def output_options(self):
        """The options this task defined on the connection options."""
        return self._output_options

    def has_option(self, option):
        """
        Verifies if the given option exists on the session connection options.
        """
        return option in self.connection_options

    def has_data(self, option):
        """
        Verifies if the given entry exists on the session data.
        """
        return self.session.has_data(option)

    def get_data(self, option):
        """
        Gets the given data from the session.
        """
        return self.session.data[option]

    def extract_option(self, option, default_value=None):
        """
        Extracts an option from the connection options and registers it on the
        input options.

        Returns the extracted value, or default_value if the option is not
        present on the connection options (nothing is registered in that case,
        so reset() will not add an option that never existed).
        """
        if not self.has_option(option):
            return default_value

        value = self.connection_options.pop(option)
        self._input_options[option] = value
        return value

    def define_option(self, option, value):
        """
        Defines an option on the connection options and the output options.
        """
        # Backs up the existing value only if it was not defined by this task:
        # otherwise redefining an option would replace the backup of the
        # original value with the value previously set by the task itself
        if option not in self._output_options:
            self.extract_option(option)

        # Defines the new value for the option
        self.connection_options[option] = value
        self._output_options[option] = value

    def define_data(self, option, value):
        """
        Defines an entry on the connection data and the output data.
        """
        session_data = self.session.data

        # If the data already exists and was not added by the task,
        # backs up the current value
        if option in session_data and option not in self._output_data:
            self._input_data[option] = session_data[option]

        # Defines the new value for the data
        session_data[option] = value
        self._output_data[option] = value

    def report_progress(self, msg):
        """
        Progress callback to be used if the task is of long duration to keep the
        clients up to date on what's going on.

        Does nothing if no progress callback was given.
        """
        if self._progress_cb is not None:
            self._progress_cb(msg)

    def reset(self, include_data=True):
        """
        Resets any change done by this task on the session connection options
        and, if include_data is True, on the session data as well.

        When include_data is False the data changes remain tracked, so they can
        still be undone by a later call.
        """
        options = self.connection_options

        # Removes any option added from this task
        for option in self._output_options:
            if option in options:
                options.pop(option)

        # Adds any option removed by this task
        for option, value in self._input_options.items():
            options[option] = value

        if include_data:
            session_data = self.session.data

            # Removes any data added from this task
            for option in self._output_data:
                if option in session_data:
                    session_data.pop(option)

            # Adds any data removed by this task
            for option, value in self._input_data.items():
                session_data[option] = value

            self._input_data.clear()
            self._output_data.clear()

        self._input_options.clear()
        self._output_options.clear()

    def on_connect(self):
        """
        Override this function to implement task to be executed right before
        executing the MySQL Session.

        IMPORTANT: Any non official connection option should be removed here to
        avoid connection errors from the Shell.
        """
        pass

    def on_connected(self):
        """
        Override this function to implement task to be executed right after the
        MySQL Session has been established
        """
        pass

    def on_failed_connection(self):
        """
        Override this function to implement task to be executed right after the
        MySQL Session failed getting established
        """
        pass

    def on_close(self):
        """
        Override this function to implement what the task should do if the
        session is closed
        """
        pass
class DbPingHandlerTask(DbSessionSetupTask):
    """
    Session task that runs a DbPingHandler while the session is connected.

    The handler is created only if the session data holds a positive
    DbSessionData.PING_INTERVAL, and it is stopped when the session is closed
    or the task is reset.
    """

    def __init__(self, session, progress_cb=None) -> None:
        super().__init__(session, progress_cb)

        # The ping handler currently running, None if there isn't one
        self._db_pinger = None

    def reset(self, include_data=True):
        """
        Resets the changes done by this task and stops the ping handler, if any.
        """
        super().reset(include_data)
        self.on_close()

    def on_connected(self):
        """
        Starts a ping handler if a positive ping interval is defined on the
        session data.
        """
        # Stops a handler left from a previous connection so it is not leaked
        # when it gets replaced below
        self.on_close()

        if self.session.has_data(DbSessionData.PING_INTERVAL):
            interval = self.session.data[DbSessionData.PING_INTERVAL]
            if interval is not None and interval > 0:
                self._db_pinger = DbPingHandler(self.session, interval)
                self._db_pinger.start()

    def on_close(self):
        """
        Stops the ping handler, if any, and waits for it to finish.

        Safe to be called repeatedly: once stopped the handler is discarded.
        """
        # Detaches the handler first, so a repeated call (i.e. reset() followed
        # by the session being closed) does not stop/join it again
        pinger, self._db_pinger = self._db_pinger, None

        if pinger is not None:
            pinger.stop()
            pinger.join()