modules/wb.utils/wb_utils_grt.py (940 lines of code) (raw):
# Copyright (c) 2009, 2020, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0,
# as published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms, as
# designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an additional
# permission to link the program and your derivative works with the
# separately licensed software that they have either included with
# the program or referenced in the documentation.
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
import re
import sys
import subprocess
import os
import threading
import zipfile
import tempfile
import shlex
from workbench.utils import get_exe_path
# import the wb module
from wb import DefineModule, wbinputs
# import the grt module
import grt
import mforms
from grt import log_warning
from workbench.log import log_info, log_error, log_debug2
import traceback
from workbench.ui import WizardForm, WizardPage
from mforms import newButton, newCheckBox
# define this Python module as a GRT module
ModuleInfo = DefineModule(name= "PyWbUtils", author= "Sun Microsystems Inc.", version="1.0")
# wb_model_utils contains a few more plugins that will get registered when imported
import wb_model_utils # noqa
import wb_catalog_utils # noqa
def get_linux_terminal_program():
paths = os.getenv("PATH").split(":")
if not paths:
paths = ['/usr/bin', '/usr/local/bin', '/bin']
for term in ["gnome-terminal", "konsole", "xterm", "rxvt"]:
for d in paths:
full_path = os.path.join(d, term)
if os.path.exists(full_path):
return full_path
return None
@ModuleInfo.plugin('wb.tools.backupConnections', caption='Backup existing connections', accessibilityName="Backup Existing Connections")
@ModuleInfo.export(grt.INT)
def backupConnections():
user_data_dir = mforms.App.get().get_user_data_folder()
connections_path = os.path.join(user_data_dir, 'connections.xml')
instances_path = os.path.join(user_data_dir, 'server_instances.xml')
file_chooser = mforms.newFileChooser(mforms.Form.main_form(), mforms.SaveFile)
file_chooser.set_title('Export Connections As')
file_chooser.set_extensions('ZIP Files (*.zip)|*.zip', 'import')
if file_chooser.run_modal() == mforms.ResultOk:
backup_path = file_chooser.get_path()
try:
backup_file = zipfile.ZipFile(backup_path, 'w', zipfile.ZIP_DEFLATED)
except Exception:
mforms.Utilities.show_error('Backup file creation error',
'Could not create the backup file. Please check path and permissions and try '
'again.', 'OK', '', '')
return 1
backup_file.write(connections_path, 'connections.xml')
backup_file.write(instances_path, 'server_instances.xml')
if mforms.Utilities.show_message('Connections saved', 'Your connections were successfully backed up to '
+ backup_path,
'OK', '', 'Show File') == mforms.ResultOther:
mforms.Utilities.reveal_file(backup_path)
return 0
@ModuleInfo.plugin('wb.tools.restoreConnections', caption='Restore connections from a backup file', accessibilityName="Restore Connections")
@ModuleInfo.export(grt.INT)
def restoreConnections():
def generate_unique_name(name, name_set):
new_name = name
idx = 1
while True:
if not new_name in name_set:
return new_name
new_name = name + ' (%d)' % idx
idx += 1
file_chooser = mforms.newFileChooser(mforms.Form.main_form(), mforms.OpenFile)
file_chooser.set_title('Select a Connections Backup File')
file_chooser.set_extensions('ZIP Files (*.zip)|*.zip', 'import')
if file_chooser.run_modal():
backup_path = file_chooser.get_path()
try:
backup_file = zipfile.ZipFile(backup_path, 'r')
try:
instances_file = tempfile.NamedTemporaryFile(delete=False)
instances_file.write(backup_file.read('server_instances.xml'))
instances_file.close()
connections_file = tempfile.NamedTemporaryFile(delete=False)
connections_file.write(backup_file.read('connections.xml'))
connections_file.close()
except KeyError as error:
mforms.Utilities.show_error('Restore Connections Error', 'The selected file is not a valid backup file '
'or the file is corrupted: %s.' % str(error),
'OK', '', '')
grt.log_error('restoreConnections', 'The selected file is not a valid backup file '
'or the file is corrupted: %s.' % str(error))
return
connections = grt.unserialize(connections_file.name)
if not isinstance(connections, grt.List):
mforms.Utilities.show_error('Restore Connections Error', 'The selected file is not a valid backup file '
'or the file is corrupted.',
'OK', '', '')
grt.log_error('restoreConnections', 'The selected archive does not have a valid connection backup file.\n')
return
inserted_connections = {}
existent_connection_names = set(conn.name for conn in grt.root.wb.rdbmsMgmt.storedConns)
existent_connection_ids = set(conn.__id__ for conn in grt.root.wb.rdbmsMgmt.storedConns)
duplicate_connection_count = 0
for candidate_connection in connections:
if candidate_connection.__id__ in existent_connection_ids:
duplicate_connection_count = duplicate_connection_count + 1
continue
candidate_connection.name = generate_unique_name(candidate_connection.name, existent_connection_names)
existent_connection_names.add(candidate_connection.name)
candidate_connection.owner = grt.root.wb.rdbmsMgmt
inserted_connections[candidate_connection.__id__] = candidate_connection
grt.root.wb.rdbmsMgmt.storedConns.append(candidate_connection)
instances = grt.unserialize(instances_file.name)
if not isinstance(instances, grt.List):
mforms.Utilities.show_error('Restore Connections Error', 'The selected file is not a valid backup file '
'or the file is corrupted.',
'OK', '', '')
grt.log_error('restoreConnections', 'Workbench restored %i valid connections but server configuration data could not be found or is not valid.\n' % len(connections))
return
existent_instance_names = set(instance.name for instance in grt.root.wb.rdbmsMgmt.storedInstances)
previous_instances_conns = set()
duplicated_instance_count = 0
for candidate_instance in instances:
if candidate_instance.connection is None:
continue
if candidate_instance.connection.__id__ in previous_instances_conns:
duplicated_instance_count = duplicated_instance_count + 1
continue # Skip instances whose connections are associated to previously processed instances
previous_instances_conns.add(candidate_instance.connection.__id__)
candidate_instance.name = generate_unique_name(candidate_instance.name, existent_instance_names)
existent_instance_names.add(candidate_instance.name)
new_conn = inserted_connections.get(candidate_instance.connection.__id__, None)
candidate_instance = candidate_instance.shallow_copy()
candidate_instance.connection = new_conn
grt.root.wb.rdbmsMgmt.storedInstances.append(candidate_instance)
grt.modules.Workbench.refreshHomeConnections()
grt.modules.Workbench.saveConnections()
grt.modules.Workbench.saveInstances()
if duplicate_connection_count > 0 or duplicated_instance_count > 0:
message = []
message.append('Workbench detected ')
if duplicate_connection_count > 0:
message.append('%i duplicated connections' % duplicate_connection_count)
if duplicated_instance_count > 0:
if duplicate_connection_count > 0:
message.append(' and ')
message.append('%i duplicated instances' % duplicated_instance_count)
message.append(', which were not restored.')
mforms.Utilities.show_warning('Restore Connections', ''.join(message), 'OK', '', '')
except zipfile.BadZipfile as error:
mforms.Utilities.show_error('Restore Connections Error', 'The selected file is not a valid backup file '
'or the file is corrupted.',
'OK', '', '')
grt.log_error('restoreConnections', 'The selected file is not a valid backup file or the file is corrupted: %s\n' % error)
except IOError as error:
mforms.Utilities.show_error('Restore Connections Error', 'Cannot read from file. Please check this file '
'permissions and try again.',
'OK', '', '')
grt.log_error('restoreConnections', '%s\n' % str(error))
return 0
@ModuleInfo.export(grt.STRING, grt.classes.db_mgmt_Connection)
def connectionStringFromConnection(conn):
#<user>[:<password>]@<host>[:<port>][:<socket>]
connstr = ""
if conn.driver.name == "MysqlNative":
connstr = "%s@%s:%s" % (conn.parameterValues["userName"], conn.parameterValues["hostName"], conn.parameterValues["port"])
elif conn.driver.name == "MysqlNativeSocket":
connstr = "%s@%s::%s" % (conn.parameterValues["userName"], conn.parameterValues["hostName"], conn.parameterValues["socket"])
elif conn.driver.name == "MysqlNativeSSH":
#XXX this is incomplete, need some way to encode the ssh params
connstr = "%s@%s::%s" % (conn.parameterValues["userName"], conn.parameterValues["hostName"], conn.parameterValues["port"])
return connstr
@ModuleInfo.export(grt.classes.db_mgmt_Connection, grt.STRING)
def connectionFromString(connstr):
valid = False
def get_driver(name):
for d in grt.root.wb.rdbmsMgmt.rdbms[0].drivers:
if d.name == name:
return d
return None
# parse as a one of our connection strings
g = re.match("(.*?)(?::(.*))?@(.*?)(?::([0-9]+|)(?::(.+|))?)?$", connstr)
if g:
user, password, host, port, socket = g.groups()
valid = True
else:
user, password, host, port, socket = None, None, None, None, None
# check if this is a mysql cmdline client command
tokens = shlex.split(connstr.strip())
if tokens:
if tokens[0].endswith("mysql") or tokens[0].endswith("mysql.exe"):
i = 1
valid = True
while i < len(tokens):
if tokens[i] == "-u":
i += 1
user = tokens[i]
elif tokens[i].startswith("-u"):
user = tokens[i][2:]
elif tokens[i] == "-h":
i += 1
host = tokens[i]
elif tokens[i].startswith("-h"):
host = tokens[i][2:]
elif tokens[i] == "-p":
i += 1
password = tokens[i]
elif tokens[i].startswith("-p"):
password = tokens[i][2:] # noqa
elif tokens[i] == "-P":
i += 1
port = tokens[i]
elif tokens[i].startswith("-P"):
port = tokens[i][2:]
elif tokens[i] == "-S":
i += 1
socket = tokens[i]
elif tokens[i].startswith("-S"):
socket = tokens[i][2:]
i += 1
if valid:
if port:
try:
port = int(port)
except:
log_warning("wb_utils", "Error parsing connstring; port value '%s' should be a number\n" % port)
port = None
if not port:
port = 3306
conn = grt.classes.db_mgmt_Connection()
conn.owner = grt.root.wb.rdbmsMgmt
conn.name = connstr
if socket:
conn.driver = get_driver("MysqlNativeSocket")
else:
conn.driver = get_driver("MysqlNative")
if user:
conn.parameterValues["userName"] = user
if host:
conn.parameterValues["hostName"] = host
if port:
conn.parameterValues["port"] = port
if socket:
conn.parameterValues["socket"] = socket
hostIdentifier = conn.driver.hostIdentifierTemplate
for key, value in list(conn.parameterValues.items()):
hostIdentifier = hostIdentifier.replace("%"+key+"%", str(value))
conn.hostIdentifier = hostIdentifier
return conn
return None
@ModuleInfo.plugin("wb.tools.copyConnectionString", caption="Copy Connection String to Clipboard", input= [wbinputs.selectedConnection()], pluginMenu="Home/Connections", accessibilityName="Copy Connection String to Clipboard")
@ModuleInfo.export(grt.INT, grt.classes.db_mgmt_Connection)
def copyConnectionString(conn):
connstr = connectionStringFromConnection(conn)
mforms.Utilities.set_clipboard_text(connstr)
@ModuleInfo.plugin("wb.tools.copyJDBCConnectionString", caption="Copy JDBC Connection String to Clipboard", input= [wbinputs.selectedConnection()], pluginMenu="Home/Connections", accessibilityName="Copy JBDC String to Clipboard")
@ModuleInfo.export(grt.INT, grt.classes.db_mgmt_Connection)
def copyJDBCConnectionString(conn):
if "schema" in conn.parameterValues:
params = "/"+conn.parameterValues["schema"]
else:
params = "/"
params += "?user=%s" % conn.parameterValues["userName"]
if conn.driver.name == "MysqlNative":
connstr = "jdbc:mysql://%s:%s" % (conn.parameterValues["hostName"], conn.parameterValues["port"])
elif conn.driver.name == "MysqlNativeSocket":
connstr = "jdbc:mysql://%s:%s" % (conn.parameterValues["hostName"], conn.parameterValues["socket"])
elif conn.driver.name == "MysqlNativeSSH":
mforms.Utilities.show_error("Copy JDBC Connection String",
"Cannot create JDBC connection string for %s. The connection uses a SSH tunnel." % conn.name,
"OK", "", "")
return
mforms.Utilities.set_clipboard_text(connstr+params)
@ModuleInfo.plugin("wb.tools.createMissingLocalConnections", caption="Rescan for Local MySQL Instances", input= [], pluginMenu="Home/Connections", accessibilityName="Rescan MySQL Instances")
@ModuleInfo.export(grt.INT)
def createMissingLocalConnections():
found_instances = grt.modules.Workbench.createInstancesFromLocalServers()
grt.modules.Workbench.refreshHomeConnections()
if found_instances < 0:
mforms.Utilities.show_error("Rescan for Local MySQL Servers", "Rescan for local MySQL servers failed", "OK", "", "")
elif found_instances == 0:
mforms.Utilities.show_message('Rescan for Local MySQL Servers', 'No servers were found.', 'OK', '', '')
else:
mforms.Utilities.show_message('Rescan for Local MySQL Servers', 'Found %s servers.' % found_instances, 'OK', '', '')
return 1
@ModuleInfo.plugin("wb.tools.connectionFromClipboard", caption="Add Connection(s) from Clipboard", input= [], pluginMenu="Home/Connections", accessibilityName="Add Connections From Clipboard")
@ModuleInfo.export(grt.INT)
def newConnectionFromClipboard():
text = mforms.Utilities.get_clipboard_text()
if not text:
return 0
existing = set()
# make a normalized set of the connections that already exist
for con in grt.root.wb.rdbmsMgmt.storedConns:
existing.add(connectionStringFromConnection(con))
parse_errors = False
found_instances = 0
for line in text.split("\n"):
conn = connectionFromString(line)
if not conn and not parse_errors:
mforms.Utilities.show_error("Add Connection(s) from Clipboard", "Could not parse connection parameters from string '%s'" % line, "OK", "", "")
parse_errors = True
continue
if connectionStringFromConnection(conn) in existing:
mforms.Utilities.show_error("Add Connection(s) from Clipboard", "The connection %s already exists and was not added." % line,
"OK", "", "")
continue
i = 1
name = conn.parameterValues.get("hostName", "local")
prefix = name
while any(conn.name == name for conn in grt.root.wb.rdbmsMgmt.storedConns):
name = "%s (%i)" % (prefix, i)
i += 1
conn.name = name
log_info("Added connection %s from clipboard\n" % conn.name)
found_instances = found_instances + 1
grt.root.wb.rdbmsMgmt.storedConns.append(conn)
grt.modules.Workbench.refreshHomeConnections()
if found_instances > 0:
mforms.Utilities.show_message('Add Connection(s) from Clipboard', 'Found %s servers.' % found_instances, 'OK', '', '')
return 1
@ModuleInfo.plugin("wb.tools.cmdlineClient", caption="Start Command Line Client", input= [wbinputs.selectedConnection()], pluginMenu="Home/Connections", accessibilityName="Command Line Client")
@ModuleInfo.export(grt.INT, grt.classes.db_mgmt_Connection)
def startCommandLineClientForConnection(conn):
import platform
import os
if "ssh" in conn.driver.name.lower():
host = "127.0.0.1"
tun = grt.modules.DbMySQLQuery.openTunnel(conn)
if tun < 0:
mforms.Utilities.show_error("Start Command Line Client", "Could not open SSH tunnel to host.", "OK", "", "")
return
port = grt.modules.DbMySQLQuery.getTunnelPort(tun)
socket = ""
elif "socket" in conn.driver.name.lower():
if platform.system() == "Windows":
host = "."
else:
host = "localhost"
port = None
socketName = conn.parameterValues["socket"]
if socketName is None:
socketName = "MySQL"
socket = "--socket=" + socketName
else:
host = conn.parameterValues["hostName"].replace("\\", "\\\\").replace('"', '\\"')
port = conn.parameterValues["port"]
socket = ""
user = conn.parameterValues["userName"].replace("\\", "\\\\").replace('"', '\\"')
if port is None:
port = 3306
schema = conn.parameterValues["schema"]
if schema:
schema = schema.replace("\\", "\\\\").replace('"', '\\"')
else:
schema = ""
bundled_client_path = grt.root.wb.options.options.get("mysqlclient", None)
if platform.system().lower() == "darwin":
if not bundled_client_path:
bundled_client_path = mforms.App.get().get_executable_path("mysql")
command = """\\"%s\\" \\"-u%s\\" \\"-h%s\\" -P%i %s -p %s""" % (os.path.expanduser(bundled_client_path), user, host, port, socket, schema)
os.system("""osascript -e 'tell application "Terminal" to do script "%s"'""" % command)
elif platform.system().lower() == "windows":
if not bundled_client_path:
bundled_client_path = mforms.App.get().get_executable_path("mysql.exe")
# call mysql client, and if it exits with error, pause so that the user can see what went wrong, before closing the window
# (the ideal way would have been to do what we do for Linux, but Windows shell is too limited)
command = """start cmd /C "%s -u%s -h%s -P%i %s -p %s || pause" """ % (bundled_client_path.replace(" ", "\\ "), user, host, port, socket, schema)
subprocess.Popen(command, shell = True)
else:
if not bundled_client_path:
bundled_client_path = mforms.App.get().get_executable_path("mysql")
if not bundled_client_path:
bundled_client_path = "mysql"
command = """\\"%s\\" \\"-u%s\\" \\"-h%s\\" -P%i %s -p %s""" % (bundled_client_path, user, host, port, socket, schema)
# call mysql client in a loop until either: 1. it exits with no error, or 2. user exits with Ctrl+C.
# This is necessary because if the user enters wrong password, the window closes too quick for the user to see what's going on.
my_env = os.environ.copy()
if (("XDG_SESSION_TYPE" in my_env and my_env["XDG_SESSION_TYPE"] == "wayland")
or "WAYLAND_DISPLAY" in my_env) and my_env["GDK_BACKEND"]:
my_env["GDK_BACKEND"] = "wayland"
subprocess.Popen(["/bin/sh", "-c",
get_linux_terminal_program() + " -e '" +
"sh -c \"while :; do %s && break || read -p \\\"Press Enter to retry or Ctrl+C to quit\\\" DUMMY_VAR; done\" " % command
+ "' &" # <--- launch GUI terminal and exit (returns to Workbench immediately rather than blocking)
], shell=False, env=my_env)
@ModuleInfo.plugin("wb.tools.cmdlineClient", caption="Start Command Line Client", input=[wbinputs.selectedConnection()], pluginMenu="Home/Connections", accessibilityName="Command Line Client")
@ModuleInfo.export(grt.INT)
def startODBCAdmin():
if sys.platform == "linux2":
path = get_exe_path('iodbcadm-gtk')
if not path:
path = get_exe_path('ODBCManageDataSourcesQ4')
if path:
subprocess.Popen(path, shell=True, close_fds=True)
return 1
else:
return 0
elif sys.platform == "darwin":
ret = subprocess.call("open -a 'ODBC Administrator'", shell=True)
if ret == 1:
ret = subprocess.call("open -a 'ODBC Manager'", shell=True)
return 0 if ret == 1 else 1
elif sys.platform == "win32":
# WTF alert:
# In 64bit windows, there are 2 versions of odbcad32.exe. One is 64bits and the other 32.
# The 64bit version is in \Windows\System32
# The 32bit version is in \Windows\SysWOW64
# so if we're a 64bit WB, then we run the 64bit odbc tool (since we can't use 32bit drivers anyway)
if sys.maxsize > 2**31:
subprocess.Popen(r"%SYSTEMROOT%\SysWOW64\odbcad32.exe", shell=True, creationflags=subprocess.CREATE_NEW_PROCESS_GROUP, close_fds=True)
else:
subprocess.Popen(r"%SYSTEMROOT%\System32\odbcad32.exe", shell=True, creationflags=subprocess.CREATE_NEW_PROCESS_GROUP, close_fds=True)
return 1
def process_not_found_utils():
utilities_url = ("http://dev.mysql.com/downloads/utilities/" if grt.root.wb.info.edition == "Community" else
"https://edelivery.oracle.com/EPD/Search/get_form?product=18251")
source_description = "www.mysql.com" if grt.root.wb.info.edition == "Community" else "eDelivery"
if mforms.Utilities.show_message("MySQL Utilities", "The command line MySQL Utilities could not be "
"located.\n\nTo use them, you must download and install the utilities "
"package for your system from %s.\n\n"
"Click on the Download button to proceed." % source_description,
"Download...", "Cancel", "") == mforms.ResultOk:
mforms.Utilities.open_url(utilities_url)
@ModuleInfo.plugin("wb.tools.utilitiesShell", caption="Start Shell for MySQL Utilities", groups=["Others/Menu/Ungrouped"], accessibilityName="MySQL Utilities Shell")
@ModuleInfo.export(grt.INT)
def startUtilitiesShell():
import platform
import os
if platform.system() == "Windows":
guessed_path = None
for varname in ["ProgramFiles(x86)", "ProgramFiles"]:
if not os.getenv(varname):
continue
path = os.path.join(os.getenv(varname), "MySQL", "MySQL Utilities", "mysqluc.exe")
if os.path.exists(path):
guessed_path = path
break
if any(os.path.exists(os.path.join(f, "mysqluc.exe")) for f in os.getenv("PATH").split(";")):
# Utils path is in PATH already
command = r'start cmd /K "mysqluc"'
subprocess.Popen(command, shell = True)
elif guessed_path:
command = r'start cmd /K "%s"' % guessed_path
subprocess.Popen(command, shell = True)
else:
process_not_found_utils()
elif platform.system() == "Darwin":
# PATH seems to be stripped down when WB is started from a binary .app
if any(os.path.exists(f+"/mysqluc") for f in os.getenv("PATH").split(":") + ["/usr/local/bin"]):
os.system(r"""osascript -e 'tell application "Terminal" to do script "mysqluc -e \"help utilities\""' -e 'tell front window of application "Terminal" to set custom title to "MySQL Utilities"'""")
else:
process_not_found_utils()
else:
if not any(os.path.exists(f+"/mysqluc") for f in os.getenv("PATH").split(":")):
process_not_found_utils()
else:
term = get_linux_terminal_program()
if term:
import tempfile
fd, setup_script = tempfile.mkstemp(prefix="delme.", dir=mforms.App.get().get_user_data_folder())
f = os.fdopen(fd, "w+")
f.write('echo "The following MySQL Utilities are available:"\n')
f.write('mysqluc -e "help utilities"\n')
f.write('rm -f "%s"\n' % setup_script)
f.write('sh -i\n')
f.close()
os.chmod(setup_script, 0o700)
if 'konsole' in term:
subprocess.call([term, "-e", "/bin/sh", setup_script])
else:
my_env = os.environ.copy()
if (("XDG_SESSION_TYPE" in my_env and my_env["XDG_SESSION_TYPE"] == "wayland")
or "WAYLAND_DISPLAY" in my_env) and my_env["GDK_BACKEND"]:
my_env["GDK_BACKEND"] = "wayland"
subprocess.Popen(["/bin/sh", "-c", "%s -e %s &" % (term, setup_script)], shell=False, env=my_env)
else:
raise RuntimeError("Terminal program could not be found")
class CheckForUpdateThread(threading.Thread):
def __init__(self):
self.is_running = False
self.finished = False
super(CheckForUpdateThread, self).__init__()
def run(self):
if self.is_running:
return
self.is_running = True
try:
import urllib.request, urllib.error, urllib.parse
import json
import base64
class LoginForm(mforms.Form):
def __init__(self):
mforms.Form.__init__(self, None)
self.set_title("Apply Changes to MySQL configuration File")
content = mforms.newBox(False)
content.set_padding(12)
content.set_spacing(12)
button_box = mforms.newBox(True)
button_box.set_spacing(12)
self.apply_btn = mforms.newButton()
self.apply_btn.set_text("Apply")
self.cancel_btn = mforms.newButton()
self.cancel_btn.set_text("Cancel")
self.cancel_btn.add_clicked_callback(self.cancel_clicked)
content.add(button_box, False, True)
self.set_content(content)
self.set_size(640, 480)
self.apply_btn.add_clicked_callback(self.apply_clicked)
self.cancel_btn.add_clicked_callback(self.cancel_clicked)
def show(self):
self.run_modal(self.apply_btn, self.cancel_btn)
def apply_clicked(self):
self.close()
def cancel_clicked(self):
self.close()
class ProxyAuthenticationHandler(urllib.request.AbstractBasicAuthHandler, urllib.request.BaseHandler):
auth_header = 'Proxy-authorization'
attempts = 0
class ProxyAuthenticationForm(mforms.Form):
def __init__(self):
mforms.Form.__init__(self, None, mforms.FormDialogFrame)
self.set_size(400, -1)
box = mforms.newBox(False)
box.set_spacing(12)
box.set_padding(12)
self.set_content(box)
self.set_title("Proxy Authentication")
content = mforms.newTable()
content.set_padding(12);
content.set_row_count(2);
content.set_row_spacing(10);
content.set_column_count(2);
content.set_column_spacing(10);
self.username = mforms.newTextEntry()
self.password = mforms.newTextEntry(mforms.PasswordEntry)
content.add(mforms.newLabel("User Name:"), 0, 1, 0, 1, mforms.HFillFlag | mforms.VFillFlag);
content.add(self.username, 1, 2, 0, 1, mforms.HFillFlag | mforms.HExpandFlag);
content.add(mforms.newLabel("Password:"), 0, 1, 1, 2, mforms.HFillFlag | mforms.VFillFlag);
content.add(self.password, 1, 2, 1, 2, mforms.HFillFlag | mforms.HExpandFlag);
box.add(content, True, True)
self.ok = mforms.newButton()
self.ok.set_text("OK")
self.cancel = mforms.newButton()
self.cancel.set_text("Cancel")
button_box = mforms.newBox(True)
mforms.Utilities.add_end_ok_cancel_buttons(button_box, self.ok, self.cancel)
box.add_end(button_box, False, True)
self.ok.add_clicked_callback(self.accepted)
self.cancel.add_clicked_callback(self.canceled)
def run(self):
return self.run_modal(None, self.cancel)
def accepted(self):
self.end_modal(True)
def canceled(self):
self.end_modal(False)
def request_authentication(self):
dialog = self.ProxyAuthenticationForm()
self.result = dialog.run()
self.username = dialog.username.get_string_value()
self.password = dialog.password.get_string_value()
def http_error_407(self, req, fp, code, msg, headers):
authority = req.host
if self.attempts > 0:
mforms.Utilities.show_error('Proxy Authentication', 'The proxy authentication was incorrect. Please try again.', "OK", "", "")
mforms.Utilities.perform_from_main_thread(self.request_authentication, True)
if self.result == False:
return None
self.attempts = self.attempts + 1
raw = "%s:%s" % (self.username, self.password)
auth = "Basic " + base64.b64encode(raw.encode()).decode("ascii")
if req.get_header(self.auth_header, None) == auth:
return None
req.add_unredirected_header(self.auth_header, auth)
return self.parent.open(req, timeout=req.timeout)
proxy_handler = urllib.request.ProxyHandler()
proxy_auth_handler = ProxyAuthenticationHandler()
opener = urllib.request.build_opener()
opener.add_handler(proxy_handler)
opener.add_handler(proxy_auth_handler)
urllib.request.install_opener(opener)
self.json = json.load(urllib.request.urlopen("http://workbench.mysql.com/current-release"))
except Exception as error:
self.json = None
self.error = "%s\n\nPlease verify that your internet connection is available." % str(error)
def checkForUpdatesCallback(self):
if self.is_alive():
return True # Don't do anything until the dom is built
if not self.json:
if hasattr(self, 'error'):
mforms.Utilities.show_error("Check for updates failed", str(self.error), "OK", "", "")
else:
try:
current_version = (grt.root.wb.info.version.majorNumber, grt.root.wb.info.version.minorNumber, grt.root.wb.info.version.releaseNumber)
newest_version = tuple(int(i) for i in self.json['fullversion'].split("."))
if newest_version > current_version:
if mforms.Utilities.show_message('New Version Available', 'The new MySQL Workbench %s has been released.\nYou can download the latest version from\nhttp://www.mysql.com/downloads/workbench.' % '.'.join( [str(num) for num in newest_version] ),
'Get it Now', 'Maybe Later', "") == mforms.ResultOk:
mforms.Utilities.open_url('http://www.mysql.com/downloads/workbench')
else:
mforms.Utilities.show_message('MySQL Workbench is Up to Date', 'You are already using the latest version of MySQL Workbench.', 'OK', '', '')
except Exception as error:
mforms.Utilities.show_error("Check for updates failed", str(error), "OK", "", "")
mforms.App.get().set_status_text('Ready.')
self.is_running = False
self.finished = True
return False
# Global variable:
thread = CheckForUpdateThread()
@ModuleInfo.plugin("wb.tools.checkForUpdates", caption="Check for Updates", accessibilityName="Check for Updates")
@ModuleInfo.export(grt.INT)
def checkForUpdates():
global thread
if thread.is_running:
return 0
if thread.finished:
thread = CheckForUpdateThread()
thread.start()
mforms.App.get().set_status_text('Checking for updates...')
ignore = mforms.Utilities.add_timeout(1.0, thread.checkForUpdatesCallback) # noqa
class SSLWizard_GenerationTask:
def __init__(self, main, path):
self.main = main
self.path = path
self.config_file = {}
def display_error(self, title, message):
log_error("%s\n%s\n" % (title, message))
mforms.Utilities.show_error(title, message, "OK", "", "")
def verify_preconditions(self):
try:
if not os.path.exists(self.main.certificates_root) or not os.path.isdir(self.main.certificates_root):
log_info("Creating certificates toor directory[%s]" % self.main.certificates_root)
os.mkdir(self.main.certificates_root, 0o700)
if os.path.exists(self.path) and not os.path.isdir(self.path):
self.display_error("Checking requirements", "The selected path is a file. You should select a directory.")
return False
if not os.path.exists(self.path):
if mforms.Utilities.show_message("Create directory", "The directory you selected does not exists. Do you want to create it?", "Create", "Cancel", "") == mforms.ResultCancel:
self.display_error("Create directory", "The operation was canceled.")
return False
os.mkdir(self.path, 0o700)
return True
except OSError as e:
self.display_error("Create directory", "There was an error (%d) - %s\n%s" % (e.errno, str(e), str(traceback.format_exc())))
if e.errno == 17:
return True
#raise
return False
def generate_config_file(self, target):
self.config_file[target] = os.path.join(self.path, "attribs-%s.txt" % target)
f = open(self.config_file[target], "w+")
f.write("[req]\ndistinguished_name=distinguished_name\nprompt=no\n")
f.write("\n".join(["[distinguished_name]"] + self.main.generate_page.get_attributes(target))+"\n")
f.close()
def run_command(self, command, output_to = subprocess.PIPE):
try:
set_shell = True if sys.platform == "win32" else False
p = subprocess.Popen(command, stdout=output_to, stderr=subprocess.PIPE, shell=set_shell)
out = p.communicate()
if p.returncode != 0:
log_error("Running command: %s\nOutput(retcode: %d):\n%s\n" % (str(command), p.returncode, str(out)))
return False
return True
except ValueError as e:
log_error("Running command: %s\nValueError exception\n" % (str(e.cmd)))
return False
except OSError as e:
log_error("Running command: %s\nException:\n%s\n" % (str(command), str(e)))
return False
def generate(self, path, config_file):
days = 3600
tool = "openssl"
ca_key = os.path.join(path, "ca-key.pem")
ca_cert = os.path.join(path, "ca-cert.pem")
# Check if the tool exists
log_debug2("Checking tool availability(%s)\n" % tool)
if not self.run_command([tool, "version"]):
self.display_error("Checking requirements", "The SSL tool (%s) is not available. Please verify if it's installed and the installation directory is in the PATH environment variable" % tool)
return False, None, None, None, None, None
# Check if path exists
if not os.path.exists(self.path):
self.display_error("Checking requirements", "The specified directory does not exist.")
return False, None, None, None, None, None
server_key = os.path.join(path, "server-key.pem")
server_req = os.path.join(path, "server-req.pem")
server_cert = os.path.join(path, "server-cert.pem")
client_key = os.path.join(path, "client-key.pem")
client_req = os.path.join(path, "client-req.pem")
client_cert = os.path.join(path, "client-cert.pem")
client_p12 = os.path.join(path, "client.p12")
private_key = os.path.join(path, "private_key.pem")
key_store = os.path.join(path, "test-cert-store")
params = {
'keylen': 2048,
'days': 3650,
'ca_key': ca_key,
'ca_cert': ca_cert,
'server_key': server_key,
'server_req': server_req,
'server_cert': server_cert,
'client_key': client_key,
'client_req': client_req,
'client_cert': client_cert,
'client_p12': client_p12,
'key_store': key_store,
'private_key': private_key,
"config_CA": self.config_file["CA"],
"config_server": self.config_file["Server"],
"config_client": self.config_file["Client"]
}
commands = [
#Creating CA key
'openssl genrsa -out "%(ca_key)s" %(keylen)s' % params,
# Creating CA Certificate using the authority key
'openssl req -new -x509 -nodes -days %(days)s -key "%(ca_key)s" -out "%(ca_cert)s" -config "%(config_CA)s"' % params,
# Create server key and certificate sign request
'openssl req -newkey rsa:%(keylen)s -nodes -keyout "%(server_key)s" -out "%(server_req)s" -config "%(config_server)s"' % params,
# Encrypt the server private key
'openssl rsa -in "%(server_key)s" -out "%(server_key)s"' % params,
# Generate self-signed certificate (using the key+csr)
'openssl x509 -req -in "%(server_req)s" -days %(days)s -CA "%(ca_cert)s" -CAkey "%(ca_key)s" -set_serial 01 -out "%(server_cert)s" -extensions v3_req' % params,
# Create client key and certificate sign request
'openssl req -newkey rsa:%(keylen)s -nodes -keyout "%(client_key)s" -out "%(client_req)s" -config "%(config_client)s"' % params,
# Encrypt the client private key
'openssl rsa -in "%(client_key)s" -out "%(client_key)s"' % params,
# Generate self-signed certificate (using the key+csr)
'openssl x509 -req -in "%(client_req)s" -days %(days)s -CA "%(ca_cert)s" -CAkey "%(ca_key)s" -set_serial 01 -out "%(client_cert)s"' % params,
]
for command in commands:
self.run_command(shlex.split(command))
return True, ca_cert, server_cert, server_key, client_cert, client_key
def run(self):
self.result = False
if not self.verify_preconditions():
return False
self.generate_config_file("CA")
self.generate_config_file("Server")
self.generate_config_file("Client")
self.result, self.ca_cert, self.server_cert, self.server_key, self.client_cert, self.client_key = self.generate(self.path, self.config_file)
return True
class SSLWizard_IntroPage(WizardPage):
def __init__(self, owner):
WizardPage.__init__(self, owner, "Welcome to MySQL Workbench SSL Wizard")
def go_cancel(self):
self.main.finish()
def create_ui(self):
box = mforms.newBox(False)
box.set_padding(20)
box.set_spacing(20)
message = "This wizard will assist you to generate a set of SSL certificates and self-signed keys that are required \n"
message += "by the MySQL server to enable SSL. Other files will also be generated so that you can check how to \n"
message += "configure your server and clients as well as the attributes used to generate them."
label = mforms.newLabel(message)
box.add(label, False, True)
self.content.add(box, False, True)
box.show(True)
class SSLWizard_OptionsPage(WizardPage):
def __init__(self, owner):
WizardPage.__init__(self, owner, "Options")
self.generate_files = newCheckBox()
self.generate_files.set_text("Generate new certificates and self-signed keys");
self.generate_files.set_active(not self.check_all_files_availability())
self.generate_files.set_enabled(self.check_all_files_availability())
self.update_connection = newCheckBox()
self.update_connection.set_text("Update the connection");
self.update_connection.set_active(True)
self.use_default_parameters = newCheckBox()
self.use_default_parameters.set_text("Use default parameters");
self.use_default_parameters.set_active(False)
self.clear_button = newButton()
self.clear_button.set_text("Clear Files")
self.clear_button.add_clicked_callback(self.clear_button_clicked)
self.clear_button.set_enabled(os.path.isdir(self.main.results_path))
def go_cancel(self):
self.main.finish()
def check_all_files_availability(self):
if not os.path.isdir(self.main.results_path):
return False
if not os.path.isfile(os.path.join(self.main.results_path, "ca-cert.pem")):
return False
if not os.path.isfile(os.path.join(self.main.results_path, "client-cert.pem")):
return False
if not os.path.isfile(os.path.join(self.main.results_path, "client-key.pem")):
return False
return True
def clear_button_clicked(self):
for filename in os.listdir(self.main.results_path):
filepath = os.path.join(self.main.results_path, filename)
try:
if os.path.isfile(filepath):
os.unlink(filepath)
except Exception as e:
log_error("SSL Wizard: Unable to remove file %s\n%s" % (filepath, str(e)))
return
self.generate_files.set_active(True)
self.generate_files.set_enabled(False)
self.main.generate_files_changed()
def create_ui(self):
box = mforms.newBox(False)
box.set_spacing(12)
box.set_padding(12)
message = "These options allow you to configure the process. You can use default parameters\n"
message += "instead of providing your own, allow the generation of the certificates and determine\n"
message += "whether to update the connection settings or not."
label = mforms.newLabel(message)
box.add(label, False, True)
box.add(self.use_default_parameters, False, True)
box.add(self.generate_files, False, True)
box.add(self.update_connection, False, True)
button_box = mforms.newBox(True)
button_box.set_spacing(12)
button_box.set_padding(12)
button_box.add(self.clear_button, False, True)
self.content.add(box, False, True)
self.content.add(button_box, False, True)
box.show(True)
class SSLWizard_GeneratePage(WizardPage):
def __init__(self, owner):
WizardPage.__init__(self, owner, "Generate certificates and self-signed keys")
self.ca_cert = os.path.join(self.main.results_path, "ca-cert.pem").replace('\\', '/')
self.server_cert = os.path.join(self.main.results_path, "server-cert.pem").replace('\\', '/')
self.server_key = os.path.join(self.main.results_path, "server-key.pem").replace('\\', '/')
self.client_cert = os.path.join(self.main.results_path, "client-cert.pem").replace('\\', '/')
self.client_key = os.path.join(self.main.results_path, "client-key.pem").replace('\\', '/')
self.table = mforms.newTable()
self.table.set_padding(12)
self.table.set_column_count(3)
self.table.set_row_count(7)
self.table.set_row_spacing(8)
self.table.set_column_spacing(4)
row, self.country_code = self.add_label_row(0, "Country:", "2 letter country code (eg, US)")
row, self.state_name = self.add_label_row(row, "State or Province:", "Full state or province name")
row, self.locality_name = self.add_label_row(row, "Locality:", "eg, city")
row, self.org_name = self.add_label_row(row, "Organization:", "eg, company")
row, self.org_unit = self.add_label_row(row, "Org. Unit:", "eg, section, department")
row, self.email_address = self.add_label_row(row, "Email Address:", "")
row, self.common_name = self.add_label_row(row, "Common:", "eg, put the FQDN of the server\nto allow server address validation")
message = "Now you must specify the parameters to use in the certificates and self-signed key generation.\n"
message += "This may include some data refering to youself and/or the company you work for. All fields are optional."
self.parameters_box = mforms.newBox(False)
self.parameters_box.set_padding(20)
self.parameters_box.set_spacing(20)
self.parameters_label = mforms.newLabel(message)
self.parameters_panel = mforms.newPanel(mforms.TitledBoxPanel)
self.parameters_panel.set_title("Optional Parameters")
self.parameters_panel.add(self.table)
self.parameters_box.add(self.parameters_label, False, True)
self.parameters_box.add(self.parameters_panel, False, True)
self.default_label = mforms.newLabel("The wizard is ready to generate the files for you. Click 'Next >' to generate \nthe certificates and self-signed key files...")
def add_label_row(self, row, label, help):
control = mforms.newTextEntry()
self.table.add(mforms.newLabel(label, True), 0, 1, row, row+1, mforms.HFillFlag)
self.table.add(control, 1, 2, row, row+1, mforms.HFillFlag|mforms.HExpandFlag)
l = mforms.newLabel(help)
l.set_style(mforms.SmallHelpTextStyle)
self.table.add(l, 2, 3, row, row+1, mforms.HFillFlag)
control.set_size(100, -1)
return row+1, control
def set_show_parameters(self, value):
self.parameters_box.show(bool(value))
self.default_label.show(not value)
def get_attributes(self, target):
def get_value_default(text_control, default_value):
current_value = text_control.get_string_value().encode('utf-8')
if not current_value:
return default_value
return current_value
default_cn = 'localhost'
if target == 'CA':
default_cn = 'issuer'
l = []
l.append("C=%s" % get_value_default(self.country_code, "US"))
l.append("ST=%s" % get_value_default(self.state_name, "California"))
l.append("L=%s" % get_value_default(self.locality_name, "Redwood Shores"))
l.append("O=%s" % get_value_default(self.org_name, "Oracle"))
l.append("OU=%s" % get_value_default(self.org_unit, "MySQL"))
l.append("CN=%s" % get_value_default(self.common_name, default_cn))
l.append("emailAddress=%s" % get_value_default(self.email_address, "any@localhost"))
return l
def create_ui(self):
self.content.add(self.parameters_box, False, True)
self.content.add(self.default_label, False, True)
def go_cancel(self):
self.main.finish()
def go_next(self):
log_debug2("Setting up in path %s\n" % self.main.results_path)
task = SSLWizard_GenerationTask(self.main, self.main.results_path)
task.run()
if task.result == False:
return
self.ca_cert = task.ca_cert
self.server_cert = task.server_cert
self.server_key = task.server_key
self.client_cert = task.client_cert
self.client_key = task.client_key
f = open(os.path.join(self.main.results_path, "my.cnf.sample"), "w+")
f.write("""# Copy this to your my.cnf file. Please change <directory> to the corresponding
# directory where the files were copied.
[client]
ssl-ca=%(ca_cert)s
ssl-cert=%(client_cert)s
ssl-key=%(client_key)s
[mysqld]
ssl-ca=%(ca_cert)s
ssl-cert=%(server_cert)s
ssl-key=%(server_key)s
""" % {"ca_cert" : os.path.join("<directory>", os.path.basename(self.ca_cert)).replace('\\', '/'),
"server_cert" : os.path.join("<directory>", os.path.basename(self.server_cert)).replace('\\', '/'),
"server_key" : os.path.join("<directory>", os.path.basename(self.server_key)).replace('\\', '/'),
"client_cert" : os.path.join("<directory>", os.path.basename(self.client_cert)).replace('\\', '/'),
"client_key" : os.path.join("<directory>", os.path.basename(self.client_key)).replace('\\', '/')
})
f.close()
log_debug2("SSL Wizard generation task result: %s\n" % str(task.result))
self.main.go_next_page()
class SSLWizard_ResultsPage(WizardPage):
def __init__(self, owner):
WizardPage.__init__(self, owner, "Results")
self.update_connection = True
def set_update_connection(self, value):
self.update_connection = value
def go_next(self):
if self.update_connection:
self.main.conn.parameterValues['sslCA'] = self.main.generate_page.ca_cert.replace('\\', '/')
self.main.conn.parameterValues['sslCert'] = self.main.generate_page.client_cert.replace('\\', '/')
self.main.conn.parameterValues['sslKey'] = self.main.generate_page.client_key.replace('\\', '/')
self.main.conn.parameterValues['useSSL'] = 4
self.main.go_next_page()
def create_ui(self):
message = "The wizard was successful. "
if self.update_connection:
message += "Click on the finish button to update the connection. "
message += "To setup the server, you should \ncopy the following files to a <directory> inside %s:\n\n" % self.main.conn.parameterValues['hostName']
message += " - %s\n" % str(os.path.join(self.main.results_path, "ca-cert.pem")).replace('\\', '/')
message += " - %s\n" % str(os.path.join(self.main.results_path, "server-cert.pem")).replace('\\', '/')
message += " - %s\n" % str(os.path.join(self.main.results_path, "server-key.pem")).replace('\\', '/')
message += "\n\nand edit the config file to use the following parameters:"
label = mforms.newLabel(message)
self.content.add(label, False, True)
f = open(os.path.join(self.main.results_path, "my.cnf.sample"), "r")
config_file = mforms.newTextBox(mforms.VerticalScrollBar)
config_file.set_value(f.read())
config_file.set_size(-1, 150)
self.content.add(config_file, False, True)
f.close()
label = mforms.newLabel("A copy of this file can be found in:\n%s" % str(os.path.join(self.main.results_path, "my.cnf.sample").replace('\\', '/')))
self.content.add(label, False, True)
return
class SSLWizard(WizardForm):
def __init__(self, parent, conn, conn_id):
WizardForm.__init__(self, parent)
self.conn = conn
self.conn_id = conn_id
self.certificates_root = os.path.join(mforms.App.get().get_user_data_folder(), "certificates")
self.results_path = os.path.join(self.certificates_root, self.conn_id)
self.set_title("SSL Wizard")
self.intro_page = SSLWizard_IntroPage(self)
self.add_page(self.intro_page)
self.options_page = SSLWizard_OptionsPage(self)
self.add_page(self.options_page)
self.generate_page = SSLWizard_GeneratePage(self)
self.add_page(self.generate_page)
self.results_page = SSLWizard_ResultsPage(self)
self.add_page(self.results_page)
# Set the default selection values
self.generate_page.set_show_parameters(not self.options_page.use_default_parameters.get_active())
self.results_page.set_update_connection(self.options_page.update_connection.get_active())
self.generate_page.skip_page(not self.options_page.generate_files.get_active())
# Setup up the callbacks for the options
self.options_page.use_default_parameters.add_clicked_callback(lambda: self.generate_page.set_show_parameters(not self.options_page.use_default_parameters.get_active()))
self.options_page.update_connection.add_clicked_callback(lambda: self.results_page.set_update_connection(self.options_page.update_connection.get_active()))
self.options_page.generate_files.add_clicked_callback(lambda: self.generate_files_changed())
def generate_files_changed(self):
self.generate_page.skip_page(not self.options_page.generate_files.get_active())
@ModuleInfo.export(grt.INT, mforms.Form, grt.classes.db_mgmt_Connection, grt.STRING)
def generateCertificates(parent, conn, conn_id):
try:
log_info("Running SSL Wizard\nParent: %s\nUser Folder: %s\nConn Parameters: %s\nConn ID: %s\n" % (str(parent), mforms.App.get().get_user_data_folder(), str(conn.parameterValues), conn_id))
p = mforms.fromgrt(parent)
log_info("Running SSL Wizard\n%s\n" % str(p))
r = SSLWizard(p, conn, conn_id)
r.run(True)
except Exception as e:
log_error("There was an exception running SSL Wizard.\n%s\n\n%s" % (str(e), traceback.format_exc()))