gui/backend/gui_plugin/core/ShellGuiWebSocketHandler.py (734 lines of code) (raw):
# Copyright (c) 2020, 2025, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0,
# as published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms, as
# designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an additional
# permission to link the program and your derivative works with the
# separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
import base64
import copy
import datetime
import hashlib
import inspect
import json
import re
import threading
import uuid
from contextlib import contextmanager
from queue import Empty, Queue
import mysqlsh
import gui_plugin as gui
import gui_plugin.core.Logger as logger
import gui_plugin.core.WebSocketCommon as WebSocket
from gui_plugin.core.BackendDbLogger import BackendDbLogger
from gui_plugin.core.Db import GuiBackendDb
from gui_plugin.core.dbms.DbMySQLSession import DbSession
from gui_plugin.core.HTTPWebSocketsHandler import HTTPWebSocketsHandler
from gui_plugin.core.modules.DbModuleSession import DbModuleSession
from gui_plugin.core.modules.ModuleSession import ModuleSession
from gui_plugin.core.Protocols import Response
from gui_plugin.core.RequestHandler import RequestHandler
from gui_plugin.sql_editor.SqlEditorModuleSession import SqlEditorModuleSession
from gui_plugin.users import backend as user_handler
from gui_plugin.users.backend import get_id_personal_user_group
# Web socket handler of the GUI backend: authenticates the session,
# dispatches the requests received from the frontend and queues the responses.
class ShellGuiWebSocketHandler(HTTPWebSocketsHandler):
# Class-level defaults; setup() re-initializes several of them per instance.
# Backend database connection, created on demand by the `db` property
_db = None
# UUID of the session as reported to the frontend
session_uuid = ""
# Id of the session row, taken from db.get_last_row_id() after the INSERT
session_id = None
# Id of the authenticated user; None means "not authenticated"
_session_user_id = None
# Personal user group of the authenticated user
_session_user_personal_group_id = None
# Id of the DB connection used in single server mode
_single_server_conn_id = None
# threading.local() holder created by setup_single_server()
_thread_context = None
# Id of the profile currently active in this session
_active_profile_id = None
# Set when create_single_server_user() had to create the "ssu:" user
_new_single_server_user_created = False
def _is_shell_object(self, object):
return type(object).__name__ in ['Dict', 'List']
def setup(self):
    """Initialize the per-connection state of the handler.

    Runs the base class setup, maps the ``.js`` extension to its MIME
    type and creates the session fields, the registries, the response
    queue and the (not yet started) response thread.
    """
    super(ShellGuiWebSocketHandler, self).setup()
    self.extensions_map.update({".js": "application/javascript"})

    # Session / user state
    self._db = None
    self.key = None
    self._session_user_id = None
    self._session_user_personal_group_id = None
    self._active_profile_id = None

    # Registries: incoming packets, module sessions, running requests and
    # the handlers for prompt requests sent to the FE
    self.packets = {}
    self._module_sessions = {}
    self._prompt_handlers = {}
    self._requests = {}
    self._requests_mutex = threading.Lock()

    # A thread will be processing all the responses
    self._response_queue = Queue()
    self._response_thread = threading.Thread(target=self.process_responses)
def process_responses(self):
    """Send the queued responses for as long as the socket is connected.

    The queue is polled with a one second timeout so that the
    ``connected`` flag is re-evaluated regularly. Every response is
    serialized to JSON, falling back to ``str`` for unsupported values.
    """
    while self.connected:
        try:
            pending = self._response_queue.get(timeout=1)
        except Empty:
            # Nothing arrived within the timeout, check the connection again
            pass
        else:
            self.send_message(json.dumps(pending, default=str))
def process_message(self, json_message):
    """Dispatch a decoded frontend message according to its ``request``.

    ``authenticate`` and ``logout`` are handled regardless of the
    session state; any other request requires an authenticated session.
    Errors are reported with an ``ERROR`` response carrying the
    request id of the message.
    """
    request = json_message.get('request')
    request_id = json_message.get('request_id')

    if request == 'authenticate':
        if self.is_authenticated:
            self.send_response_message(
                'ERROR', 'This session was already authenticated.',
                request_id)
        elif self.single_server is not None:
            self.setup_single_server(json_message)
        else:
            self.authenticate_session(json_message)
        return

    if request == 'logout':
        if not self.is_authenticated:
            self.send_response_message(
                'ERROR', 'This session is not authenticated.', request_id)
            return
        if self.single_server is not None:
            self.logout_single_server_user()
        else:
            self._session_user_id = None
        self.send_response_message(
            'OK', 'User successfully logged out.', request_id)
        return

    # Everything below is only available to authenticated sessions
    if not self.is_authenticated:
        self.send_response_message(
            'ERROR', 'This session is not yet authenticated.', request_id)
        return

    if request == 'execute':
        self.execute_command_request(json_message)
        return
    if request == 'cancel':
        self.cancel_request(json_message)
        return
    if request == 'prompt_reply':
        self.prompt_reply(json_message)
        return

    self.send_response_message(
        'ERROR', f'Unknown request: {request}.', request_id)
