gui/backend/gui_plugin/users/UserManagement.py (212 lines of code) (raw):

# Copyright (c) 2020, 2025, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0,
# as published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms, as
# designated in a particular file or component or in included license
# documentation.  The authors of MySQL hereby grant you an additional
# permission to link the program and your derivative works with the
# separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# This program is distributed in the hope that it will be useful,  but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See
# the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

import re
import json
import mysqlsh
from gui_plugin.core.Db import BackendDatabase, BackendTransaction
from mysqlsh.plugin_manager import plugin_function  # pylint: disable=no-name-in-module
from . import backend
import gui_plugin.core.Error as Error
from gui_plugin.core.Error import MSGException
from gui_plugin.core.Context import get_context


@plugin_function('gui.users.createUser', cli=True, web=True)
def create_user(username, password, role=None, allowed_hosts=None,
                be_session=None):
    """Creates a new user account

    Args:
        username (str): The name of the user
        password (str): The user's password
        role (str): The role that should be granted to the user, optional
        allowed_hosts (str): Allowed hosts that user can connect from
        be_session (object):  A session to the GUI backend
            database where the operation will be performed.

    Returns:
        int: the user ID.
    """
    with BackendDatabase(be_session) as db:
        # The user record and its related rows are created atomically.
        with BackendTransaction(db):
            user_id = backend.create_user(
                db, username, password, role, allowed_hosts)

        return user_id


@plugin_function('gui.users.setAllowedHosts', web=True)
def set_allowed_hosts(user_id, allowed_hosts, be_session=None):
    """Sets the allowed hosts for the given user.

    Args:
        user_id (int): The id of the user.
        allowed_hosts (str): Allowed hosts that user can connect from
        be_session (object):  A session to the GUI backend
            database where the operation will be performed.

    Returns:
        None
    """
    with BackendDatabase(be_session) as db:
        with BackendTransaction(db):
            db.execute('''UPDATE user
                          SET allowed_hosts = ?
                          WHERE id = ?''',
                       (allowed_hosts, user_id,))


@plugin_function('gui.users.deleteUser', cli=True, web=True)
def delete_user(username, be_session=None):
    """Deletes a user account

    Args:
        username (str): The name of the user
        be_session (object):  A session to the GUI backend
            database where the operation will be performed.

    Returns:
        None
    """
    with BackendDatabase(be_session) as db:
        # Resolve the ids first: they are needed to clean up the
        # association tables once the user row itself is gone.
        user_id = backend.get_user_id(db, username)
        default_group_id = backend.get_default_group_id(db, user_id)

        with BackendTransaction(db):
            db.execute("DELETE FROM user WHERE name = ?", (username,))
            db.execute(
                "DELETE FROM user_has_role WHERE user_id = ?", (user_id,))
            db.execute(
                "DELETE FROM user_group_has_user WHERE user_id = ?",
                (user_id,))
            backend.remove_user_group(db, default_group_id)


@plugin_function('gui.users.grantRole', web=True)
def grant_role(username, role, be_session=None):
    """Grant the given roles to the user.

    Args:
        username (str): The name of the user
        role (str): The list of roles that should be assigned to the user.
            Use listRoles() to list all available roles.
        be_session (object):  A session to the GUI backend
            database where the operation will be performed.

    Returns:
        None
    """
    with BackendDatabase(be_session) as db:
        user_id = backend.get_user_id(db, username)
        if not user_id:
            # NOTE(review): a missing user is reported with the
            # USER_INVALID_ROLE code; confirm whether a dedicated
            # "invalid user" error code should be used instead.
            raise MSGException(
                Error.USER_INVALID_ROLE,
                f"There is no user with the name '{username}'.")

        # ensure roles is always a list, accepting either a single role
        # name or an already built list/tuple of role names
        roles = list(role) if isinstance(role, (list, tuple)) else [role]

        # check for each role specified if that role actually exists in the
        # db; any failure rolls back the grants made so far
        with BackendTransaction(db):
            for role_name in roles:
                res = db.execute(
                    "SELECT id FROM role WHERE name = ?",
                    (role_name,)).fetch_one()
                if not res:
                    raise MSGException(
                        Error.USER_INVALID_ROLE,
                        f"There is no role with the name '{role_name}'.")

                role_id = res[0]

                # perform the many-to-many insert
                db.execute("INSERT INTO user_has_role("
                           "user_id, role_id) "
                           "VALUES(?, ?)", (user_id, role_id))


