in gui/backend/gui_plugin/shell/ShellModuleSession.py [0:0]
def __init__(self, options=None, settings=None, shell_args=None):
context = get_context()
request_id = context.request_id if context else None
super().__init__()
EXTENSION_SHELL_USER_CONFIG_FOLDER_BASENAME = "mysqlsh-gui"
# Symlinks the plugins on the master shell as we want them available
# on the Shell Console
self._subprocess_home = mysqlsh.plugin_manager.general.get_shell_user_dir( # pylint: disable=no-member
'plugin_data', 'gui_plugin', 'shell_instance_home')
if not os.path.exists(self._subprocess_home):
os.makedirs(self._subprocess_home)
subprocess_plugins = os.path.join(self._subprocess_home, 'plugins')
# Get the actual plugin path that this gui_plugin is in
module_file_path = os.path.dirname(__file__)
plugins_path = os.path.dirname(os.path.dirname(module_file_path))
# If this is a development setup using the global shell user config dir,
# setup a symlink if it does not exist yet
if (not mysqlsh.plugin_manager.general.get_shell_user_dir().endswith(
EXTENSION_SHELL_USER_CONFIG_FOLDER_BASENAME)
and not os.path.exists(subprocess_plugins)):
if os.name == 'nt':
p = subprocess.run(
f'mklink /J "{subprocess_plugins}" "{plugins_path}"',
shell=True)
p.check_returncode()
else:
os.symlink(plugins_path, subprocess_plugins)
# Check if MDS options have been specified
connection_args = []
if not options is None:
session_handler = ShellDbSessionHandler(options,
message_callback=lambda msg_type, msg, result: self._web_session.send_response_message(
msg_type=msg_type,
msg=msg,
request_id=request_id,
values=result, api=False))
session_handler.open()
options = session_handler.connection_options
if settings is not None:
if 'ssh' in settings.keys():
connection_args.append('--ssh')
connection_args.append(settings.pop('ssh'))
if 'ssh-identity-file' in settings.keys():
connection_args.append('--ssh-identity-file')
connection_args.append(
settings.pop('ssh-identity-file'))
connection_args.append(
mysqlsh.globals.shell.unparse_uri(options))
self._last_prompt = {}
# Empty command to keep track of the shell initialization
self._pending_request = ShellCommandTask(
request_id, "", result_callback=self._handle_api_response)
self._last_info = None
self._shell_exited = False
self._shell = None # start a new shell process here...
self._thread = None # create a new thread to do the async processing
self._response_thread = None # thread that handles the shell response messages
self._initialize_complete = threading.Event()
self._terminate_complete = threading.Event()
self._command_complete = threading.Event()
self._cancel_requests = []
self.command_blacklist = [
'\\',
'\\edit', '\\e',
'\\exit',
'\\history',
'\\nopager',
'\\pager', '\\P',
'\\quit', '\\q',
'\\rehash',
'\\source', '\\.',
'\\system', '\\!'
]
env = os.environ.copy()
# TODO: Workaround for Bug #33164726
env['MYSQLSH_USER_CONFIG_HOME'] = self._subprocess_home + "/"
env["MYSQLSH_JSON_SHELL"] = "1"
if "MYSQLSH_PROMPT_THEME" in env:
del env["MYSQLSH_PROMPT_THEME"]
if 'ATTACH_DEBUGGER' in env:
del env['ATTACH_DEBUGGER']
if not 'TERM' in env:
env['TERM'] = 'xterm-256color'
with open(os.path.join(self._subprocess_home, 'options.json'), 'w') as options_file:
json.dump({
"history.autoSave": "true"
}, options_file)
with open(os.path.join(self._subprocess_home, 'prompt.json'), 'w') as prompt_file:
json.dump({
"variables": {
"is_production": {
"match": {
"pattern": "*;host;*[*?*]",
"value": ";%env:PRODUCTION_SERVERS;[%host%]"
},
"if_true": "true",
"if_false": "false"
},
"is_ssl": {
"match": {
"pattern": "%ssl%",
"value": "SSL"
},
"if_true": "true",
"if_false": "false"
}
},
"prompt": {
"text": "\n",
"cont_text": "-> "
},
"segments": [
{
"text": "{ \"prompt_descriptor\": { "
},
{
"text": "\"user\": \"%user%\", "
},
{
"text": "\"host\": \"%host%\", \"port\": \"%port%\", \"socket\": \"%socket%\", "
},
{
"text": "\"schema\": \"%schema%\", \"mode\": \"%Mode%\", \"session\": \"%session%\","
},
{
"text": "\"ssl\": %is_ssl%, \"is_production\": %is_production%"
},
{
"text": " } }"
}
]
},
prompt_file,
indent=4)
executable = sys.executable
if 'executable' in dir(mysqlsh):
executable = mysqlsh.executable
exec_name = executable if executable.endswith(
"mysqlsh") or executable.endswith("mysqlsh.exe") else "mysqlsh"
# Temporarily passing --no-defaults until it is a configurable option in FE and is received as parameter in the BE
popen_args = ["--no-defaults", "--interactive=full", "--passwords-from-stdin",
"--py", "--json=raw", "--quiet-start=2", "--column-type-info"]
# Adds the connection data to the call arguments
if len(connection_args) > 0:
popen_args = popen_args + connection_args
# Adds the shell command args to the call arguments
if shell_args is not None:
popen_args = popen_args + shell_args
popen_args.insert(0, exec_name)
self._shell = subprocess.Popen(popen_args,
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
encoding='utf-8', env=env, text=True,
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if os.name == 'nt' else 0)
self._request_queue: "Queue[ShellCommandTask]" = Queue()
self._thread = threading.Thread(target=self.handle_frontend_command)
self._response_thread = threading.Thread(
target=self.handle_shell_output)
# shell_args is None when it is an interactive session, otherwise it is meant to be an operation to be executed as a CLI call so the frontend processing thread is not needed
if shell_args is None:
self._thread.start()
self._response_thread.start()