def on_ws_message(self, frame: WebSocket.Frame):
    """Collect the frames of a message and process it once complete.

    The frames are accumulated in a packet keyed by the session uuid.
    When the packet is done the message is decoded, validated, logged
    and handed to process_message(). Any failure is reported to the
    frontend as an exception response that carries the request id when
    one could be extracted from the message.

    Args:
        frame: the web socket frame that was just received.
    """
    if frame.is_initial_fragment:
        self.packets[self.session_uuid] = WebSocket.Packet()
    self.packets[self.session_uuid].append(frame)
    if not self.packets[self.session_uuid].done():
        return

    message = self.packets[self.session_uuid].message
    del self.packets[self.session_uuid]
    logger.debug2(message=message, sensitive=True, prefix="<- ")

    json_message = None
    try:
        try:
            json_message = json.loads(message)
        except Exception:
            raise Exception("Unable to decode the JSON message.")
        # Valid JSON may still be a number, a string or a list; the
        # membership tests below are only meaningful on an object
        if not isinstance(json_message, dict):
            raise Exception("The message must be a JSON object.")
        if 'request' not in json_message:
            raise Exception(
                "The message is missing the 'request' attribute.")
        if 'request_id' not in json_message:
            raise Exception(
                "The message is missing the 'request_id' attribute.")
        if not json_message["request_id"]:
            raise Exception('No request_id given. '
                            'Please provide the request_id.')
        request_id = json_message["request_id"]
        # log message, if logging does not work, do not process
        # the message due to security concerns
        if not BackendDbLogger.message(self.session_id, json.dumps(message), is_response=False,
                                       request_id=request_id):
            raise Exception("Unable to process the request.")
        self.process_message(json_message)
    except Exception as e:
        # Add the request id to the response if we have it available.
        # The type check keeps this handler from raising on its own when
        # the payload was not a JSON object.
        args = {}
        if isinstance(json_message, dict) and 'request_id' in json_message:
            args['request_id'] = json_message["request_id"]
        # log the original message
        BackendDbLogger.message(self.session_id,
                                json.dumps(message), is_response=False)
        self.send_json_response(Response.exception(e, args))
def on_ws_connected(self):
    """Register the web socket session, recovering a previous one if possible.

    When the client sent a ``SessionId`` cookie, the latest session row
    with that uuid and source ip is looked up. An ended session is
    continued through a new row; if the resulting row has a user, the
    session state is restored, a 'Session recovered' response is sent
    and the response thread is started. In every other case a new
    session with a fresh uuid is inserted and announced to the client.
    """
    logger.info("Websocket connected")
    reset_session = False
    if self.cookies and 'SessionId' in self.cookies:
        requested_session_id = self.cookies['SessionId']
        self.session_uuid = str(requested_session_id)
        try:
            with self.db_tx() as db:
                row = db.execute(
                    """SELECT * FROM session
WHERE uuid=? AND source_ip=?
ORDER BY id DESC
LIMIT 1""", (requested_session_id, self.client_address[0])).fetch_one()
                if row is not None:
                    if row['ended'] is not None:
                        # recover the session by using a continued session
                        db.execute(
                            'INSERT INTO session(uuid, continued_session_id, user_id, started, source_ip) VALUES(?, ?, ?, ?, ?)',
                            (requested_session_id, row['continued_session_id'] + 1, row['user_id'], datetime.datetime.now(), self.client_address[0]))
                        # In transaction context so the right id is returned
                        self.session_id = db.get_last_row_id()
                        row = db.execute(
                            "SELECT * FROM session WHERE id=?", (self.session_id, )).fetch_one()
                    if row['user_id'] and row['uuid']:
                        self.session_uuid = requested_session_id
                        # session_id is the id of the session row (as set
                        # from get_last_row_id() for new sessions), not
                        # the uuid
                        self.session_id = row['id']
                        self._session_user_id = row['user_id']
                        self._session_user_personal_group_id = get_id_personal_user_group(
                            db, self._session_user_id)
                        default_profile = user_handler.get_default_profile(
                            db, self._session_user_id)
                        self.set_active_profile_id(
                            default_profile["id"])
                        threading.current_thread(
                        ).name = f'wss-{self.session_uuid}'
                        self.send_response_message('OK', 'Session recovered', values={
                            "session_uuid": self.session_uuid,
                            "local_user_mode": self.is_local_session,
                            "single_server_mode": self.single_server is not None,
                            "active_profile": default_profile})
                        # Starts the response processor...
                        self._response_thread.start()
                        return
            # If reaches this point it means the session id on the cookie was not valid at the end
            # so we make sure a new session is created with the right UUID
            reset_session = True
        except Exception:
            # No problem, we continue to create the new session
            pass
    # define a session uuid
    if reset_session:
        self.db.close()
        self._db = None
    self.session_uuid = str(uuid.uuid1())
    # set the name of the current thread to the session_uuid
    threading.current_thread().name = f'wss-{self.session_uuid}'
    # insert this new session into the session table
    try:
        logger.info("Registering session...")
        with self.db_tx() as db:
            db.execute(
                'INSERT INTO session(uuid, continued_session_id, started, source_ip) VALUES(?, ?, ?, ?)',
                (self.session_uuid, 0, datetime.datetime.now(), self.client_address[0]))
            self.session_id = db.get_last_row_id()
    except Exception as e:  # pragma: no cover
        logger.error(f'Session could not be inserted into db. {e}')
        self.send_json_response(Response.exception(e))
        self._ws_close()
        return
    # send the session uuid back to the browser
    logger.info("Sending session response...")
    self.send_response_message('OK', 'A new session has been created',
                               values={"session_uuid": self.session_uuid,
                                       "local_user_mode": self.is_local_session,
                                       "single_server_mode": self.single_server is not None})
    # Starts the response processor...
    self._response_thread.start()
