gui/backend/gui_plugin/core/dbms/DbSqliteSession.py (283 lines of code) (raw):
# Copyright (c) 2021, 2025, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0,
# as published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms, as
# designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an additional
# permission to link the program and your derivative works with the
# separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
import os.path
import sqlite3
import time
import os
import stat
import gui_plugin.core.Error as Error
import gui_plugin.core.Logger as logger
from gui_plugin.core.Context import get_context
from gui_plugin.core.dbms.DbSession import (DbSession, DbSessionFactory,
ReconnectionMode)
from gui_plugin.core.dbms.DbSessionTasks import check_supported_type
from gui_plugin.core.dbms.DbSqliteSessionTasks import (
SqliteBaseObjectTask, SqliteGetAutoCommit, SqliteOneFieldListTask,
SqliteSetCurrentSchemaTask, SqliteTableObjectTask, SqliteColumnObjectTask,
SqliteColumnsMetadataTask, SqliteColumnsListTask)
from gui_plugin.core.Error import MSGException
def find_schema_name(config):
    """Derive the schema (database) name for a SQLite database configuration.

    Args:
        config: mapping that may contain 'database_name' and/or 'db_file'.

    Returns:
        The value of 'database_name' when it is set to a non-empty value;
        otherwise the base name of 'db_file' without its extension;
        otherwise an empty string.
    """
    database_name = config.get('database_name')
    # None is treated like a missing name so that the file name is used
    # instead of returning None as the schema name.
    if database_name is not None and database_name != '':
        return database_name

    db_file = config.get('db_file')
    # Only path-like values can be split; anything else (e.g. None) yields ''
    # so the caller can report the invalid 'db_file' option itself instead of
    # failing here with a TypeError.
    if isinstance(db_file, (str, os.PathLike)):
        return os.path.splitext(os.path.basename(db_file))[0]

    return ''
class SqliteConnection(sqlite3.Connection):
    """sqlite3 connection that hands out DbCursor cursors.

    commit() and rollback() return the connection itself instead of None.
    """

    def cursor(self):
        """Create and return a DbCursor for this connection.

        The connection's row factory is set to sqlite3.Row only for the
        duration of the cursor creation and is reset to None afterwards.
        """
        self.row_factory = sqlite3.Row
        new_cursor = super().cursor(DbCursor)
        self.row_factory = None
        return new_cursor

    def commit(self):
        """Commit the current transaction and return this connection."""
        super().commit()
        return self

    def rollback(self):
        """Roll back the current transaction and return this connection."""
        super().rollback()
        return self
class DbCursor(sqlite3.Cursor):
    """sqlite3 cursor that records the last error and the last execution time."""

    # Last error stored through set_last_error(); reset on every execute().
    _last_error = None
    # Duration in seconds of the last successful execute() call, or None.
    last_execution_time = None

    def set_last_error(self, error):
        """Store *error* as the last error of this cursor."""
        self._last_error = error

    @property
    def last_error(self):
        """The last error stored with set_last_error(), or None."""
        return self._last_error

    def execute(self, sql, params=None):
        """Execute *sql* and record how long the call took.

        Args:
            sql: the statement to execute.
            params: optional bind parameters; omitted from the underlying
                call when empty or None.

        Returns:
            Whatever sqlite3.Cursor.execute() returns (the cursor itself).

        If the underlying call raises, the exception propagates and
        last_execution_time stays None.
        """
        # reset last error and execution time
        self.set_last_error(None)
        self.last_execution_time = None

        # perf_counter() is monotonic, so the measured interval cannot be
        # distorted (or become negative) by system clock adjustments.
        start_time = time.perf_counter()

        # call execute of parent class
        if params:
            res = super().execute(sql, params)
        else:
            res = super().execute(sql)

        # set executing time
        self.last_execution_time = time.perf_counter() - start_time

        return res

    def fetch_one(self):
        """Return the next row of the result set (see fetchone())."""
        return self.fetchone()

    def fetch_all(self):
        """Return all remaining rows of the result set (see fetchall())."""
        return self.fetchall()

    def get_last_execution_time(self):
        """Return the duration in seconds of the last execute(), or None."""
        return self.last_execution_time

    def get_last_row_id(self):
        """Return the row id of the last modified row (cursor.lastrowid)."""
        # cspell:ignore lastrowid
        return self.lastrowid
