gui/backend/gui_plugin/shell/ShellModuleSession.py (373 lines of code) (raw):

# Copyright (c) 2021, 2025, Oracle and/or its affiliates. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License, version 2.0, # as published by the Free Software Foundation. # # This program is designed to work with certain software (including # but not limited to OpenSSL) that is licensed under separate terms, as # designated in a particular file or component or in included license # documentation. The authors of MySQL hereby grant you an additional # permission to link the program and your derivative works with the # separately licensed software that they have either included with # the program or referenced in the documentation. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See # the GNU General Public License, version 2.0, for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA import json import os import os.path import signal import subprocess import sys import threading from queue import Queue import mysqlsh from mysqlsh.plugin_manager import \ plugin_function # pylint: disable=no-name-in-module import gui_plugin.core.Error as Error import gui_plugin.core.Logger as logger from gui_plugin.core import Filtering from gui_plugin.core.BaseTask import CommandTask from gui_plugin.core.Context import get_context from gui_plugin.core.dbms.DbMySQLSession import DbMysqlSession from gui_plugin.core.Error import MSGException from gui_plugin.core.modules import ModuleSession def remove_dict_useless_items(data): result = {} # remove empty entries for key, value in data.items(): if isinstance(value, dict): value = remove_dict_useless_items(value) if not value in ('', None, {}): result[key] = value # is_production and ssl are only valid when connected to a MySQL server if 'host' not in result: if 'is_production' in result: del result['is_production'] if 'ssl' in result: del result['ssl'] return result class ShellCommandTask(CommandTask): def __init__(self, task_id, command, params=None, result_queue=None, result_callback=None, options=None, skip_completion=False): super().__init__(task_id, command, params=params, result_queue=result_queue, result_callback=result_callback, options=options, skip_completion=skip_completion) self.dispatch_result("PENDING", message='Execution started...') def do_execute(self): pass def send_output(self, data): if isinstance(data, str): try: data = json.loads(data) except json.decoder.JSONDecodeError: # Few errors in the shell may be reported as non JSON, usually initialization errors error = {"error": data} data = json.loads(json.dumps(error)) self.dispatch_result( "PENDING", message='Executing...', data=data) def complete(self, message=None, data=None): self.dispatch_result("OK", message=message, data=data) def fail(self, message=None, data=None): self.dispatch_result("ERROR", message=message, data=data) def cancel(self): super().cancel() self.dispatch_result("CANCELLED") class ShellQuitTask(ShellCommandTask): def __init__(self, task_id=None, params=None, result_queue=None, result_callback=None, options=None): super().__init__(task_id, {"execute": "\\quit"}, params, result_queue, result_callback, options, True) class ShellDbSessionHandler(DbMysqlSession): def __init__(self, connection_options, message_callback=None): super().__init__(0, False, connection_options, message_callback=message_callback) def _do_open_database(self, notify_success=True): self._on_connect() return False class ShellModuleSession(ModuleSession): def __init__(self, options=None, settings=None, shell_args=None): context = get_context() request_id = context.request_id if context else None super().__init__() EXTENSION_SHELL_USER_CONFIG_FOLDER_BASENAME = "mysqlsh-gui" # Symlinks the plugins on the master shell as we want them available # on the Shell Console self._subprocess_home = mysqlsh.plugin_manager.general.get_shell_user_dir( # pylint: disable=no-member 'plugin_data', 'gui_plugin', 'shell_instance_home') if not os.path.exists(self._subprocess_home): os.makedirs(self._subprocess_home) subprocess_plugins = os.path.join(self._subprocess_home, 'plugins') # Get the actual plugin path that this gui_plugin is in module_file_path = os.path.dirname(__file__) plugins_path = os.path.dirname(os.path.dirname(module_file_path)) # If this is a development setup using the global shell user config dir, # setup a symlink if it does not exist yet if (not mysqlsh.plugin_manager.general.get_shell_user_dir().endswith( EXTENSION_SHELL_USER_CONFIG_FOLDER_BASENAME) and not os.path.exists(subprocess_plugins)): if os.name == 'nt': p = subprocess.run( f'mklink /J "{subprocess_plugins}" "{plugins_path}"', shell=True) p.check_returncode() else: os.symlink(plugins_path, subprocess_plugins) # Check if MDS options have been specified connection_args = [] if not options is None: session_handler = ShellDbSessionHandler(options, message_callback=lambda msg_type, msg, result: self._web_session.send_response_message( msg_type=msg_type, msg=msg, request_id=request_id, values=result, api=False)) session_handler.open() options = session_handler.connection_options if settings is not None: if 'ssh' in settings.keys(): connection_args.append('--ssh') connection_args.append(settings.pop('ssh')) if 'ssh-identity-file' in settings.keys(): connection_args.append('--ssh-identity-file') connection_args.append( settings.pop('ssh-identity-file')) connection_args.append( mysqlsh.globals.shell.unparse_uri(options)) self._last_prompt = {} # Empty command to keep track of the shell initialization self._pending_request = ShellCommandTask( request_id, "", result_callback=self._handle_api_response) self._last_info = None self._shell_exited = False self._shell = None # start a new shell process here... self._thread = None # create a new thread to do the async processing self._response_thread = None # thread that handles the shell response messages self._initialize_complete = threading.Event() self._terminate_complete = threading.Event() self._command_complete = threading.Event() self._cancel_requests = [] self.command_blacklist = [ '\\', '\\edit', '\\e', '\\exit', '\\history', '\\nopager', '\\pager', '\\P', '\\quit', '\\q', '\\rehash', '\\source', '\\.', '\\system', '\\!' ] env = os.environ.copy() # TODO: Workaround for Bug #33164726 env['MYSQLSH_USER_CONFIG_HOME'] = self._subprocess_home + "/" env["MYSQLSH_JSON_SHELL"] = "1" if "MYSQLSH_PROMPT_THEME" in env: del env["MYSQLSH_PROMPT_THEME"] if 'ATTACH_DEBUGGER' in env: del env['ATTACH_DEBUGGER'] if not 'TERM' in env: env['TERM'] = 'xterm-256color' with open(os.path.join(self._subprocess_home, 'options.json'), 'w') as options_file: json.dump({ "history.autoSave": "true" }, options_file) with open(os.path.join(self._subprocess_home, 'prompt.json'), 'w') as prompt_file: json.dump({ "variables": { "is_production": { "match": { "pattern": "*;host;*[*?*]", "value": ";%env:PRODUCTION_SERVERS;[%host%]" }, "if_true": "true", "if_false": "false" }, "is_ssl": { "match": { "pattern": "%ssl%", "value": "SSL" }, "if_true": "true", "if_false": "false" } }, "prompt": { "text": "\n", "cont_text": "-> " }, "segments": [ { "text": "{ \"prompt_descriptor\": { " }, { "text": "\"user\": \"%user%\", " }, { "text": "\"host\": \"%host%\", \"port\": \"%port%\", \"socket\": \"%socket%\", " }, { "text": "\"schema\": \"%schema%\", \"mode\": \"%Mode%\", \"session\": \"%session%\"," }, { "text": "\"ssl\": %is_ssl%, \"is_production\": %is_production%" }, { "text": " } }" } ] }, prompt_file, indent=4) executable = sys.executable if 'executable' in dir(mysqlsh): executable = mysqlsh.executable exec_name = executable if executable.endswith( "mysqlsh") or executable.endswith("mysqlsh.exe") else "mysqlsh" # Temporarily passing --no-defaults until it is a configurable option in FE and is received as parameter in the BE popen_args = ["--no-defaults", "--interactive=full", "--passwords-from-stdin", "--py", "--json=raw", "--quiet-start=2", "--column-type-info"] # Adds the connection data to the call arguments if len(connection_args) > 0: popen_args = popen_args + connection_args # Adds the shell command args to the call arguments if shell_args is not None: popen_args = popen_args + shell_args popen_args.insert(0, exec_name) self._shell = subprocess.Popen(popen_args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding='utf-8', env=env, text=True, creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if os.name == 'nt' else 0) self._request_queue: "Queue[ShellCommandTask]" = Queue() self._thread = threading.Thread(target=self.handle_frontend_command) self._response_thread = threading.Thread( target=self.handle_shell_output) # shell_args is None when it is an interactive session, otherwise it is meant to be an operation to be executed as a CLI call so the frontend processing thread is not needed if shell_args is None: self._thread.start() self._response_thread.start() def __del__(self): self.close() super().__del__() def terminate_complete(self, type, message, request_id, result=None): if type == 'OK': self._terminate_complete.set() def close(self): # do cleanup, a \q to terminate the shell, overriding the command black list self._request_queue.put(ShellQuitTask( result_callback=self.terminate_complete, options=None)) self._terminate_complete.wait() super().close() def execute(self, command: str, callback=None, options=None): if callback is None: callback = self._handle_api_response command = command.strip() if command.startswith('\\'): if command.split(' ')[0] in self.command_blacklist: raise MSGException(Error.SHELL_COMMAND_NOT_SUPPORTED, "The requested command is not supported.") # Formats the command as expected by the shell command = {"execute": command} self.add_task(command, callback, options) def complete(self, data: str, offset=None, callback=None, options=None): if callback is None: callback = self._handle_api_response command = {"complete": {"data": data, "offset": 0 if offset is None else offset}} self.add_task(command, callback, options) def add_task(self, command, callback, options): context = get_context() task_id = context.request_id if context else None self._request_queue.put(ShellCommandTask( task_id, command, result_callback=callback, options=options)) def handle_shell_output(self): # Read characters from the shell stdout and build responses # to deliver to the handle_frontend_command method reply_line = "" error_buffer = "" while not self._shell_exited: reply_json = None char = self._shell.stdout.read(1) if len(char) == 0: break if not char == '\n': reply_line += char continue # when running on windows, remove the \r (from \r\n sequence) if reply_line.endswith('\r'): reply_line = reply_line[:-1] if reply_line.startswith("["): reply_line = f"{{ \"rows\": {reply_line} }}" if reply_line.startswith("{"): reply_json = json.loads(reply_line) # While in python mode, the python engine produces errors by # calling the print callback lots of times, to avoid sending a # lot of replies to the frontend we will cache consecutive errors # and send them in one call to the frontend as soon as a non # error response is received from the shell if 'error' in reply_json and isinstance(reply_json['error'], str): error_buffer += reply_json["error"] else: if len(error_buffer) > 0: self._pending_request.send_output( {"error": error_buffer}) error_buffer = "" if 'prompt_descriptor' in reply_json: # remove empty strings reply_json = remove_dict_useless_items(reply_json) # command complete if not self._initialize_complete.is_set(): data = {"last_prompt": self._last_prompt, "module_session_id": self.module_session_id} if self._last_prompt != reply_json: data.update(reply_json) self._pending_request.complete(message="New Shell Interactive session created successfully.", data=data) self._initialize_complete.set() else: self._pending_request.complete( data=None if self._last_prompt == reply_json else reply_json) self._command_complete.set() self._last_prompt = reply_json elif 'prompt' in reply_json: # request for a client prompt prompt_event = threading.Event() if 'type' in reply_json and reply_json['type'] == 'password': logger.add_filter({ "type": "key", "key": "reply", "expire": Filtering.FilterExpire.OnUse }) reply_json.update( {"module_session_id": self.module_session_id}) self.send_prompt_response( self._pending_request.task_id, reply_json, lambda: prompt_event.set()) # Locks until the prompt is handled prompt_event.wait() if self._prompt_replied: self._shell.stdin.write(self._prompt_reply + "\n") self._shell.stdin.flush() else: self.kill_command() elif 'value' in reply_json: # generic response to send to the client send_response = True if isinstance(reply_json['value'], str) and reply_json['value'].endswith('\r'): reply_json['value'] = reply_json['value'][:-1] if len(reply_json['value']) == 0: send_response = False if send_response: self._pending_request.send_output(reply_json) else: # Shell commands are stored as JSON and sent to the Shell # Then the shell will print them as JSON because of interactive=full # We do not need to reply back the original command to the frontend if self._pending_request.command != reply_json: if 'complete' in self._pending_request.command: self._pending_request.send_output( reply_json['info']) else: self._pending_request.send_output(reply_json) elif reply_line == "Bye!": self._shell_exited = True else: # Some shell errors are not reported as JSON, i.e. initialization errors error_buffer += reply_line reply_line = '' # A pending request is expected to be present in 3 cases: # - When the initialization of the session failed # - When a CLI call was done # - When the Shell session is closed if not self._pending_request is None: exit_status = None attempts = 3 while exit_status is None and attempts > 0: try: exit_status = self._shell.wait(5) except subprocess.TimeoutExpired: attempts = attempts - 1 if exit_status != 0: self._pending_request.fail(message=error_buffer, data={ "exit_status": exit_status}) else: data = {"module_session_id": self.module_session_id, "exit_status": exit_status} self._pending_request.complete( data=data) self._command_complete.set() # Finally closes the module session if not already being closed if not isinstance(self._pending_request, ShellQuitTask): self.close() # On error conditions no processing will take place, still the frontend handler thread is waiting, we need to let it go self._shell_exited = True if not self._initialize_complete.is_set(): self._initialize_complete.set() def handle_frontend_command(self): self._initialize_complete.wait() # Make the actual communication with the interactive shell while not self._shell_exited: # check if there is a command request to send to # the shell and handle it command = self._request_queue.get() if command.task_id in self._cancel_requests: command.cancel() self._cancel_requests.remove(command.task_id) continue self._pending_request = command self._shell.stdin.write(json.dumps(command.command) + "\n") self._shell.stdin.flush() # The command has been sent to the shell, now we wait until it completes self._command_complete.wait() self._command_complete.clear() self._pending_request = None def kill_command(self): # windows. Need CTRL_BREAK_EVENT to raise the signal in the whole process group os.kill(self._shell.pid, signal.CTRL_BREAK_EVENT if hasattr(signal, 'CTRL_BREAK_EVENT') else signal.SIGINT) # pylint: disable=no-member def cancel_request(self, request_id): self._cancel_requests.append(request_id) def kill_shell_task(self): context = get_context() request_id = context.request_id if context else None if not self._command_complete.is_set() and self._pending_request is not None: self.kill_command() self.send_command_response(request_id, 'Command killed') else: self.send_command_response(request_id, 'Nothing to kill')