def on_ws_closed(self):
    """Release the session resources once the web socket is closed.

    Runs teardown() and, in single server mode, the single server
    cleanup before logging the event.
    """
    self.teardown()
    in_single_server_mode = self.single_server is not None
    if in_single_server_mode:
        self.single_server_cleanup()
    logger.info("Websocket closed")
def teardown(self):
# if the database connection for this thread was opened, close it
if self._db:
if self.session_uuid:
self.db.execute("UPDATE session SET ended=? WHERE uuid=?",
(datetime.datetime.now(), self.session_uuid))
self._db.close()
self._db = None
# close module sessions. use a copy so that we don't change the dict during the for
for module_session in dict(self._module_sessions).values():
module_session.close()
if self._response_thread.is_alive():
self._response_thread.join()
def on_ws_sending_message(self, message):
    """Log an outgoing message and decide what is actually sent.

    Args:
        message: the JSON text about to be sent to the client.

    Returns:
        The unchanged message when it could be logged, otherwise the
        JSON text of an error response that replaces it.
    """
    json_message = json.loads(message)
    if BackendDbLogger.message(self.session_id, message, is_response=True,
                               request_id=json_message.get('request_id', None)):
        return message
    logger.error("Failed to log message in the database.")
    # Not every response carries a request id (e.g. the session creation
    # response), so only forward it when present instead of raising KeyError
    args = {}
    if 'request_id' in json_message:
        args["request_id"] = json_message["request_id"]
    return json.dumps(Response.error(
        "Response cancelled by the application.", args))
def check_credentials(self, auth_header):
    """Validate the credentials of a basic authentication header.

    The part of the header after its first six characters is base64
    decoded into ``username:password``. The password is verified against
    the stored value, which consists of the 64 character hex digest
    followed by the salt (the same layout authenticate_session() uses).
    A successful header is cached so it is not verified again.

    Args:
        auth_header: the value of the authorization header.

    Returns:
        True if the credentials are valid, False otherwise.
    """
    # Never let a missing header match an unset cache entry
    if not auth_header:
        return False
    if self.cached_successful_auth == auth_header:
        return True
    # decode provided credentials
    try:
        decoded = base64.b64decode(
            auth_header[6:].encode("utf8")).decode("utf-8")
    except ValueError:
        # Covers invalid base64 as well as invalid UTF-8
        return False
    # Only the first colon separates the fields, the password itself may
    # contain colons
    username, separator, password = decoded.partition(':')
    if not separator:
        return False
    # since this function is called from outside a websocket session
    # use separate GuiBackendDb() instance that needs to be closed
    db = GuiBackendDb()
    success = False
    try:
        res = db.execute('''SELECT id, password_hash
FROM user
WHERE name = ?''',
                         (username, )).fetch_one()
        if res:
            salt = res['password_hash'][64:]
            password_hash = hashlib.pbkdf2_hmac(
                'sha256', password.encode(), salt.encode(), 100000).hex()
            if res['password_hash'][:64] == password_hash:
                success = True
                self.cached_successful_auth = auth_header
    except Exception as e:
        error_msg = f'User could not be authenticated. {str(e)}.'
        logger.error(error_msg)
    finally:
        db.close()
    return success
def send_json_response(self, json_message):
    """Queue a response so the response thread sends it to the client.

    Shell objects are first converted to plain Python data by parsing
    their string representation as JSON.
    """
    payload = json_message
    if self._is_shell_object(payload):
        # Special handling required for shell objects
        escaped = str(payload).replace("\n", "\\n")
        payload = json.loads(escaped)
    self._response_queue.put(payload)
def send_response_message(self, msg_type, msg, request_id=None,
                          values=None, api=False):
    """Build a standard response and queue it for sending.

    Args:
        msg_type: the response type, e.g. 'OK', 'ERROR' or 'PENDING'.
        msg: the message text; a dict is serialized to JSON.
        request_id: added to the response when it is truthy.
        values: optional payload. With ``api`` set, or when it is not a
            dict, it is returned under the ``result`` key; otherwise
            its items are merged into the response.
        api: forces the payload under the ``result`` key.

    A final response type (OK, ERROR, CANCELLED) also unregisters the
    request.
    """
    # get message text which is either a Dict that is converted to JSON or
    # a str
    msg_text = msg
    if isinstance(msg, dict):
        msg_text = json.dumps(msg)

    # if a request_id is given, add it to the message
    extra = {}
    if request_id:
        extra["request_id"] = request_id

    if values is not None:
        # Special handling required for shell objects
        if self._is_shell_object(values):
            values = json.loads(str(values).replace("\n", "\\n"))
        # TODO(rennox): We should normalize the returning of responses
        # there is no reason to have different flavors based on the
        # type of values being returned
        if api or not isinstance(values, dict):
            extra.update({"result": values})
        else:
            extra.update(values)

    # send the response message
    self.send_json_response(Response.standard(msg_type, msg_text, extra))

    if msg_type in ("OK", "ERROR", "CANCELLED"):
        self.unregister_module_request(request_id)
