gui/backend/gui_plugin/sql_editor/SqlEditor.py (145 lines of code) (raw):
# Copyright (c) 2020, 2025, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0,
# as published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms, as
# designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an additional
# permission to link the program and your derivative works with the
# separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
import json
from mysqlsh.plugin_manager import \
plugin_function # pylint: disable=import-error
from gui_plugin.core import Error
from gui_plugin.core.Context import get_context
from gui_plugin.core.Db import BackendDatabase, BackendTransaction
from gui_plugin.core.Error import MSGException
from gui_plugin.db import backend as db_backend
from gui_plugin.modules.Modules import (add_data, delete_data,
get_data_category_id, list_data)
from gui_plugin.sql_editor.SqlEditorModuleSession import SqlEditorModuleSession
from . import backend as sql_editor_backend
@plugin_function('gui.sqlEditor.isGuiModuleBackend', web=True)
def is_gui_module_backend():
    """Reports that this module acts as a GUI backend module

    Returns:
        bool: always True
    """
    return True
@plugin_function('gui.sqlEditor.getGuiModuleDisplayInfo', web=True)
def get_gui_module_display_info():
    """Provides the name, description and icon path of the module

    Returns:
        dict: display information for the module
    """
    display_info = {
        "name": "SQL Editor",
        "description": "A graphical SQL Editor",
        "icon_path": "/images/icons/modules/gui.sqlEditor.svg",
    }
    return display_info
@plugin_function('gui.sqlEditor.startSession', shell=False, web=True)
def start_session():
    """Creates a new SQL Editor module session

    Returns:
        dict: contains the module session ID of the new session
    """
    module_session = SqlEditorModuleSession()
    session_info = {"module_session_id": module_session.module_session_id}
    return session_info
@plugin_function('gui.sqlEditor.closeSession', shell=False, web=True)
def close_session(module_session):
    """Shuts down the given SQL Editor module session

    Args:
        module_session (object): The module session object to close

    Returns:
        None
    """
    # The module session is responsible for its own tear down
    module_session.close()
@plugin_function('gui.sqlEditor.openConnection', shell=False, web=True)
def open_connection(db_connection_id, module_session, password=None):
    """Opens a database connection within the SQL Editor session

    Args:
        db_connection_id (int): The id of the db_connection to open
        module_session (object): The session in which the connection is opened
        password (str): The password to use when opening the connection. If
            not supplied, then use the password defined in the database
            options.

    Returns:
        None
    """
    # Pure delegation: the module session performs the actual connect
    module_session.open_connection(db_connection_id, password)
@plugin_function('gui.sqlEditor.reconnect', shell=False, web=True)
def reconnect(module_session):
    """Asks the SQL Editor session to reconnect

    Args:
        module_session (object): The module session that should reconnect

    Returns:
        None
    """
    # Pure delegation: the module session handles the reconnect itself
    module_session.reconnect()
@plugin_function('gui.sqlEditor.startTransaction', shell=False, web=True)
def start_transaction(session):
    """Begins a new transaction on the given session

    Args:
        session (object): The session used to execute the operation

    Returns:
        None
    """
    # Resolve the database session first, then start the transaction on it
    db_backend.get_db_session(session).start_transaction()
@plugin_function('gui.sqlEditor.commitTransaction', shell=False, web=True)
def commit_transaction(session):
    """Commits the current transaction

    Args:
        session (object): The session used to execute the operation

    Returns:
        None
    """
    # Resolve the database session first, then commit on it
    db_session = db_backend.get_db_session(session)
    db_session.commit()
@plugin_function('gui.sqlEditor.rollbackTransaction', shell=False, web=True)
def rollback_transaction(session):
    """Rolls back the current transaction

    Args:
        session (object): The session used to execute the operation

    Returns:
        None
    """
    # Resolve the database session first, then roll back on it
    db_session = db_backend.get_db_session(session)
    db_session.rollback()
@plugin_function('gui.sqlEditor.execute', shell=True, web=True)
def execute(session, sql, params=None, options=None):
    """Runs the given SQL on the given session.

    Args:
        session (object): The session used to execute the operation
        sql (str): The sql command to execute.
        params (list): The parameters for the sql command.
        options (dict): A dictionary that holds additional options, e.g.
            {"row_packet_size": -1}

    Allowed options for options:
        row_packet_size (int): The pack size for each result segment

    Returns:
        dict: the result message
    """
    # All arguments are forwarded unchanged to the resolved database session
    return db_backend.get_db_session(session).execute(
        sql=sql, params=params, options=options)