@plugin_function('gui.users.getUserId', web=True)
def get_user_id(username, be_session=None):
    """Gets the id for a given user.

    Args:
        username (str): The user for which the id will be returned.
        be_session (object):  A session to the GUI backend
            database where the operation will be performed.

    Returns:
        int: the user ID.
    """
    with BackendDatabase(be_session) as db:
        user_id = backend.get_user_id(db, username)

        return user_id


@plugin_function('gui.users.listUsers', cli=True, web=True)
def list_users(be_session=None):
    """Lists all user accounts.

    Args:
        be_session (object):  A session to the GUI backend
            database where the operation will be performed.

    Returns:
        list: the list of users.
    """
    with BackendDatabase(be_session) as db:
        return backend.list_users(db)


@plugin_function('gui.users.listUserRoles', web=True)
def list_user_roles(username, be_session=None):
    """List the granted roles for a given user.

    Args:
        username (str): The name of the user
        be_session (object):  A session to the GUI backend
            database where the operation will be performed.

    Returns:
        list: the list of roles.
    """
    with BackendDatabase(be_session) as db:
        # The user name is compared case-insensitively.
        return db.select('''SELECT r.name, r.description
                            FROM user u
                            INNER JOIN user_has_role u_r
                                ON u.id = u_r.user_id
                            INNER JOIN role r
                                ON u_r.role_id=r.id
                            WHERE upper(u.name) = upper(?)''',
                         (username,))


@plugin_function('gui.users.listRoles', web=True)
def list_roles(be_session=None):
    """Lists all roles that can be assigned to users.

    Args:
        be_session (object):  A session to the GUI backend
            database where the operation will be performed.

    Returns:
        list: the list of roles.
    """
    with BackendDatabase(be_session) as db:
        return db.select("SELECT name, description "
                         "FROM role")


@plugin_function('gui.users.listRolePrivileges', web=True)
def list_role_privileges(role, be_session=None):
    """Lists all privileges of a role.

    Args:
        role (str): The name of the role.
        be_session (object):  A session to the GUI backend
            database where the operation will be performed.

    Returns:
        list: the list of privileges.
    """
    with BackendDatabase(be_session) as db:
        return db.select('''SELECT pt.name as type, p.name, p.access_pattern
                            FROM privilege p
                            INNER JOIN role_has_privilege r_p
                                ON p.id = r_p.privilege_id
                            INNER JOIN privilege_type pt
                                ON p.privilege_type_id=pt.id
                            INNER JOIN role r
                                ON r_p.role_id = r.id
                            WHERE r.name = ?''',
                         (role,))


@plugin_function('gui.users.listUserPrivileges', web=True)
def list_user_privileges(username, be_session=None):
    """Lists all privileges assigned to a user.

    Args:
        username (str): The name of the user.
        be_session (object):  A session to the GUI backend
            database where the operation will be performed.

    Returns:
        list: the list of privileges.
    """
    with BackendDatabase(be_session) as db:
        # The user name is compared case-insensitively.
        return db.select('''SELECT r.name, pt.name, p.name, p.access_pattern
                            FROM privilege p
                            INNER JOIN role_has_privilege r_p
                                ON p.id = r_p.privilege_id
                            INNER JOIN user_has_role u_r
                                ON r_p.role_id = u_r.role_id
                            INNER JOIN privilege_type pt
                                ON p.privilege_type_id=pt.id
                            INNER JOIN user u
                                ON u_r.user_id = u.id
                            INNER JOIN role r
                                ON u_r.role_id = r.id
                            WHERE upper(u.name) = upper(?)''',
                         (username,))