def send_command_response(self, request_id, values):
    """Send the result of a command back to the frontend.

    Binary values are base64 encoded first, recursing into dicts, lists
    and objects whose type name contains ``Dict`` or ``List``. A dict
    that already holds a ``request_state`` is sent as is (with the
    request id added) and the request is unregistered; anything else is
    wrapped into an OK response.
    """
    # TODO(rennox): This function has to do weird magic because it
    # is called to send the response from different commands, the
    # PROBLEM is that the commands should NEVER be creating the
    # response themselves, they should be implemented as simple APIs
    # and their either succeed and return whatever value they return... or
    # they should throw exceptions
    def encode_binary(item):
        if isinstance(item, bytes):
            return base64.b64encode(item).decode('utf-8')
        type_name = type(item).__name__
        if isinstance(item, dict) or "Dict" in type_name:
            return {key: encode_binary(val) for key, val in item.items()}
        if isinstance(item, list) or "List" in type_name:
            return [encode_binary(val) for val in item]
        return item

    values = encode_binary(values)
    is_full_response = isinstance(values, dict) and 'request_state' in values
    if not is_full_response:
        self.send_response_message(
            "OK", "", request_id=request_id, values=values, api=True)
        return

    values["request_id"] = request_id
    # send the response message
    self.send_json_response(values)
    self.unregister_module_request(request_id)
def send_command_done(self, request_id):
    """Send the final ``done`` response of a request and unregister it."""
    done_response = Response.standard(
        "OK", "", {"request_id": request_id, "done": True})
    self.send_json_response(done_response)
    self.unregister_module_request(request_id)
@property
def is_authenticated(self):
    """True once a user id has been assigned to this session."""
    user_id = self._session_user_id
    return user_id is not None
@property
def session_user_id(self):
    """Id of the user bound to this session, None if not authenticated."""
    user_id = self._session_user_id
    return user_id
@property
def user_personal_group_id(self):
    """Id of the personal user group of the session user."""
    group_id = self._session_user_personal_group_id
    return group_id
@property
def session_active_profile_id(self):
    """Id of the profile that is currently active in this session."""
    profile_id = self._active_profile_id
    return profile_id
def set_active_profile_id(self, profile_id):
    """Make *profile_id* the active profile of this session."""
    self._active_profile_id = profile_id
@property
def db(self):
    """Backend database connection of this handler, opened on first use."""
    if self._db:
        return self._db
    # the db object has not yet been initialized for this thread, so open
    # the database connection now
    self._db = GuiBackendDb(
        log_rotation=True, session_uuid=self.session_uuid)
    return self._db
@contextmanager
def db_tx(self):
    """Context manager running its body inside a database transaction.

    Yields the database object to use. The transaction is committed when
    the body finishes and rolled back (re-raising the error) when it
    raises. A connection created here is closed on exit; the handler's
    own connection is left open.
    """
    # Thread.getName() is deprecated, the name attribute is its replacement.
    # NOTE(review): on_ws_connected() names the thread 'wss-<uuid>', so this
    # comparison with the bare uuid presumably never matches - confirm which
    # of the two is intended.
    use_own_connection = threading.current_thread().name == self.session_uuid
    if use_own_connection:
        db = self.db
    else:
        # initialize a new database connection since SQLite objects
        # can only be used in the thread they were created in
        db = GuiBackendDb()
    try:
        db.start_transaction()
        yield db
        db.commit()
    except Exception:
        db.rollback()
        raise
    finally:
        if not use_own_connection:
            db.close()
def get_module_session_object(self, module_session_id) -> ModuleSession:
    """Return the cached module session registered under the given id.

    Raises:
        Exception: if no (truthy) module session is cached for the id.
    """
    cached = self._module_sessions.get(module_session_id)
    if cached:
        return cached
    raise Exception('There is no module_session in the cache that has '
                    'the module_session_id '
                    f'{module_session_id} assigned.')