@plugin_function('gui.sqlEditor.killQuery', shell=False, web=True)
def kill_query(module_session):
    """Asks the module session to stop its currently executing query.

    Args:
        module_session (object): The module session object where the query
            is running

    Returns:
        None
    """
    # Pure delegation: the module session knows which query is running
    module_session.kill_query()
@plugin_function('gui.sqlEditor.getCurrentSchema', shell=True, web=True)
def get_current_schema(session):
    """Fetches the current schema of the given session.

    Args:
        session (object): The session used to execute the operation

    Returns:
        str: current schema name
    """
    # Resolve the database session and forward its answer unchanged
    return db_backend.get_db_session(session).get_current_schema()
@plugin_function('gui.sqlEditor.setCurrentSchema', shell=True, web=True)
def set_current_schema(session, schema_name):
    """Switches the current schema of the given session.

    Args:
        session (object): The session used to execute the operation
        schema_name (str): The name of the schema to use

    Returns:
        None
    """
    # Resolve the database session and apply the schema change on it
    db_backend.get_db_session(session).set_current_schema(
        schema_name=schema_name)
@plugin_function('gui.sqlEditor.getAutoCommit', shell=True, web=True)
def get_auto_commit(session):
    """Fetches the auto-commit status of the given session.

    Args:
        session (object): The session used to execute the operation

    Returns:
        int: auto-commit status
    """
    # Resolve the database session and forward its answer unchanged
    return db_backend.get_db_session(session).get_auto_commit()
@plugin_function('gui.sqlEditor.setAutoCommit', shell=True, web=True)
def set_auto_commit(session, state):
    """Changes the auto-commit status of the given session.

    Args:
        session (object): The session used to execute the operation
        state (bool): The auto-commit state to set for the module session

    Returns:
        None
    """
    # Resolve the database session and apply the new state on it
    db_backend.get_db_session(session).set_auto_commit(state=state)
@plugin_function('gui.sqlEditor.addExecutionHistoryEntry', shell=False, web=True)
def add_execution_history_entry(connection_id, code, language_id, profile_id=None, be_session=None):
    """Adds a new entry in the execution_history

    Stores the code + language_id as the newest entry of the
    execution_history of the given connection_id. Each connection_id has its
    own execution_history. If the newest stored entry already holds the same
    code, nothing is added and the id of that existing entry is returned.
    The execution_history is limited to 50 entries, once the limit is
    reached the oldest entries are removed.

    Args:
        connection_id (int): The id of the db_connection
        code (str): The code to be stored in the history
        language_id (str): The language id of the code
        profile_id (int): The id of profile
        be_session (object): A session to the GUI backend database
            where the operation will be performed.

    Returns:
        int: the id of the new record.
    """
    max_entries = 50
    category_id = get_data_category_id("DB Notebook Code History", be_session)
    folder_name = f"db_notebook_code_history_{connection_id}"
    tree_identifier = "DBNotebookCodeHistoryTree"

    with BackendDatabase(be_session) as db:
        context = get_context()
        group_id = context.web_handler.user_personal_group_id if context is not None else None
        # Look the folder up with the same profile the entry is written to,
        # matching the lookup done by the functions reading the history
        folder_id = sql_editor_backend.get_folder_id(db, connection_id, group_id, profile_id)
        history_entries = list_data(folder_id, category_id, be_session)

        # Newest entry first
        history_entries.sort(key=lambda x: x['last_update'], reverse=True)

        # Do not store the same code twice in a row
        if history_entries:
            newest_entry_id = history_entries[0]['id']
            if sql_editor_backend.get_entry(db, newest_entry_id)["code"] == code:
                return newest_entry_id

        entry_id = add_data(language_id, code, category_id, tree_identifier, folder_name, profile_id, be_session)

        # Keep at most max_entries including the one just added. The list is
        # sorted newest first, so the entries to drop are the ones at the end.
        for stale_entry in history_entries[max_entries - 1:]:
            delete_data(stale_entry['id'], folder_id, be_session)

    return entry_id