@plugin_function('gui.users.getGuiModuleList', shell=False, web=True)
def get_gui_module_list(user_id, be_session=None):
    """Returns the list of modules for the given user.

    Args:
        user_id (int): The id of the user.
        be_session (object):  A session to the GUI backend
            database where the operation will be performed.

    Returns:
        list: the list of modules.
    """
    with BackendDatabase(be_session) as db:
        modules = []

        res = db.execute('''SELECT p.access_pattern
                            FROM privilege p
                            INNER JOIN role_has_privilege r_p
                                ON p.id = r_p.privilege_id
                            INNER JOIN role r
                                ON r_p.role_id=r.id
                            INNER JOIN user_has_role u_r
                                ON r.id = u_r.role_id
                            WHERE u_r.user_id = ?
                                AND p.privilege_type_id=2''',
                         (user_id,)).fetch_all()
        if res:
            # Each access pattern is a regular expression matched against
            # the names of the objects exposed by the gui extension object.
            patterns = [re.compile(row[0]) for row in res]

            # Only look at the gui extension object for now
            mod = getattr(mysqlsh.globals, "gui")

            for obj_name in dir(mod):
                # A single match is enough; testing with any() ensures a
                # module is reported once even when several access patterns
                # (e.g. coming from different roles) match the same object.
                if not any(p.match(obj_name) for p in patterns):
                    continue

                obj = getattr(mod, obj_name)
                if "is_gui_module_backend" in dir(obj):
                    is_backend = getattr(obj, "is_gui_module_backend")
                    if is_backend():
                        modules.append(f"gui.{obj_name}")

        return modules


@plugin_function('gui.users.listProfiles', cli=True, shell=True, web=True)
def list_profiles(user_id, be_session=None):
    """Returns the list of profiles for the given user

    Args:
        user_id (int): The id of the user.
        be_session (object):  A session to the GUI backend
            database where the operation will be performed.

    Returns:
        list: the list of profiles.
    """
    with BackendDatabase(be_session) as db:
        return db.select('''SELECT id, name FROM profile
                            WHERE user_id = ?''', (user_id,))


@plugin_function('gui.users.getProfile', cli=True, shell=True, web=True)
def get_profile(profile_id, be_session=None):
    """Returns the specified profile.

    Args:
        profile_id (int): The id of the profile.
        be_session (object):  A session to the GUI backend
            database where the operation will be performed.

    Returns:
        dict: the user profile.
    """
    with BackendDatabase(be_session) as db:
        profile = backend.get_profile(db, profile_id)

        return profile


@plugin_function('gui.users.updateProfile', cli=True, shell=True, web=True)
def update_profile(profile, be_session=None):
    """Updates a user profile.

    Args:
        profile (dict): A dictionary with the profile information
        be_session (object):  A session to the GUI backend
            database where the operation will be performed.

    Allowed options for profile:
        id (int): The id of profile
        name (str): The profile name
        description (str): A longer description for profile
        options (dict): The options specific for the profile

    Returns:
        None
    """
    with BackendDatabase(be_session) as db:
        # A missing 'options' key is stored as an empty JSON object, while
        # an explicit None is stored as NULL.
        options = profile.get('options', {})
        db.execute('''UPDATE profile
                      SET name = ?,
                          description = ?,
                          options = ?
                      WHERE id = ?''',
                   (profile.get('name'),
                    profile.get('description', ''),
                    None if options is None else json.dumps(options),
                    profile.get('id')))


@plugin_function('gui.users.addProfile', cli=True, shell=True, web=True)
def add_profile(user_id, profile, be_session=None):
    """Adds a new profile for the given user.

    Args:
        user_id (int): The id of the user.
        profile (dict): The profile to add
        be_session (object):  A session to the GUI backend
            database where the operation will be performed.

    Allowed options for profile:
        name (str): The profile name
        description (str): A longer description for profile
        options (dict): The options specific for the profile

    Returns:
        int: the profile ID.
    """
    with BackendDatabase(be_session) as db:
        profile_id = backend.add_profile(db, user_id, profile)

        return profile_id


@plugin_function('gui.users.deleteProfile', cli=True, shell=True, web=True)
def delete_profile(user_id, profile_id, be_session=None):
    """Deletes a profile for the current user.

    Args:
        user_id (int): The id of the user to which the profile belongs to.
        profile_id (int): The ID of the profile to delete.
        be_session (object):  A session to the GUI backend
            database where the operation will be performed.

    Returns:
        None
    """
    with BackendDatabase(be_session) as db:
        # A falsy result from the backend is reported as an error so the
        # caller learns that nothing was deleted.
        if not backend.delete_profile(db, user_id, profile_id):
            raise MSGException(
                Error.USER_DELETE_PROFILE,
                "Could not delete any profile with the supplied criteria.")