# Authenticates the session with the username/password given in json_msg
# against the `user` table and answers the request with an OK response
# (including the user's default profile) or an ERROR response.
def authenticate_session(self, json_msg):
request_id = json_msg.get('request_id')
username = json_msg.get('username')
# "ssu:" is the prefix create_single_server_user() gives to its users;
# those accounts cannot log in through this path. This check runs outside
# of the try block below, so the error propagates to the caller.
if username.startswith("ssu:"):
raise RuntimeError('Single server user authentication not supported')
try:
# A local session only accepts the local user, which is created on demand
if self.is_local_session:
if username != gui.users.backend.LOCAL_USERNAME: # type: ignore
raise Exception('Incorrect username or password')
gui.users.backend.create_local_user(self.db) # type: ignore
# The user name lookup is case insensitive
row = self.db.execute(
'SELECT id, password_hash FROM user '
'WHERE upper(name) = upper(?)',
(username,)).fetch_one()
if row:
password_hash = None
# The password is only verified for non local sessions. The stored
# value is the hex digest (first 64 characters) followed by the salt.
if not self.is_local_session:
salt = row['password_hash'][64:]
password_hash = hashlib.pbkdf2_hmac(
'sha256', json_msg['password'].encode(), salt.encode(), 100000).hex()
if self.is_local_session or (password_hash and row[1] == password_hash + salt):
# Bind the user to the session row and to this handler
with self.db_tx() as db:
db.execute('UPDATE session SET user_id=? WHERE uuid=?',
(row['id'], self.session_uuid))
self._session_user_id = row[0]
self._session_user_personal_group_id = get_id_personal_user_group(
db, self._session_user_id)
# get default profile for the user
default_profile = gui.users.get_default_profile( # type: ignore
row[0], self.db)
self.set_active_profile_id(default_profile["id"])
values = {"active_profile": default_profile}
self.send_response_message('OK',
f'User {username} was '
f'successfully authenticated.',
request_id, values)
# TODO
# Update web_session with self.session_user_id
# TODO
# Cache the user's privileges
else:
raise Exception('Incorrect username or password')
else:
# Same message as for a wrong password, so the response does not
# reveal whether the user exists
raise Exception('Incorrect username or password')
except Exception as e:
error_msg = f'User could not be authenticated. {str(e)}.'
logger.exception(error_msg)
self.send_response_message('ERROR', error_msg, request_id)
# Executes the command named in json_msg['command'] on behalf of the
# session user: checks the privileges, resolves the command to a function,
# replaces the reserved arguments with session values and runs the function
# in a RequestHandler thread. Errors raised while preparing the call are
# sent back as an exception response.
def execute_command_request(self, json_msg):
request_id = json_msg.get('request_id')
try:
cmd = json_msg.get('command')
if not cmd:
raise Exception(
'No command given. Please provide the command.')
# Check if user is allowed to execute this command
allowed = False
res = self.db.execute(
'''SELECT p.name, p.access_pattern
FROM privilege p
INNER JOIN role_has_privilege r_p
ON p.id = r_p.privilege_id
INNER JOIN user_has_role u_r
ON r_p.role_id = u_r.role_id
WHERE u_r.user_id = ? AND p.privilege_type_id = 1''',
(self.session_user_id,)).fetch_all()
# NOTE(review): the loop ends on its first row in either case (raise or
# break), so only the first access pattern returned seems to be evaluated
# - confirm this is intended.
for row in res:
p = re.compile(row['access_pattern'])
m = p.match(cmd)
if not m:
raise Exception(f'This user account has no privileges to '
f'execute the command {cmd}')
allowed = True
break
if not allowed:
raise Exception(f'This user does not have the necessary '
f'privileges to execute the command {cmd}.')
# Argument need to be passed in a dict using the argument names as
# the keys
args = json_msg.get('args', {})
kwargs = json_msg.get('kwargs', {})
# On duplicate names the entry from 'kwargs' wins over the one from 'args'
kwargs = {**args, **kwargs}
# Inspect the function arguments and check if there are arguments
# named user_id, profile_id, web_session, request_id,
# module_session, async_web_session or session.
# If so, replace them with session variables
f_args = []
# Loop over all chained objects/functions of the given cmd and find
# the function to call
# The appended '.' makes the last name of the command match as well
matches = re.findall(r'(\w+)\.', cmd + '.')
parent_obj = None
func = None
if len(matches) < 2:
raise Exception(
f"The command '{cmd}' is using wrong format. "
"Use <global>[.<object>]*.<function>")
# Last entry is a function name
function_name = matches[-1]
# Rest is a chain of objects
objects = matches[:-1]
found_objects = []
# Selects the parent object
if objects[0] == 'gui':
parent_obj = gui
objects = objects[1:]
found_objects.append('gui')
else:
parent_obj = mysqlsh.globals
# Searches the object hierarchy
for object in objects:
try:
# Convert from camelCase to snake_case
object = re.sub(r'(?<!^)(?=[A-Z])', '_', object).lower()
child = getattr(parent_obj, object)
# Set the parent_obj for the next object evaluation
parent_obj = child
found_objects.append(object)
except:
if len(found_objects) == 0:
raise Exception(
f"The '{object}' global object does not exist")
else:
raise Exception(
f"Object '{'.'.join(found_objects)}' has no member named '{object}'")
# Searches the target function
try:
func = getattr(parent_obj, function_name)
except:
raise Exception(
f"Object '{'.'.join(found_objects)}' has no member function named '{function_name}'")
f_args = {}
if func:
f_args = self.get_function_arguments(
func=func, mod=parent_obj, mod_cmd=function_name)
lock_session = False
if found_objects[0] == 'gui':
# This is the `user_id` that needs to be provided by the user
# like for the function `add_profile(user_id, profile)`
if "user_id" in f_args:
# Return error if user_id does not match self.session_user_id
if self.session_user_id is None or "user_id" not in kwargs \
or kwargs["user_id"] != self.session_user_id:
raise Exception(f'The function argument user_id must not '
f'be set to a different user_id than the '
f'one used in the '
f'authenticated session.')
kwargs.update({"user_id": self.session_user_id})
# The `_user_id` here is for internal use only
# it's not exposed to the user and it's replaced
# always by session_user_id
# NOTE(review): the comment above says "always", but the condition below
# skips the replacement for local sessions, leaving a caller supplied
# `_user_id` untouched there - confirm which one is intended.
if "_user_id" in f_args and not self.is_local_session:
kwargs.update({"_user_id": self.session_user_id})
if "profile_id" in f_args:
if "profile_id" not in kwargs:
kwargs.update({"profile_id":
self.session_active_profile_id})
if "web_session" in f_args:
raise Exception(
f'Argument web_session not allowed for function: {cmd}.')
if "request_id" in f_args:
raise Exception(
f'Argument request_id not allowed for function: {cmd}.')
if "be_session" in f_args:
kwargs.update({"be_session": self.db})
if "db_connection_id" in f_args and self.single_server is not None:
# The connection id may have been stored in the cache by
# single_server_cleanup(); it is taken over and removed from there
if self._single_server_conn_id is None:
if self.session_uuid in self.get_cache():
self._single_server_conn_id = self.get_cache()[
self.session_uuid][1]
del self.get_cache()[
self.session_uuid]
kwargs.update({"db_connection_id": self._single_server_conn_id})
if "interactive" in f_args:
kwargs.update({"interactive": False})
# If the function may receive a message handler, adds the kwarg so the
# RequestHandler properly sets the callback
if "send_gui_message" in f_args:
kwargs.update({"send_gui_message": True})
lock_session = False
if "session" in f_args:
# If the called function requires a session parameter,
# get it from the given module_session
if not 'module_session_id' in kwargs:
raise Exception(
f'The function {cmd} requires the module_session_id '
'argument to be set.')
module_session = self.get_module_session_object(
kwargs['module_session_id'])
if not isinstance(module_session, DbModuleSession):
raise Exception(
f'The function {cmd} needs a module_session_id '
'argument set to a DbModuleSession.')
# These SQL editor functions use the user session of the module session,
# everything else uses its service session
user_session_functions = ["gui.sql_editor.execute",
"gui.sql_editor.default_user_schema", "gui.sql_editor.get_current_schema",
"gui.sql_editor.set_current_schema", "gui.sql_editor.get_auto_commit",
"gui.sql_editor.set_auto_commit"]
if isinstance(module_session, SqlEditorModuleSession) and cmd in user_session_functions:
db_module_session = module_session._db_user_session
else:
db_module_session = module_session._db_service_session
if not isinstance(db_module_session, DbSession):
raise Exception(
f'The function {cmd} needs a module_session_id '
'argument set to a DbSession.')
self.register_module_request(
request_id, kwargs['module_session_id'])
kwargs.update({"session": db_module_session})
# The plugins written for the Shell that work with standard Shell session fall on this branch,
# the session must be locked while the function is executed to avoid race conditions that may
# lead to shell failures
if found_objects[0] != 'gui':
lock_session = True
del kwargs['module_session_id']
module_session = None
if "module_session" in f_args:
if "module_session_id" not in kwargs:
raise Exception('No module_session_id given. Please '
'provide the module_session_id.')
# swap 'module_session_id' with 'module_session'
module_session = self.get_module_session_object(
kwargs['module_session_id'])
kwargs.update({"module_session": module_session})
self.register_module_request(
request_id, kwargs['module_session_id'])
del kwargs['module_session_id']
# The function runs in its own thread; no response is sent from here in
# that case, which is why result stays None
thread = RequestHandler(
request_id, func, kwargs, self, lock_session=lock_session)
thread.start()
result = None
except Exception as e:
logger.exception(e)
result = Response.exception(e)
if result is not None:
self.send_command_response(request_id, result)
def get_function_arguments(self, func, mod, mod_cmd):
    """Return the names of the arguments accepted by *func*.

    The names come from the signature of the function. When the
    signature is not available, or when it contains ``kwargs``, they are
    parsed from the SYNTAX section of the help text returned by
    ``mod.help(mod_cmd)``; a ``kwargs`` entry is then replaced by the
    option names listed in the help.

    Args:
        func: the function to inspect.
        mod: the object providing the ``help`` function.
        mod_cmd: the name of the function passed to ``help``.

    Returns:
        The list of argument names.
    """
    f_args = None
    try:
        # try to use the regular inspection function to get the function
        # arguments
        sig = inspect.signature(func)
        f_args = [p.name for p in sig.parameters.values()]
    except Exception:  # pragma: no cover
        # if that fails, fall back to parsing the help output of that
        # function
        pass

    if f_args is not None and 'kwargs' not in f_args:
        return f_args

    help_func = getattr(mod, 'help')
    help_output = help_func(f'{mod_cmd}')
    syntax = re.match(r'(.|\s)*?SYNTAX(.|\s)*?\(([\w,\[\]\s]*)',
                      help_output, flags=re.MULTILINE)
    f_args = []
    if syntax:
        arguments = syntax[3].replace('[', '').replace(']', '').\
            replace('\n', '').replace(' ', '')
        # An empty argument list must not produce an empty argument name
        f_args = [name for name in arguments.split(",") if name]

    # Include the kwargs
    if 'kwargs' in f_args:
        f_args.remove('kwargs')
        desc_idx = help_output.find(
            'The kwargs parameter accepts the following options:')
        # 53 skips the 51 characters of the sentence plus two more,
        # presumably the line breaks that follow it
        desc = help_output[desc_idx + 53:]
        f_args.extend(
            re.findall(r'-\s(\w*)\:', desc, flags=re.MULTILINE))
    return f_args