@DbSessionFactory.register_session('Sqlite')
class DbSqliteSession(DbSession):
    """DbSession implementation for SQLite database files.

    The session opens the database file of the current schema and attaches
    every other configured database file to the same connection.
    """

    # Object types that can be queried through the get_*_object* methods.
    _supported_types = [{"name": "Schema", "type": "CATALOG_OBJECT"},
                        {"name": "Table", "type": "SCHEMA_OBJECT"},
                        {"name": "View", "type": "SCHEMA_OBJECT"},
                        {"name": "Trigger", "type": "TABLE_OBJECT"},
                        {"name": "Primary Key", "type": "TABLE_OBJECT"},
                        {"name": "Index", "type": "TABLE_OBJECT"},
                        {"name": "Column", "type": "TABLE_OBJECT"}]

    def __init__(self, id, threaded, connection_options, data=None,
                 auto_reconnect=ReconnectionMode.NONE, task_state_cb=None,
                 on_connected_cb=None, on_failed_cb=None, prompt_cb=None,
                 pwd_prompt_cb=None, message_callback=None):
        """Register the configured databases and open the session.

        Args:
            id, threaded, connection_options, data, auto_reconnect,
                task_state_cb: forwarded to DbSession.__init__().
            on_connected_cb: called with this session after a successful open.
            on_failed_cb: called with the exception when opening fails; when
                None the exception is raised instead.
            prompt_cb, pwd_prompt_cb, message_callback: accepted but not used
                by this class.

        Raises:
            MSGException: when a database configuration has no usable
                'db_file' option.
        """
        # A fresh dict per instance: a `{}` default in the signature would be
        # one object shared by every session created without `data`.
        super().__init__(id, threaded, connection_options,
                         {} if data is None else data,
                         auto_reconnect=auto_reconnect,
                         task_state_cb=task_state_cb)

        self._connected_cb = on_connected_cb
        self._failed_cb = on_failed_cb
        self.session = None

        # Maps schema name -> database file path.
        self._databases = {}
        self._add_database(self._connection_options)

        # The first registered database is both the default and the current
        # schema.
        self._default_schema = self._current_schema = list(
            self._databases.keys())[0]

        if 'attach' in self._connection_options:
            for attach in self._connection_options['attach']:
                self._add_database(attach)

        self.open()

    @staticmethod
    def _current_task_id():
        """Return the request id of the current context, or None without one."""
        context = get_context()
        return context.request_id if context else None

    @staticmethod
    def _sql_string_literal(value):
        """Return *value* as a single-quoted SQL string literal.

        Embedded single quotes are doubled so the value cannot terminate the
        literal early.
        """
        return "'" + str(value).replace("'", "''") + "'"

    @property
    def database_type(self):
        """The name of the database type handled by this session."""
        return "SQLite"

    @property
    def databases(self):
        """Mapping of schema name to database file path."""
        return self._databases

    def _add_database(self, config):
        """Register the database described by *config*.

        Raises:
            MSGException: when 'db_file' is missing, empty or not a string.
        """
        # Set the database_name if not available
        db_name = find_schema_name(config)

        if 'db_file' in config and isinstance(config['db_file'], str) and config['db_file'] != "":
            self._databases[db_name] = config['db_file']
        else:
            raise MSGException(Error.DB_INVALID_OPTIONS,
                               "The 'db_file' option was not set for the '%s' database." % db_name)

    def _do_open_database(self, notify_success=True):
        """Open the current schema's database file and attach the others.

        Args:
            notify_success: when True the connected callback (if any) is
                called after a successful open.

        Returns:
            True on success, False when opening failed and the failure was
            reported through the failed callback.

        Raises:
            Exception: any error raised while opening, when no failed
                callback was supplied.
        """
        try:
            self._on_connect()

            # open the database connection
            self.conn = sqlite3.connect(self._databases[self._current_schema],
                                        timeout=5, factory=SqliteConnection,
                                        isolation_level=None,
                                        check_same_thread=False)

            # restrict permissions to the database file
            os.chmod(self._databases[self._current_schema],
                     stat.S_IRUSR | stat.S_IWUSR)

            # Cursor to be used for statements from the owner of this instance
            self.cursor = None

            init_cursor = self.conn.execute("PRAGMA journal_mode = WAL")
            init_cursor.close()

            for database_name, db_file in self._databases.items():
                if database_name == self._current_schema:
                    continue
                # Bound parameters keep paths/names containing quotes from
                # breaking (or altering) the ATTACH statement.
                self.conn.execute("ATTACH ? AS ?;", (db_file, database_name))

            if self._connected_cb is not None and notify_success:
                self._connected_cb(self)
        except Exception as e:
            if self._failed_cb is None:
                raise
            self._failed_cb(e)
            return False

        return True

    def _reconnect(self, is_auto_reconnect):
        """Close and reopen the database.

        The open is asked to notify success only when this is not an
        automatic reconnection.
        """
        logger.debug3(f"Reconnecting session {self._id}...")
        self._close_database(False)
        self._open_database(is_auto_reconnect is False)

    def _do_close_database(self, finalize):
        """Close the underlying connection; *finalize* is not used here."""
        self.conn.close()

    # DbSession overrides
    def do_execute(self, sql, params=None):
        """Execute *sql* on a new DbCursor while holding the session lock.

        Returns:
            The cursor that executed the statement (also kept in self.cursor).
        """
        # Acquire before entering the try block so that release() only runs
        # when the lock was actually taken.
        self.lock()
        try:
            # NOTE: theoretically self.conn.execute(sql, params) should work.
            # Reasoning is that when the connection was created, it used SqliteConnection as factory.
            # SqliteConnection overrides the cursor() function to produce instances of DbCursor instead
            # of instances of sqlite3.Cursor when the execute function is called.
            #
            # In python 3.11.4 this stopped working, and self.conn.execute() started producing
            # instances of sqlite3.Cursor, leading to misc errors in our code which expects to be
            # using instances of DbCursor.
            #
            # A backwards compatible solution is to explicitly create the cursor object (which
            # guarantees it is an instance of DbCursor), and then call the execute() function in it.
            self.cursor = self.conn.cursor().execute(sql, params)
        finally:
            self.release()

        return self.cursor

    def _get_stats(self, resultset):
        """Return the last insert id and affected row count of *resultset*."""
        return {
            "last_insert_id": resultset.lastrowid,
            "rows_affected": resultset.rowcount
        }

    def next_result(self):
        """Always False: this session never reports a further result set."""
        return False

    def row_generator(self):
        """Yield the rows of self.cursor one by one until a falsy row is fetched."""
        row = self.cursor.fetchone()
        while row:
            yield row
            row = self.cursor.fetchone()

    def get_column_info(self, row=None):
        """Describe the columns of the current cursor.

        Args:
            row: a sample row used to infer the column types; may be None.

        Returns:
            A list of {"name": ..., "type": ...} dicts, one per column, where
            "type" is the Python type name of the sample value, or "str" when
            the value (or the whole row) is None.
        """
        # Because of limitation of Sqlite cursor, we cannot get info about Sqlite column type.
        # There is also no known workaround for this, so only way to get info about column type is
        # to use build-in python type function and accept python type instead Sqlite type.
        # However there is few limitations for this approach:
        #   - date, time and datetime becomes str
        #   - boolean cannot be distinguished from int
        #   - both real and numeric becomes float
        #   - if value is None, we have no clue about type, so we assume is str
        columns = []
        for index, description in enumerate(self.cursor.description):
            value = None if row is None else row[index]
            # type(value).__name__ is never None, so the value itself has to
            # be tested to apply the "None means str" rule described above.
            type_name = "str" if value is None else type(value).__name__
            columns.append({"name": description[0], "type": type_name})
        return columns

    def row_to_container(self, row, columns):
        """Return *row* as a tuple; *columns* is not used."""
        return tuple(row)

    def info(self):
        """Return an empty dict: no session information is provided."""
        return {}

    def start_transaction(self):
        """Execute a BEGIN TRANSACTION statement."""
        self.execute('BEGIN TRANSACTION;')

    def kill_query(self, user_session):
        """Mark *user_session* as killed and interrupt it."""
        user_session._killed = True
        user_session.interrupt()

    def get_default_schema(self):
        """Return the name of the schema that was registered first."""
        return self._default_schema

    def get_current_schema(self, callback=None, options=None):
        """Return the current schema name; callback and options are unused."""
        return self._current_schema

    def set_active_schema(self, schema_name):
        """Make *schema_name* the current schema by reopening the connection.

        The reopen does not trigger the connected callback.
        """
        self.conn.close()
        self._current_schema = schema_name
        self._do_open_database(False)

    def set_current_schema(self, schema_name, callback=None, options=None):
        """Queue a task that changes the current schema to *schema_name*.

        The new schema name is also stored in *options* under the
        '__new_current_schema__' key (the passed dict is modified).
        """
        if options is None:
            options = {}
        options['__new_current_schema__'] = schema_name

        self.add_task(SqliteSetCurrentSchemaTask(
            self, self._current_task_id(), params=[schema_name],
            result_callback=callback, options=options))

    def get_auto_commit(self, callback=None, options=None):
        """Queue a task that reports the auto commit state."""
        self.add_task(SqliteGetAutoCommit(self, self._current_task_id(),
                                          result_callback=callback,
                                          options=options))

    def set_auto_commit(self, state, callback=None, options=None):
        """Not supported for this session type.

        Raises:
            MSGException: always.
        """
        raise MSGException(Error.CORE_FEATURE_NOT_SUPPORTED,
                           "This feature is not supported.")

    def get_objects_types(self):
        """Return the list of supported object types."""
        return self._supported_types

    @check_supported_type
    def get_catalog_object_names(self, type, filter):
        """List catalog object names matching the LIKE pattern *filter*.

        On a threaded session a task is queued and nothing is returned;
        otherwise the statement is executed and its result is returned.
        """
        if type == "Schema":
            sql = """SELECT name
                    FROM pragma_database_list()
                    WHERE name like ?
                    ORDER BY name;"""
        params = (filter,)

        if self.threaded:
            self.add_task(SqliteOneFieldListTask(
                self, task_id=self._current_task_id(), sql=sql, params=params))
        else:
            return self.execute(sql, params)

    @check_supported_type
    def get_schema_object_names(self, type, schema_name, filter, routine_type=None):
        """Queue a task listing the tables or views of *schema_name*.

        *filter* is a LIKE pattern applied to the object name; *routine_type*
        is not used.
        """
        if type == "Table":
            sql = f"""SELECT name
                    FROM `{schema_name}`.sqlite_master
                    WHERE type = 'table'
                        AND name like ?
                    ORDER BY name;"""
        elif type == "View":
            sql = f"""SELECT name
                    FROM `{schema_name}`.sqlite_master
                    WHERE type = 'view'
                        AND name like ?
                    ORDER BY name;"""
        params = (filter, )

        self.add_task(SqliteOneFieldListTask(self,
                                             task_id=self._current_task_id(),
                                             sql=sql, params=params))

    @check_supported_type
    def get_table_object_names(self, type, schema_name, table_name, filter):
        """Queue a task listing objects of *type* that belong to a table.

        *filter* is a LIKE pattern applied to the object name.
        """
        params = (table_name, filter)
        if type == "Trigger":
            sql = f"""SELECT name
                    FROM `{schema_name}`.sqlite_master
                    WHERE type = 'trigger'
                        AND tbl_name = ?
                        AND name like ?
                    ORDER BY name;"""
        elif type == "Primary Key":
            sql = """SELECT t.name
                    FROM pragma_table_info(?) as t
                    WHERE t.pk > 0
                        AND t.name like ?
                    ORDER BY t.pk;"""
        elif type == "Index":
            sql = f"""SELECT name
                    FROM `{schema_name}`.sqlite_master
                    WHERE type = 'index'
                        AND tbl_name = ?
                        AND name like ?
                    ORDER BY name;"""
        elif type == "Column":
            sql = f"""SELECT
                        c.name as column_name
                    FROM `{schema_name}`.pragma_table_info(?) AS c
                    LEFT OUTER JOIN (
                        SELECT DISTINCT m.name AS 'table_name',
                            ii.name AS 'table_column',
                            il.`unique` as UNIQUE_KEY
                        FROM sqlite_master AS m,
                            pragma_index_list(m.name) AS il,
                            pragma_index_info(il.name) AS ii
                        WHERE m.type='table' and m.tbl_name=? and
                            il.`unique`=1
                    ) ic
                    ON ic.table_name=? AND ic.table_column = c.name
                    WHERE c.name like ?
                    ORDER BY c.cid;"""
            # This query references the table name three times.
            params = (table_name, table_name, table_name, filter)

        self.add_task(SqliteOneFieldListTask(self,
                                             task_id=self._current_task_id(),
                                             sql=sql, params=params))

    @check_supported_type
    def get_catalog_object(self, type, name):
        """Queue a task that looks up the catalog object called *name*."""
        if type == "Schema":
            sql = """SELECT name
                    FROM pragma_database_list()
                    WHERE name = ?"""
        params = (name,)

        self.add_task(SqliteBaseObjectTask(self,
                                           task_id=self._current_task_id(),
                                           sql=sql, type=type, name=name,
                                           params=params))

    @check_supported_type
    def get_schema_object(self, type, schema_name, name):
        """Queue a task that looks up a table or view of *schema_name*."""
        params = (name,)
        task_id = self._current_task_id()

        if type == "Table":
            # NOTE(review): the original statement ended with a trailing
            # comma, which makes `sql` a one-element tuple. That is kept here
            # because SqliteTableObjectTask may expect a sequence of
            # statements - confirm and drop the tuple if it does not.
            sql = (f"""SELECT name
                    FROM `{schema_name}`.sqlite_master
                    WHERE type = "table"
                        AND name = ?
                    ORDER BY name;""",)

            self.add_task(SqliteTableObjectTask(self, task_id=task_id, sql=sql,
                                                name=f"{schema_name}.{name}",
                                                params=params))
        else:
            if type == "View":
                sql = f"""SELECT name
                        FROM `{schema_name}`.sqlite_master
                        WHERE type = 'view'
                            AND name = ?
                        ORDER BY name;"""

            # The query has a `?` placeholder, so the parameters must be
            # handed to the task like in every other object lookup.
            self.add_task(SqliteBaseObjectTask(self, task_id=task_id, sql=sql,
                                               type=type,
                                               name=f"{schema_name}.{name}",
                                               params=params))

    @check_supported_type
    def get_table_object(self, type, schema_name, table_name, name):
        """Queue a task that looks up the object *name* of a table."""
        params = (table_name, name)
        if type == "Trigger":
            sql = f"""SELECT name
                    FROM `{schema_name}`.sqlite_master
                    WHERE type = 'trigger'
                        AND tbl_name = ?
                        AND name = ?
                    ORDER BY name;"""
        elif type == "Primary Key":
            sql = """SELECT t.name
                    FROM pragma_table_info(?) as t
                    WHERE t.pk > 0
                        AND t.name = ?
                    ORDER BY t.pk;"""
        elif type == "Index":
            sql = f"""SELECT name
                    FROM `{schema_name}`.sqlite_master
                    WHERE type = 'index'
                        AND tbl_name = ?
                        AND name = ?
                    ORDER BY name;"""
        elif type == "Column":
            # Table and schema are embedded as escaped string literals so a
            # quote in either name cannot break out of the literal; the bind
            # parameters handed to the task stay the same.
            table_literal = self._sql_string_literal(table_name)
            schema_literal = self._sql_string_literal(schema_name)
            # cSpell:ignore dflt
            sql = f"""SELECT name, type, "notnull" as 'not_null', dflt_value as 'default',
                        pk as 'is_pk', pk as 'auto_increment'
                    FROM pragma_table_info({table_literal}, {schema_literal})
                    WHERE name = ?
                    ORDER BY name;"""
            params = (name,)

        task_id = self._current_task_id()

        if type == "Column":
            self.add_task(SqliteColumnObjectTask(self, task_id=task_id, sql=sql,
                                                 type=type,
                                                 name=f"{schema_name}.{name}",
                                                 params=params))
        else:
            self.add_task(SqliteBaseObjectTask(self, task_id=task_id, sql=sql,
                                               type=type,
                                               name=f"{schema_name}.{name}",
                                               params=params))

    def get_columns_metadata(self, names):
        """Queue a task that fetches the metadata of several columns.

        Args:
            names: iterable of dicts with the keys 'schema', 'table' and
                'column'.

        One SELECT per entry is built and all of them are combined with
        UNION ALL into a single statement.
        """
        sql_parts = []
        params = []
        for name in names:
            # Escaped literals: a quote inside a table or schema name must
            # not terminate the SQL string literal.
            table_literal = self._sql_string_literal(name['table'])
            schema_literal = self._sql_string_literal(name['schema'])
            sql_parts.append(f"""
                SELECT name, type, "notnull" as 'not_null', dflt_value as 'default',
                    pk as 'is_pk', pk as 'auto_increment',
                    {table_literal} as 'table', {schema_literal} as 'schema'
                FROM pragma_table_info({table_literal}, {schema_literal})
                WHERE name = ?
            """)
            params.append(name['column'])

        sql = " UNION ALL ".join(sql_parts)

        self.add_task(SqliteColumnsMetadataTask(
            self, task_id=self._current_task_id(), sql=sql, params=params))