@plugin_function('gui.sqlEditor.getExecutionHistoryEntry', shell=False, web=True)
def get_execution_history_entry(connection_id, index, profile_id=None, be_session=None):
    """Returns an entry of the execution_history

    The execution_history is ordered with the newest entry on top. So passing
    an index of 0 will return the last inserted history item. Passing an index
    that is out of range (negative, or not lower than the current number of
    stored items) will return an empty dict.

    Args:
        connection_id (int): The id of the db_connection
        index (int): The index of the history entry to return
        profile_id (int): The id of profile
        be_session (object): A session to the GUI backend database
            where the operation will be performed.

    Returns:
        dict: index, code, language_id, current_timestamp of the entry
    """
    with BackendDatabase(be_session) as db:
        with BackendTransaction(db):
            context = get_context()
            group_id = context.web_handler.user_personal_group_id if context is not None else None
            category_id = get_data_category_id("DB Notebook Code History", be_session)
            folder_id = sql_editor_backend.get_folder_id(db, connection_id, group_id, profile_id)
            history_entries = list_data(folder_id, category_id, be_session)

            # Newest entry first, so index 0 is the most recent one
            history_entries.sort(key=lambda x: x['last_update'], reverse=True)

            # Reject negative indexes explicitly, otherwise they would be
            # resolved from the end of the list or raise an IndexError
            if 0 <= index < len(history_entries):
                return sql_editor_backend.get_entry(db, history_entries[index]['id'])

            return {}
@plugin_function('gui.sqlEditor.getExecutionHistoryEntries', shell=False, web=True)
def get_execution_history_entries(connection_id, language_id="", truncate_code_length=-1, profile_id=None, be_session=None):
    """Returns the execution_history list with the code truncated to truncate_code_length

    Meant for showing a list of history entries in the UI. The entries are
    returned with the newest entry first. If truncate_code_length is set to
    -1, the full code is returned.

    If the language_id is not empty, only the entries of that language are
    returned.

    Args:
        connection_id (int): The id of the db_connection
        language_id (str): The language id of the code
        truncate_code_length (int): The length to truncate the code to
        profile_id (int): The id of profile
        be_session (object): A session to the GUI backend database
            where the operation will be performed.

    Returns:
        list[dict]: code, language_id, current_timestamp of the entry
    """
    with BackendDatabase(be_session) as db, BackendTransaction(db):
        context = get_context()
        group_id = None if context is None else context.web_handler.user_personal_group_id
        category_id = get_data_category_id("DB Notebook Code History", be_session)
        folder_id = sql_editor_backend.get_folder_id(db, connection_id, group_id, profile_id)

        entries = list_data(folder_id, category_id, be_session)

        # The caption of a history entry holds its language id
        if language_id != "":
            entries = [item for item in entries if item['caption'] == language_id]

        # Newest entry first
        entries.sort(key=lambda item: item['last_update'], reverse=True)

        history = []
        for item in entries:
            rows = db.select("""SELECT content FROM data WHERE id = ?""", (item['id'],))

            # Any failure while reading, decoding or truncating the stored
            # content is reported as an invalid data format
            try:
                code = json.loads(rows[0]['content'])
                if truncate_code_length != -1:
                    code = code[:truncate_code_length]
            except Exception as e:
                raise MSGException(Error.CORE_INVALID_DATA_FORMAT,
                                   f'Error decoding data content: {str(e)}') from e

            history.append({"code": code,
                            "language_id": item['caption'],
                            "current_timestamp": item['last_update']})

    return history
@plugin_function('gui.sqlEditor.removeExecutionHistoryEntry', shell=False, web=True)
def remove_execution_history_entry(connection_id, index=-1, profile_id=None, be_session=None):
    """Removes the execution_history entry with the given index

    The execution_history is ordered with the newest entry on top, so index 0
    addresses the last inserted history item. If -1 is passed as index, the
    whole execution_history list for this connection_id is cleared. Any other
    index outside of the list is rejected with an invalid parameter error.

    Args:
        connection_id (int): The id of the db_connection
        index (int): The index of the history entry to remove
        profile_id (int): The id of profile
        be_session (object): A session to the GUI backend database
            where the operation will be performed.

    Returns:
        None
    """
    with BackendDatabase(be_session) as db:
        context = get_context()
        group_id = context.web_handler.user_personal_group_id if context is not None else None
        category_id = get_data_category_id("DB Notebook Code History", be_session)
        folder_id = sql_editor_backend.get_folder_id(db, connection_id, group_id, profile_id)
        history_entries = list_data(folder_id, category_id, be_session)

        # Newest entry first, so index 0 is the most recent one
        history_entries.sort(key=lambda x: x['last_update'], reverse=True)

        # -1 is the only accepted negative value; other negative indexes would
        # otherwise be resolved from the end of the list or raise an IndexError
        if index < -1 or index >= len(history_entries):
            raise MSGException(Error.CORE_INVALID_PARAMETER,
                               "Parameter 'index' is out of range.")

        if index == -1:
            # Clear the whole history of this connection
            for entry in history_entries:
                delete_data(entry['id'], folder_id, be_session)
        else:
            delete_data(history_entries[index]['id'], folder_id, be_session)