def register_module_session(self, module_session):
    """Add *module_session* to the cache, keyed by its module session id."""
    session_id = module_session.module_session_id
    self._module_sessions[session_id] = module_session
def unregister_module_session(self, module_session):
    """Remove *module_session* from the cache, if it is registered.

    The requests registered for that module session are dropped from
    the request registry as well.
    """
    session_id = module_session.module_session_id
    if session_id not in self._module_sessions:
        return
    # If we close module we need also clean up requests registry for that module
    with self._requests_mutex:
        self._requests = {
            request_id: owner
            for request_id, owner in self._requests.items()
            if owner != session_id}
    del self._module_sessions[session_id]
def register_module_request(self, request_id, module_session_id):
    """Record which module session is executing the given request."""
    with self._requests_mutex:
        self._requests.update({request_id: module_session_id})
def unregister_module_request(self, request_id):
    """Forget the given request; unknown request ids are ignored."""
    with self._requests_mutex:
        self._requests.pop(request_id, None)
def cancel_request(self, json_msg):
    """Cancel the running request identified by the ``request_id`` of json_msg.

    The cancellation is delegated to the module session registered for
    the request. The outcome is reported with an OK or ERROR response.
    """
    request_id = json_msg.get('request_id')
    try:
        if not request_id:
            raise Exception('No request_id given. '
                            'Please provide the request_id.')
        owner_id = self._requests[request_id]
        owner = self.get_module_session_object(owner_id)
        if not hasattr(owner, 'cancel_request'):
            raise Exception(
                f"Module {type(owner)} doesn't support cancel_request.")
        owner.cancel_request(request_id)
        self.send_response_message('OK', 'Request cancelled.', request_id)
    except Exception as e:
        logger.error(e)
        self.send_response_message('ERROR', str(e).strip(), request_id)
