gui/backend/gui_plugin/modules/Modules.py (311 lines of code) (raw):

# Copyright (c) 2020, 2024, Oracle and/or its affiliates. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License, version 2.0, # as published by the Free Software Foundation. # # This program is designed to work with certain software (including # but not limited to OpenSSL) that is licensed under separate terms, as # designated in a particular file or component or in included license # documentation. The authors of MySQL hereby grant you an additional # permission to link the program and your derivative works with the # separately licensed software that they have either included with # the program or referenced in the documentation. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See # the GNU General Public License, version 2.0, for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA from mysqlsh.plugin_manager import plugin_function # pylint: disable=no-name-in-module import json import datetime from gui_plugin.core.Db import BackendDatabase, BackendTransaction from . import backend from gui_plugin.core.Error import MSGException import gui_plugin.core.Error as Error from gui_plugin.core.Context import get_context @plugin_function('gui.modules.addData', shell=False, web=True) def add_data(caption, content, data_category_id, tree_identifier, folder_path=None, profile_id=None, be_session=None): """Creates a new Module Data record for the given module and associates it to the active user profile and personal user group. Args: caption (str): The data caption content (str): The content of data module data_category_id (int): The id of data category tree_identifier (str): The identifier of the tree folder_path (str): The folder path f.e. "/scripts/server1" profile_id (int): The id of profile be_session (object): A session to the GUI backend database where the operation will be performed. Returns: int: the id of the new record. """ if caption.strip() == "": raise MSGException(Error.CORE_INVALID_PARAMETER, f"Parameter 'caption' cannot be empty.") if tree_identifier.strip() == "": raise MSGException(Error.CORE_INVALID_PARAMETER, f"Parameter 'tree_identifier' cannot be empty.") if folder_path.strip() == "" or folder_path.strip() == "/": folder_path = None with BackendDatabase(be_session) as db: with BackendTransaction(db): db.execute('''INSERT INTO data (data_category_id, caption, content, created, last_update) VALUES(?, ?, ?, ?, ?)''', (data_category_id, caption, json.dumps(content), datetime.datetime.now(), datetime.datetime.now())) id = db.get_last_row_id() context = get_context() backend.add_data_associations(db, id, tree_identifier, folder_path, profile_id if profile_id else context.web_handler.session_active_profile_id, context.web_handler.user_personal_group_id) return id @plugin_function('gui.modules.listData', shell=False, web=True) def list_data(folder_id, data_category_id=None, be_session=None): """Get list of data Args: folder_id (int): The id of the folder data_category_id (int): The id of data category be_session (object): A session to the GUI backend database where the operation will be performed. Returns: list: the list of the data. """ with BackendDatabase(be_session) as db: sql = """SELECT d.id, d.data_category_id, d.caption, d.created, d.last_update FROM data d JOIN data_folder_has_data dfhd ON dfhd.data_id=d.id WHERE dfhd.data_folder_id=?""" args = (folder_id,) if data_category_id: CATEGORIES_SQL = """WITH RECURSIVE categories(id) AS ( SELECT id FROM data_category WHERE id=? UNION ALL SELECT dc.id FROM categories c JOIN data_category dc ON c.id=dc.parent_category_id ) SELECT DISTINCT id FROM categories""" sql += f" AND d.data_category_id in ({CATEGORIES_SQL})" args += (data_category_id,) return db.select(sql, args) @plugin_function('gui.modules.getDataContent', shell=False, web=True) def get_data_content(id, be_session=None): """Gets content of the given module Args: id (int): The id of the data be_session (object): A session to the GUI backend database where the operation will be performed. Returns: dict: the content of the data. """ with BackendDatabase(be_session) as db: res = db.execute('''SELECT content FROM data WHERE id=?''', (id,)).fetch_one() # First we nee to make sure that data content exists for given module id, # then we can check if user have privileges for it. if res is None: raise MSGException(Error.MODULES_INVALID_MODULE_ID, f"There is no data for the given module id: {id}.") # If user have any privileges either assigned to user group or profile # he can see data content, otherwise Exception will be raised context = get_context() backend.get_user_privileges_for_data(db, id, context.web_handler.session_user_id) try: content = json.loads(res['content']) except Exception as e: raise MSGException(Error.CORE_INVALID_DATA_FORMAT, f"Error decoding data content: {str(e)}") return content @plugin_function('gui.modules.shareDataToUserGroup', shell=False, web=True) def share_data_to_user_group(id, user_group_id, read_only, tree_identifier, folder_path=None, be_session=None): """Shares data to user group Args: id (int): The id of the data user_group_id (int): The id of user group read_only (int): The flag that specifies whether the data is read only tree_identifier (str): The identifier of the tree folder_path (str): The folder path f.e. "/scripts/server1" be_session (object): A session to the GUI backend database where the operation will be performed. Returns: int: the id of the folder to which the data was shared. """ if tree_identifier.strip() == "": raise MSGException(Error.CORE_INVALID_PARAMETER, f"Parameter 'tree_identifier' cannot be empty.") if folder_path.strip() == "" or folder_path.strip() == "/": folder_path = None with BackendDatabase(be_session) as db: with BackendTransaction(db): context = get_context() privileges = backend.get_user_privileges_for_data(db, id, context.web_handler.session_user_id) max_privilege = min([p['read_only'] for p in privileges]) if max_privilege <= read_only: user_group_root_folder_id = backend.get_root_folder_id( db, tree_identifier, 'group', user_group_id) if user_group_root_folder_id is None: user_group_root_folder_id = backend.create_user_group_data_tree( db, tree_identifier, user_group_id) if folder_path: folder_user_group_id, _ = backend.create_folder( db, folder_path, user_group_root_folder_id) else: folder_user_group_id = user_group_root_folder_id backend.add_data_to_folder( db, id, folder_user_group_id, read_only) else: raise MSGException(Error.MODULES_SHARING_WITH_HIGHER_PERMISSIONS, "Cannot share data with higher permission than user has.") return folder_user_group_id @plugin_function('gui.modules.addDataToProfile', shell=False, web=True) def add_data_to_profile(id, profile_id, read_only, tree_identifier, folder_path=None, be_session=None): """Shares data to profile Args: id (int): The id of the data profile_id (int): The id of profile read_only (int): The flag that specifies whether the data is read only tree_identifier (str): The identifier of the tree folder_path (str): The folder path f.e. "/scripts/server1" be_session (object): A session to the GUI backend database where the operation will be performed. Returns: The id of the folder to which the data was shared. """ if tree_identifier.strip() == "": raise MSGException(Error.CORE_INVALID_PARAMETER, f"Parameter 'tree_identifier' cannot be empty.") if folder_path.strip() == "" or folder_path.strip() == "/": folder_path = None with BackendDatabase(be_session) as db: with BackendTransaction(db): context = get_context() privileges = backend.get_user_privileges_for_data(db, id, context.web_handler.session_user_id) max_privilege = min([p['read_only'] for p in privileges]) # We check if the user is owner of given profile if backend.get_profile_owner(db, profile_id) == context.web_handler.session_user_id: if max_privilege <= read_only: profile_root_folder_id = backend.get_root_folder_id( db, tree_identifier, 'profile', profile_id) if profile_root_folder_id is None: profile_root_folder_id = backend.create_profile_data_tree( db, tree_identifier, profile_id) if folder_path: folder_profile_id, _ = backend.create_folder( db, folder_path, profile_root_folder_id) else: folder_profile_id = profile_root_folder_id backend.add_data_to_folder( db, id, folder_profile_id, read_only) else: raise MSGException(Error.MODULES_SHARING_WITH_HIGHER_PERMISSIONS, "Cannot assign data to profile with higher permission than user has.") else: raise MSGException(Error.MODULES_USER_HAVE_NO_PRIVILEGES, "User have no privileges to perform operation.") return folder_profile_id @plugin_function('gui.modules.updateData', shell=False, web=True) def update_data(id, caption=None, content=None, be_session=None): """Update data of the given module Args: id (int): The id of the data caption (str): Caption content (str): The content data be_session (object): A session to the GUI backend database where the operation will be performed. Returns: int: the id of the updated record. """ with BackendDatabase(be_session) as db: with BackendTransaction(db): context = get_context() privileges = backend.get_user_privileges_for_data(db, id, context.web_handler.session_user_id) if any([p['read_only'] == 0 for p in privileges]): actions = [] args = tuple() if caption: actions.append("caption=?") args += (caption,) if content: actions.append("content=?") args += (json.dumps(content),) db.execute(f"""UPDATE data SET {",".join(actions)} WHERE id=?""", args + (id,)) else: raise MSGException(Error.MODULES_USER_HAVE_NO_PRIVILEGES, "User have no privileges to perform operation.") return id @plugin_function('gui.modules.deleteData', shell=False, web=True) def delete_data(id, folder_id, be_session=None): """Deletes data Args: id (int): The id of the data folder_id (int): The id of the folder be_session (object): A session to the GUI backend database where the operation will be performed. Returns: int: the id of the deleted record. """ with BackendDatabase(be_session) as db: with BackendTransaction(db): context = get_context() backend.get_user_privileges_for_data(db, id, context.web_handler.session_user_id) return backend.delete_data(db, id, folder_id) @plugin_function("gui.modules.listDataCategories", shell=False, web=True) def list_data_categories(category_id=None, be_session=None): """Gets the list of available data categories and sub categories for the given name. Args: category_id (int): The id of the data category be_session (object): A session to the GUI backend database where the operation will be performed. Returns: The list of available data categories """ with BackendDatabase(be_session) as db: if category_id is None: rows = db.select("""SELECT id, name, parent_category_id FROM data_category WHERE parent_category_id is NULL""", ()) else: rows = db.select("""WITH RECURSIVE categories(id, name, parent_category_id) AS ( SELECT id, name, parent_category_id FROM data_category WHERE id=? UNION ALL SELECT dc.id, dc.name, dc.parent_category_id FROM categories c JOIN data_category dc ON c.id=dc.parent_category_id ) SELECT DISTINCT id, name, parent_category_id FROM categories""", (category_id,)) if not rows: raise MSGException( Error.MODULES_INVALID_DATA_CATEGORY, "Data category does not exist.") status = db.get_last_status() if status['type'] != "OK": raise MSGException(Error.DB_ERROR, status['msg']) return rows @plugin_function("gui.modules.addDataCategory", shell=False, web=True) def add_data_category(name, parent_category_id=None, be_session=None): """Add a new data category to the list of available data categories for this module Args: name (str): The name of the data category parent_category_id (int): The id of the parent category be_session (object): A session to the GUI backend database where the operation will be performed. Returns: int: the id of added category. """ if name.strip() == "": raise MSGException(Error.CORE_INVALID_PARAMETER, f"Parameter 'name' cannot be empty.") with BackendDatabase(be_session) as db: search = db.execute("""SELECT id from data_category WHERE name=?""", (name,)).fetch_one() if search: raise MSGException(Error.MODULES_INVALID_DATA_CATEGORY, "Data category already exists.") else: res = db.execute( """SELECT MAX(id) FROM data_category""").fetch_one() # First 100 ids are reserved for predefined data categories id = res[0] + 1 if res[0] > 100 else 101 db.execute("""INSERT INTO data_category (id, parent_category_id, name) VALUES (?, ?, ?)""", (id, parent_category_id, name, )) category_id = db.get_last_row_id() return category_id @plugin_function("gui.modules.removeDataCategory", shell=False, web=True) def remove_data_category(category_id, be_session=None): """Remove a data category from the list of available data categories for this module Args: category_id (int): The id of the data category be_session (object): A session to the GUI backend database where the operation will be performed. Returns: int: the id of the removed category. """ if category_id <= 100: raise MSGException(Error.MODULES_CANT_DELETE_MODULE_CATEGORY, "Can't delete predefined data category.") with BackendDatabase(be_session) as db: res = db.execute("""SELECT data_category_id FROM data WHERE data_category_id=? LIMIT 1""", (category_id,)).fetch_all() if res: raise MSGException(Error.MODULES_CANT_DELETE_MODULE_CATEGORY, "Can't delete data category associated with data.") res = db.execute("""SELECT id FROM data_category WHERE parent_category_id=? LIMIT 1""", (category_id,)).fetch_all() if res: raise MSGException(Error.MODULES_CANT_DELETE_MODULE_CATEGORY, "Can't delete data category associated with sub categories.") db.execute("""DELETE FROM data_category WHERE id=?""", (category_id,)).fetch_one() if db.rows_affected == 0: raise MSGException(Error.MODULES_INVALID_DATA_CATEGORY, "Data category does not exist.") return category_id @plugin_function("gui.modules.getDataCategoryId", shell=False, web=True) def get_data_category_id(name, be_session=None): """Gets id for given name and module id. Args: name (str): The name of the data category be_session (object): A session to the GUI backend database where the operation will be performed. Returns: int: the id of the data category. """ with BackendDatabase(be_session) as db: res = db.execute("""SELECT id FROM data_category WHERE name=?""", (name,)).fetch_one() if not res: raise MSGException(Error.MODULES_INVALID_DATA_CATEGORY, "Data category does not exist.") return res['id'] @plugin_function("gui.modules.createProfileDataTree", shell=False, web=True) def create_profile_data_tree(tree_identifier, profile_id=None, be_session=None): """Creates the profile data tree for the given tree identifier and profile id. Args: tree_identifier (str): The identifier of the tree profile_id (int): The id of profile be_session (object): A session to the GUI backend database where the operation will be performed. Returns: int: the id of the root folder. """ if tree_identifier.strip() == "": raise MSGException(Error.CORE_INVALID_PARAMETER, f"Parameter 'tree_identifier' cannot be empty.") with BackendDatabase(be_session) as db: with BackendTransaction(db): context = get_context() root_folder_id = backend.create_profile_data_tree(db, tree_identifier, profile_id if profile_id else context.web_handler.session_active_profile_id) return root_folder_id @plugin_function("gui.modules.getProfileDataTree", shell=False, web=True) def get_profile_data_tree(tree_identifier, profile_id=None, be_session=None): """Gets the profile data tree for the given tree identifier and profile id. Args: tree_identifier (str): The identifier of the tree profile_id (int): The id of profile be_session (object): A session to the GUI backend database where the operation will be performed. Returns: list: the list of all folders in data tree. """ with BackendDatabase(be_session) as db: context = get_context() root_folder_id = backend.get_root_folder_id(db, tree_identifier, 'profile', profile_id if profile_id else context.web_handler.session_active_profile_id) return db.select(backend.FOLDERS_TREE_SQL, (root_folder_id,)) @plugin_function("gui.modules.createUserGroupDataTree", shell=False, web=True) def create_user_group_data_tree(tree_identifier, user_group_id=None, be_session=None): """Creates the user group data tree for the given tree identifier and user group id. Args: tree_identifier (str): The identifier of the tree user_group_id (int): The id of user group be_session (object): A session to the GUI backend database where the operation will be performed. Returns: int: the id of the root folder. """ if tree_identifier.strip() == "": raise MSGException(Error.CORE_INVALID_PARAMETER, f"Parameter 'tree_identifier' cannot be empty.") with BackendDatabase(be_session) as db: with BackendTransaction(db): context = get_context() root_folder_id = backend.create_user_group_data_tree(db, tree_identifier, user_group_id if user_group_id else context.web_handler.user_personal_group_id) return root_folder_id @plugin_function("gui.modules.getUserGroupDataTree", shell=False, web=True) def get_user_group_data_tree(tree_identifier, user_group_id=None, be_session=None): """Gets the user group data tree for the given tree identifier and user group id. Args: tree_identifier (str): The identifier of the tree user_group_id (int): The id of user group be_session (object): A session to the GUI backend database where the operation will be performed. Returns: list: the list of all folders in data tree. """ with BackendDatabase(be_session) as db: context = get_context() root_folder_id = backend.get_root_folder_id(db, tree_identifier, 'group', user_group_id if user_group_id else context.web_handler.user_personal_group_id) return db.select(backend.FOLDERS_TREE_SQL, (root_folder_id,)) @plugin_function("gui.modules.getProfileTreeIdentifiers", shell=False, web=True) def get_profile_tree_identifiers(profile_id=None, be_session=None): """Gets the tree identifiers associated with the given profile. Args: profile_id (int): The id of profile be_session (object): A session to the GUI backend database where the operation will be performed. Returns: list: the list of tree identifiers. """ with BackendDatabase(be_session) as db: context = get_context() return db.select("""SELECT tree_identifier FROM data_profile_tree WHERE profile_id=?""", (profile_id if profile_id else context.web_handler.session_active_profile_id,)) @plugin_function("gui.modules.moveData", shell=False, web=True) def move_data(id, tree_identifier, linked_to, link_id, source_path, target_path, be_session=None): """Moves data from source path to target path. Args: id (int): The id of the data tree_identifier (str): The identifier of the tree linked_to (str): ['profile'|'group'] link_id (int): The profile id or the group id (depending on linked_to) source_path (str): The source folder path f.e. "/scripts/server1" target_path (str): The target folder path f.e. "/scripts/server2" be_session (object): A session to the GUI backend database where the operation will be performed. Returns: int: the id of the moved record. """ if linked_to not in ['profile', 'group']: raise MSGException(Error.CORE_INVALID_PARAMETER, f"Parameter 'linked_to' can only take value 'profile' or 'group'.") if tree_identifier.strip() == "": raise MSGException(Error.CORE_INVALID_PARAMETER, f"Parameter 'tree_identifier' cannot be empty.") if source_path.strip() == "" or source_path.strip() == "/": source_path = None if target_path.strip() == "" or target_path.strip() == "/": target_path = None if source_path == target_path: raise MSGException(Error.CORE_INVALID_PARAMETER, f"Parameters 'source_path' and 'target_path' are the same.") with BackendDatabase(be_session) as db: with BackendTransaction(db): root_folder_id = backend.get_root_folder_id( db, tree_identifier, linked_to, link_id) if root_folder_id is None: raise MSGException(Error.CORE_INVALID_PARAMETER, f"Cannot find root folder id for the given 'tree_identifier'.") source_folder_id = backend.get_folder_id( db, root_folder_id, source_path) if source_folder_id is None: raise MSGException(Error.CORE_INVALID_PARAMETER, f"Cannot find the given 'source_path'.") target_folder_id = backend.get_folder_id( db, root_folder_id, target_path) if target_folder_id is None: target_folder_id, _ = backend.create_folder( db, target_path, root_folder_id) backend.add_data_to_folder(db, id, target_folder_id, read_only=0) return backend.delete_data(db, id, source_folder_id)