@plugin_function('gui.users.getDefaultProfile', cli=True, shell=True, web=True)
def get_default_profile(user_id, be_session=None):
    """Returns the default profile for the given user.

    Args:
        user_id (int): The id of the user.
        be_session (object):  A session to the GUI backend
            database where the operation will be performed.

    Returns:
        dict: the user profile.
    """
    with BackendDatabase(be_session) as db:
        profile = backend.get_default_profile(db, user_id)

        return profile


@plugin_function('gui.users.setDefaultProfile', cli=True, shell=True, web=True)
def set_default_profile(user_id, profile_id, be_session=None):
    """Sets the default profile for the given user.

    Args:
        user_id (int): The id of the user.
        profile_id (int): The id of the profile to become the default profile
        be_session (object):  A session to the GUI backend
            database where the operation will be performed.

    Returns:
        None
    """
    with BackendDatabase(be_session) as db:
        with BackendTransaction(db):
            backend.set_default_profile(db, user_id, profile_id)


@plugin_function('gui.users.setCurrentProfile', cli=True, shell=True, web=True)
def set_current_profile(profile_id):
    """Sets the profile of the user's current web session.

    Args:
        profile_id (int): The id of the profile to become the current profile

    Returns:
        None
    """
    # Nothing is done when no context is available.
    context = get_context()
    if context:
        context.web_handler.set_active_profile_id(profile_id)


@plugin_function('gui.users.listUserGroups', cli=True, web=True)
def list_user_groups(member_id=None, be_session=None):
    """Returns the list of all groups or list all groups that given user belongs to.

    Args:
        member_id (int): User ID
        be_session (object):  A session to the GUI backend
            database where the operation will be performed.

    Returns:
        list: the list of user groups.
    """
    with BackendDatabase(be_session) as db:
        return backend.get_user_groups(db, member_id)


@plugin_function('gui.users.createUserGroup', cli=True, web=True)
def create_user_group(name, description, be_session=None):
    """Creates user group.

    Args:
        name (str): Group name
        description (str): Description of the group
        be_session (object):  A session to the GUI backend
            database where the operation will be performed.

    Returns:
        int: the group ID.
    """
    with BackendDatabase(be_session) as db:
        group_id = backend.create_group(db, name, description)

        return group_id


@plugin_function('gui.users.addUserToGroup', cli=True, web=True)
def add_user_to_group(member_id, group_id, owner=0, be_session=None):
    """Adds user to user group.

    Args:
        member_id (int): User ID
        group_id (int): Group ID
        owner (int): If user is owner
        be_session (object):  A session to the GUI backend
            database where the operation will be performed.

    Returns:
        None
    """
    with BackendDatabase(be_session) as db:
        backend.add_user_to_group(db, member_id, group_id, owner)


@plugin_function('gui.users.removeUserFromGroup', cli=True, web=True)
def remove_user_from_group(member_id, group_id, be_session=None):
    """Removes user from user group.

    Args:
        member_id (int): User ID
        group_id (int): Group ID
        be_session (object):  A session to the GUI backend
            database where the operation will be performed.

    Returns:
        None
    """
    with BackendDatabase(be_session) as db:
        backend.remove_user_from_group(db, member_id, group_id)


@plugin_function('gui.users.updateUserGroup', cli=True, web=True)
def update_user_group(group_id, name=None, description=None, be_session=None):
    """Updates user group.

    Args:
        group_id (int): Group ID
        name (str): Group name
        description (str): Description of the group
        be_session (object):  A session to the GUI backend
            database where the operation will be performed.

    Returns:
        None
    """
    with BackendDatabase(be_session) as db:
        backend.update_user_group(db, group_id, name, description)


@plugin_function('gui.users.removeUserGroup', cli=True, web=True)
def remove_user_group(group_id, be_session=None):
    """Removes given user group.

    Args:
        group_id (int): Group ID
        be_session (object):  A session to the GUI backend
            database where the operation will be performed.

    Returns:
        None
    """
    with BackendDatabase(be_session) as db:
        backend.remove_user_group(db, group_id)