def send_prompt_response(self, request_id, prompt, handler):
    """Send a prompt to the frontend as a PENDING response.

    *handler* is stored under the request id so that prompt_reply() can
    hand the answer over to it.
    """
    self._prompt_handlers.update({request_id: handler})
    self.send_response_message(
        "PENDING", 'Executing...', request_id, prompt, api=True)
def prompt_reply(self, json_msg):
    """Hand the reply to a prompt over to the handler waiting for it.

    The handler registered for the request id is removed from the
    registry and receives the whole message. A KeyError (for instance
    when no handler is registered) is answered with an ERROR response
    without request id, any other error with an ERROR response for the
    request.
    """
    request_id = json_msg.get('request_id')
    try:
        if not request_id:
            raise Exception('No request_id given. '
                            'Please provide the request_id.')
        waiting_handler = self._prompt_handlers.pop(request_id)
        waiting_handler.process_prompt_reply(json_msg)
    except KeyError as e:
        logger.error(e)
        unexpected = f"Unexpected prompt for request_id='{request_id}'"
        self.send_response_message('ERROR', unexpected)
    except Exception as e:
        logger.error(e)
        self.send_response_message('ERROR', str(e).strip(), request_id)
# Binds the single server user "ssu:<username>" to this session, creating
# the user first when it does not exist yet. On failure an ERROR response
# is sent for request_id; nothing is returned or raised to the caller.
def create_single_server_user(self, username, request_id):
try:
# Single server users are stored with the "ssu:" prefix
username = "ssu:" + username
user_id = None
# A failing lookup is treated like a missing user
try:
user_id = gui.users.backend.get_user_id( # type: ignore
self.db, username)
except Exception:
pass
if user_id is None:
user_id = gui.users.backend.create_user( # type: ignore
self.db, username, "", "Single Server User", "localhost", True)
# Remembered so setup_single_server() can delete the user again when
# the authentication against the server fails
self._new_single_server_user_created = True
with self.db_tx() as db:
db.execute('UPDATE session SET user_id=? WHERE uuid=?',
(user_id, self.session_uuid))
# NOTE(review): from here on is_authenticated reports True, although the
# server has not verified the password yet - see setup_single_server()
self._session_user_id = user_id
self._session_user_personal_group_id = get_id_personal_user_group(
db, self._session_user_id)
# get default profile for the user
default_profile = gui.users.get_default_profile( # type: ignore
user_id, self.db)
self.set_active_profile_id(default_profile["id"])
except Exception as e:
error_msg = f'User could not be authenticated. {str(e)}.'
logger.exception(error_msg)
self.send_response_message('ERROR', error_msg, request_id)
# Authenticates the user of json_msg against the configured single server
# by opening a connection with the given credentials. On success the DB
# connection entry of the session is looked up (or created) and an OK
# response is sent, otherwise the session user state is reset and an ERROR
# response is sent.
def setup_single_server(self, json_msg):
self.get_cache().set_clean_func(self.delete_cached_connections)
# assumes self.single_server has the form "<host>:<port>" - TODO confirm;
# the port is passed on as a string
con_string = self.single_server.split(':')
server = con_string[0]
port = con_string[1]
request_id = json_msg.get('request_id')
username = json_msg.get('username')
password = json_msg.get('password')
# The caption embeds a hash of the session id; it is used below to find
# the connection of this session among the profile's connections
hashed_session_id = hashlib.sha256(str(self.session_id).encode()).hexdigest()
single_server_connection_name = f"Single MySQL Server ({hashed_session_id})"
connection = {
"db_type": "MySQL",
"caption": single_server_connection_name,
"description": "Connection to Single MySQL Server",
"options": {
"scheme": "mysql",
"user": username,
"host": server,
"port": port
}}
# NOTE(review): create_single_server_user() reports its own failures with
# an ERROR response and returns normally, so a second response for the same
# request may follow from the code below - confirm.
self.create_single_server_user(username, request_id)
# Expose the request id and this handler through get_context() on the
# current thread
self._thread_context = threading.local()
self._thread_context.request_id = request_id
self._thread_context.web_handler = self
current_thread = threading.current_thread()
setattr(current_thread, 'get_context', self.get_context)
# A copy is passed, so `connection` itself stays as built above
if self.authenticate_single_server_user(copy.deepcopy(connection), password):
connections = gui.db_connections.list_db_connections( # type: ignore
self._active_profile_id)
for conn in connections:
if conn['caption'] == single_server_connection_name:
self._single_server_conn_id = conn['id']
break
if self._single_server_conn_id is None:
self._single_server_conn_id = self.add_single_server_connection(connection)
default_profile = gui.users.get_default_profile( # type: ignore
self._session_user_id, self.db)
values = {"active_profile": default_profile}
self.send_response_message('OK',
f'User {username} was '
f'successfully authenticated.',
request_id, values)
else:
# Only a user created by this very attempt is deleted again
if self._new_single_server_user_created:
gui.users.delete_user("ssu:" + username) # type: ignore
# Undo the session state set by create_single_server_user()
self._session_user_id = None
self._session_user_personal_group_id = None
self._active_profile_id = None
self.send_response_message('ERROR',
f'User {username} could not be authenticated.',
request_id)
def get_context(self):
    """Return the thread context holder created by setup_single_server()."""
    context = self._thread_context
    return context
def authenticate_single_server_user(self, connection, password):
    """Verify the credentials by opening a connection to the server.

    All credentials stored by the shell are deleted first, then a
    module session opens the connection and its completion is awaited.

    Returns:
        True if the connection completed without errors (the session is
        closed again in that case), False otherwise.
    """
    mysqlsh.globals.shell.delete_all_credentials()
    probe = DbModuleSession(skip_confirmation_message=True)
    probe.open_connection(connection, password)
    probe.completion_event.wait()  # type: ignore
    if probe.completion_event.has_errors:  # type: ignore
        return False
    probe.close()
    return True
def single_server_remove_connection(self):
    """Remove the single server DB connection of the session, if there is one."""
    if self._single_server_conn_id is None:
        return
    gui.db_connections.remove_db_connection(  # type: ignore
        self._active_profile_id, self._single_server_conn_id)
    self._single_server_conn_id = None
def single_server_cleanup(self):
    """Store the profile/connection pair of the session in the cache.

    The pair is kept under the session uuid; the thread context and the
    connection id of the handler are cleared.
    """
    self._thread_context = None
    cached_entry = (
        self._active_profile_id, self._single_server_conn_id)  # type: ignore
    self.get_cache()[self.session_uuid] = cached_entry
    self._single_server_conn_id = None
def logout_single_server_user(self):
    """Log out the single server user.

    The DB connection of the session is removed and the session loses
    its user, which makes it unauthenticated again.
    """
    self.single_server_remove_connection()
    self._session_user_id = None
def add_single_server_connection(self, connection):
    """Add *connection* to the active profile.

    Returns:
        The first element of the value returned by add_db_connection();
        setup_single_server() stores it as the connection id.
    """
    added = gui.db_connections.add_db_connection(  # type: ignore
        self._active_profile_id, connection)
    return added[0]
def delete_cached_connections(self):
    """Remove the DB connection of every entry found in the cache.

    Each cache entry is the (profile id, connection id) pair stored by
    single_server_cleanup().
    """
    for session_key in self.get_cache():
        cached_entry = self.get_cache()[session_key]
        profile_id, connection_id = cached_entry
        gui.db_connections.remove_db_connection(  # type: ignore
            profile_id, connection_id, self.db)