mds_plugin/user.py (1,083 lines of code) (raw):

# Copyright (c) 2021, 2024, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0,
# as published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms, as
# designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an additional
# permission to link the program and your derivative works with the
# separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

"""Sub-Module to manage OCI Users, Groups and Policies"""

from mysqlsh.plugin_manager import plugin_function
from mds_plugin import core, configuration


def _shorten_text(text, width):
    """Returns text as a single line of at most `width` characters

    Text longer than `width` is cut to `width - 2` characters followed by
    '..'. Line breaks are replaced by spaces. None is treated as an empty
    string so that listings do not fail on missing attributes.

    Args:
        text (str): The text to shorten, or None
        width (int): The maximum number of characters to return

    Returns:
        The shortened, single-line string
    """
    import re

    text = "" if text is None else str(text)
    if len(text) > width:
        text = text[:width - 2] + ".."
    return re.sub(r'[\n\r]', ' ', text)


def format_user_listing(data):
    """Formats a list of users into human readable format

    Args:
        data (list): A list of user objects providing the attributes name,
            description and lifecycle_state

    Returns:
        The formatted string, one numbered line per user
    """
    return "".join(
        f"{i:>4} {_shorten_text(u.name, 24):24} "
        f"{_shorten_text(u.description, 41):41} {u.lifecycle_state}\n"
        for i, u in enumerate(data, start=1))


def format_group_listing(data):
    """Formats a list of groups into human readable format

    Args:
        data (list): A list of group objects providing the attributes name,
            description and lifecycle_state

    Returns:
        The formatted string, one numbered line per group
    """
    return "".join(
        f"{i:>4} {_shorten_text(g.name, 24):24} "
        f"{_shorten_text(g.description, 41):41} {g.lifecycle_state}\n"
        for i, g in enumerate(data, start=1))
def _single_line(text, width):
    """Returns text as a single line of at most `width` characters

    Text longer than `width` is cut to `width - 2` characters followed by
    '..'. Line breaks are replaced by spaces and None becomes ''.

    Args:
        text (str): The text to shorten, or None
        width (int): The maximum number of characters to return

    Returns:
        The shortened, single-line string
    """
    import re

    text = "" if text is None else str(text)
    if len(text) > width:
        text = text[:width - 2] + ".."
    return re.sub(r'[\n\r]', ' ', text)


def format_dynamic_group_listing(data):
    """Formats a list of dynamic groups into human readable format

    Each group takes two lines: a summary line followed by the group's
    matching rule.

    Args:
        data (list): A list of dynamic group objects providing name,
            description, time_created, lifecycle_state and matching_rule

    Returns:
        The formatted string
    """
    rows = []
    for i, g in enumerate(data, start=1):
        name = _single_line(g.name, 24)
        description = _single_line(g.description, 41)
        # A missing creation time is rendered as blanks instead of failing
        created = f"{g.time_created:%Y-%m-%d %H:%M}" \
            if g.time_created is not None else ""
        rows.append(
            f"{i:>4} {name:24} {description:41} "
            f"{created:16} {g.lifecycle_state}\n"
            f" {g.matching_rule}\n")
    return "".join(rows)


def format_policy_listing(data):
    """Formats a list of policies into human readable format

    Args:
        data (list): A list of policies

    Returns:
        The formatted string
    """
    import re
    import textwrap

    rows = []
    for i, p in enumerate(data, start=1):
        name = _single_line(p.name, 24)
        desc = _single_line(p.description, 54)
        time = f"{p.time_created:%Y-%m-%d %H:%M}" \
            if p.time_created is not None else ""
        # Every statement becomes an indented, wrapped bullet point
        statements = "".join(
            textwrap.fill(
                re.sub(r'[\n\r]', ' ', s), width=93,
                initial_indent=' ' * 5 + '- ',
                subsequent_indent=' ' * 7) + "\n"
            for s in (p.statements or []))
        rows.append(
            f"{i:>4} {name:24} {desc:54} {time:16} {p.lifecycle_state}\n"
            f"{statements}")
    return "".join(rows)


def get_fingerprint(key):
    """Calculates the fingerprint of a PEM encoded public key

    The fingerprint is the MD5 digest of the base64-decoded key body,
    written as colon separated hex byte pairs.

    Args:
        key (bytes): The PEM encoded public key, as bytes or str. LF and
            CRLF line endings are both accepted.

    Returns:
        The fingerprint string, e.g. "5d:41:40:..."
    """
    import base64
    import hashlib

    if isinstance(key, str):
        key = key.encode("ascii")

    # Keep only the base64 payload; the '-----BEGIN/END ...-----' marker
    # lines and blank lines are not part of the fingerprint computation
    payload = b"".join(
        line.strip() for line in key.splitlines()
        if line.strip() and not line.strip().startswith(b"-----"))

    digest = hashlib.md5(base64.b64decode(payload)).hexdigest()

    # Break the digest into 2 character parts joined by colons
    return ":".join(digest[i:i + 2] for i in range(0, len(digest), 2))


def is_key_already_uploaded(keys, fingerprint):
    """Checks if a key with the given fingerprint is in the list of keys

    Args:
        keys (list): A list of key objects with a fingerprint attribute
        fingerprint (str): The fingerprint to look for

    Returns:
        True if one of the keys has the given fingerprint, else False
    """
    return any(key.fingerprint == fingerprint for key in keys)
@plugin_function('mds.create.user')
def create_user(**kwargs):
    """Creates a new user

    This function will create a new user on the tenancy level. A missing
    name or description is prompted for.

    Args:
        **kwargs: Optional parameters

    Keyword Args:
        name (str): The name of the user.
        description (str): A description for the user.
        email (str): The email of the user
        config (dict): An OCI config dict
        return_object (bool): If the object should be returned

    Returns:
        The user object or None
    """

    name = kwargs.get("name")
    description = kwargs.get("description")
    email = kwargs.get("email")
    config = kwargs.get("config")
    return_object = kwargs.get("return_object", False)

    # Get the current config
    try:
        config = configuration.get_current_config(config=config)
    except ValueError as e:
        print(e)
        return

    import oci.identity
    import mysqlsh

    if name is None:
        name = mysqlsh.globals.shell.prompt(
            "Please enter a name for the new user account: ",
            {'defaultValue': ''}).strip()
    if name == '':
        print("User creation cancelled.")
        return

    if description is None:
        description = mysqlsh.globals.shell.prompt(
            "Please enter a description for the new user account [-]: ",
            {'defaultValue': '-'}).strip()

    # Initialize the identity client
    identity = core.get_oci_identity_client(config=config)

    # Setup the user details, users always live on the tenancy level
    user_details = oci.identity.models.CreateUserDetails(
        compartment_id=config.get("tenancy"),
        name=name,
        description=description if description is not None else "-",
        email=email
    )

    # Create the user
    try:
        user = identity.create_user(user_details).data
    except oci.exceptions.ServiceError as e:
        print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
        return
    except (ValueError, oci.exceptions.ClientError) as e:
        print(f'ERROR: {e}')
        # Stop here, there is no user object to report or return
        return

    # Return the response
    if return_object:
        return user
    print(f"User {name} created.")


@plugin_function('mds.list.users')
def list_users(config=None, interactive=True, return_formatted=True):
    """Lists users

    Lists all users of the tenancy.

    Args:
        config (object): An OCI config object or None.
        interactive (bool): If set to false exceptions are raised
        return_formatted (bool): If true, a formatted string is returned.

    Returns:
        The formatted user listing or a list of user dicts
    """

    # Get the current config
    try:
        config = configuration.get_current_config(config=config)
    except ValueError as e:
        print(e)
        return

    import oci.identity

    # Initialize the identity client
    identity = core.get_oci_identity_client(config=config)

    # List the users
    try:
        # Users are always on the tenancy level
        data = identity.list_users(compartment_id=config.get("tenancy")).data
    except oci.exceptions.ServiceError as e:
        if not interactive:
            raise
        if e.code == "NotAuthorizedOrNotFound":
            print('You do not have privileges to list users.')
        print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
        return
    except (ValueError, oci.exceptions.ClientError) as e:
        if not interactive:
            raise
        print(f'ERROR: {e}')
        return

    if return_formatted:
        return format_user_listing(data)
    return oci.util.to_dict(data)
@plugin_function('mds.get.user')
def get_user(user_name=None, user_id=None, config=None):
    """Get user object

    Gets user details of a given user account. When a user_id is given the
    user is fetched directly, otherwise the user is picked from the list of
    tenancy users by name or by prompting.

    Args:
        user_name (str): The name of the user
        user_id (str): The OCID of the user.
        config (object): An OCI config object or None.

    Returns:
        The user object or None
    """

    # Get the current config
    try:
        config = configuration.get_current_config(config=config)
    except ValueError as e:
        print(e)
        return

    import oci.identity
    import mysqlsh

    # Initialize the identity client
    identity = core.get_oci_identity_client(config=config)

    # A given user_id allows for a direct lookup of the user
    if user_id is not None:
        return identity.get_user(user_id=user_id).data

    # Otherwise fetch all users of the tenancy
    try:
        all_users = identity.list_users(
            compartment_id=config.get("tenancy")).data
    except oci.exceptions.ServiceError as e:
        print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
        return
    except Exception as e:
        print(f'ERROR: {e}')
        return

    # Ignore users that have been deleted
    candidates = [u for u in all_users if u.lifecycle_state != "DELETED"]

    # Without a user_name, show the list so the user can choose from it
    if user_name is None:
        print("Users:\n" + format_user_listing(candidates))

    return core.prompt_for_list_item(
        item_list=candidates,
        prompt_caption=("Please enter the name or index "
                        "of the user: "),
        item_name_property="name", given_value=user_name)


@plugin_function('mds.get.userId')
def get_user_id(user_name=None, config=None):
    """Get user id

    Args:
        user_name (str): The name of the user.
        config (object): An OCI config object or None.

    Returns:
        The OCID of the user
    """

    # Get the current config
    try:
        config = configuration.get_current_config(config=config)
    except ValueError as e:
        print(e)
        return

    # Look up the user and hand back its OCID, if a user was found
    user = get_user(user_name=user_name, config=config)
    if user is None:
        return None
    return user.id
@plugin_function('mds.delete.user')
def delete_user(user_name=None, user_id=None, config=None, interactive=True):
    """Deletes a user

    The user is removed from all groups before the account is deleted.

    Args:
        user_name (str): The name of the user.
        user_id (str): The OCID of the user.
        config (object): An OCI config object or None.
        interactive (bool): If set to false, function returns true on success

    Returns:
        None
    """

    # Get the current config
    try:
        config = configuration.get_current_config(config=config)
    except ValueError as e:
        print(e)
        return

    import oci.identity
    import mysqlsh

    # Get user
    user = get_user(user_name=user_name, user_id=user_id, config=config)
    if user is None:
        print("The user was not found.")
        return

    if interactive:
        # Prompt the user for confirmation
        prompt = mysqlsh.globals.shell.prompt(
            f"Are you sure you want to delete the user {user.name} "
            f"[yes/NO]: ",
            {'defaultValue': 'no'}).strip().lower()
        if prompt != "yes":
            print("Deletion aborted.\n")
            return

    # Initialize the identity client
    identity = core.get_oci_identity_client(config=config)

    # NOTE: API keys of the user are not deleted explicitly here
    try:
        # Remove the user from all groups
        memberships = identity.list_user_group_memberships(
            compartment_id=config.get("tenancy"), user_id=user.id).data
        for m in memberships:
            identity.remove_user_from_group(m.id)

        # Delete the user
        identity.delete_user(user.id)
    except oci.exceptions.ServiceError as e:
        print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
        return
    except Exception as e:
        print(f'ERROR: {e}')
        return

    if interactive:
        print("User deleted")
    else:
        return True


@plugin_function('mds.get.group')
def get_group(name=None, group_id=None, config=None):
    """Get group object

    When a group_id is given the group is fetched directly, otherwise the
    group is picked from the list of tenancy groups by name or by prompting.

    Args:
        name (str): The name of the group
        group_id (str): The OCID of the group
        config (object): An OCI config object or None

    Returns:
        The group object or None
    """

    # Import before the try block. The except clauses below reference
    # oci.exceptions, so the local name must be bound even if loading the
    # config fails on the first line of the try block.
    import oci.identity

    try:
        # Get the current config
        config = configuration.get_current_config(config=config)

        # Initialize the identity client
        identity = core.get_oci_identity_client(config=config)

        # If a group_id is given, look up the group directly
        if group_id is not None:
            return identity.get_group(group_id=group_id).data

        # List the groups of the tenancy
        data = identity.list_groups(compartment_id=config.get("tenancy")).data

        # Filter out all deleted groups
        data = [g for g in data if g.lifecycle_state != "DELETED"]

        # If no name was given, print the group list
        if name is None:
            group_list = format_group_listing(data)
            print(f"Groups:\n{group_list}")

        # Let the user choose from the list
        return core.prompt_for_list_item(
            item_list=data,
            prompt_caption=("Please enter the name or index "
                            "of the group: "),
            item_name_property="name", given_value=name)
    except oci.exceptions.ServiceError as e:
        print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
        return
    except Exception as e:
        print(f'ERROR: {e}')
        return
@plugin_function('mds.get.groupId')
def get_group_id(name=None, config=None):
    """Get group id

    Args:
        name (str): The name of the group.
        config (object): An OCI config object or None.

    Returns:
        The OCID of the group or None
    """

    # Get the current config
    try:
        config = configuration.get_current_config(config=config)
    except ValueError as e:
        print(e)
        return

    # Look up the group and hand back its OCID, if a group was found
    group = get_group(name=name, config=config)
    if group is None:
        return None
    return group.id
@plugin_function('mds.list.groupUsers')
def list_group_users(name=None, group_id=None, config=None, interactive=True,
                     return_formatted=True):
    """lists the members of a given group

    Args:
        name (str): The name of the group
        group_id (str): The OCID of the group
        config (object): An OCI config object or None.
        interactive (bool): If set to false exceptions are raised
        return_formatted (bool): If true, the listing is printed.

    Returns:
        A list of user dicts if return_formatted is false, else None
    """

    # Get the current config
    try:
        config = configuration.get_current_config(config=config)
    except ValueError as e:
        print(e)
        return

    import oci.identity

    group = get_group(name=name, group_id=group_id, config=config)
    if group is None:
        print("Operation cancelled.")
        return

    # Initialize the identity client
    identity = core.get_oci_identity_client(config=config)

    try:
        # List the memberships of the given group
        memberships = identity.list_user_group_memberships(
            compartment_id=config.get("tenancy"), group_id=group.id).data

        # A membership only carries the user OCID, so fetch each user
        users = [identity.get_user(user_id=m.user_id).data
                 for m in memberships]

        if not return_formatted:
            return [oci.util.to_dict(u) for u in users]

        if users:
            print(f"Users in group {group.name}:")
            print(format_user_listing(users))
        else:
            print("No users in this group.")
    except oci.exceptions.ServiceError as e:
        if not interactive:
            raise
        print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
        return
    except Exception as e:
        if not interactive:
            raise
        print(f'ERROR: {e}')
        return


@plugin_function('mds.list.groups')
def list_groups(config=None, interactive=True, return_formatted=True):
    """Lists groups

    Lists all groups of the tenancy

    Args:
        config (object): An OCI config object or None.
        interactive (bool): If set to false exceptions are raised
        return_formatted (bool): If true, a formatted string is returned.

    Returns:
        The formatted group listing or a list of group dicts
    """

    # Import before the try block. The except clauses below reference
    # oci.exceptions, so the local name must be bound even if loading the
    # config fails on the first line of the try block.
    import oci.identity

    try:
        # Get the current config
        config = configuration.get_current_config(config=config)

        # Initialize the identity client
        identity = core.get_oci_identity_client(config=config)

        # List the groups
        data = identity.list_groups(compartment_id=config.get("tenancy")).data

        if return_formatted:
            return format_group_listing(data)
        return oci.util.to_dict(data)
    except oci.exceptions.ServiceError as e:
        if not interactive:
            raise
        print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
        return
    except Exception as e:
        if not interactive:
            raise
        print(f'ERROR: {e}')
        return
@plugin_function('mds.delete.group')
def delete_group(name=None, group_id=None, config=None, interactive=True):
    """Deletes a group

    Args:
        name (str): The name of the group.
        group_id (str): The OCID of the group.
        config (object): An OCI config object or None.
        interactive (bool): If set to false, function returns true on success

    Returns:
        None
    """

    # Get the current config
    try:
        config = configuration.get_current_config(config=config)
    except ValueError as e:
        print(e)
        return

    import oci.identity
    import mysqlsh

    # Resolve the group to delete
    group = get_group(name=name, group_id=group_id, config=config)
    if group is None:
        print("The group was not found.")
        return

    # In interactive mode only an explicit "yes" confirms the deletion
    if interactive:
        answer = mysqlsh.globals.shell.prompt(
            f"Are you sure you want to delete the Group {group.name} "
            f"[yes/NO]: ",
            {'defaultValue': 'no'})
        if answer.strip().lower() != "yes":
            print("Deletion aborted.\n")
            return

    # Initialize the identity client
    identity = core.get_oci_identity_client(config=config)

    try:
        identity.delete_group(group.id)
    except oci.exceptions.ServiceError as e:
        print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
        return
    except Exception as e:
        print(f'ERROR: {e}')
        return

    if not interactive:
        return True
    print("Group deleted")
@plugin_function('mds.create.userUiPassword')
def create_user_ui_password(user_name=None, user_id=None, config=None,
                            raise_exceptions=True):
    """Creates or resets the UI password of the user

    Prints the login information for the OCI UI Console and returns the new
    one-time password.

    Args:
        user_name (str): The name of the user
        user_id (str): The OCID of the user
        config (object): An OCI config object or None.
        raise_exceptions (bool): If set to true exceptions are raised

    Returns:
        The one-time password of the user or None
    """

    # Import before the try block. The except clauses below reference
    # oci.exceptions, so the local name must be bound even if loading the
    # config fails on the first line of the try block.
    import oci.identity

    try:
        # Get the current config
        config = configuration.get_current_config(config=config)

        # Get the user object
        user = get_user(user_name=user_name, user_id=user_id, config=config)
        if user is None:
            print("User not found. Operation cancelled.")
            return

        # Initialize the identity client
        identity = core.get_oci_identity_client(config=config)

        ui_password = identity.create_or_reset_ui_password(
            user_id=user.id).data

        # The tenancy name is part of the console login URL
        tenancy = identity.get_tenancy(config.get("tenancy")).data

        print("\nThe OCI UI Console one-time password for user "
              f"'{user.name}' has been created.\n"
              "The user can now log in at the following URL:\n"
              f" https://console.{config.get('region')}.oraclecloud.com/?"
              f"tenant={tenancy.name}\n\n"
              "Oracle Cloud Infrastructure Login Information:\n\n"
              f"TENANT\n{tenancy.name}\n\n"
              f"USER NAME\n{user.name}\n\n"
              "PASSWORD")

        return ui_password.password
    except oci.exceptions.ServiceError as e:
        if raise_exceptions:
            raise
        print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
        return
    except (ValueError, oci.exceptions.ClientError) as e:
        if raise_exceptions:
            raise
        print(f'ERROR: {e}')
@plugin_function('mds.list.userApiKeys')
def list_user_api_keys(user_name=None, user_id=None, config=None,
                       interactive=True, return_formatted=True):
    """Lists the API Keys of the user

    Args:
        user_name (str): The name of the user
        user_id (str): The OCID of the user
        config (object): An OCI config object or None.
        interactive (bool): If set to false exceptions are raised
        return_formatted (bool): If true, a formatted string is returned.

    Returns:
        The API Keys of the user
    """

    # Get the current config
    try:
        config = configuration.get_current_config(config=config)
    except ValueError as e:
        print(e)
        return

    import oci.identity

    user = get_user(user_name=user_name, user_id=user_id, config=config)
    if user is None:
        print("Operation cancelled.")
        return

    # Initialize the identity client
    identity = core.get_oci_identity_client(config=config)

    # Fetch the API keys of the user
    try:
        api_keys = identity.list_api_keys(user.id).data
    except oci.exceptions.ServiceError as e:
        if not interactive:
            raise
        print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
        return
    except Exception as e:
        if not interactive:
            raise
        print(f'ERROR: {e}')
        return

    if not return_formatted:
        return oci.util.to_dict(api_keys)

    # One numbered line per key: fingerprint, creation time and state
    return "".join(
        f"{idx:>4} {k.fingerprint:49} "
        f"{k.time_created:%Y-%m-%d %H:%M} {k.lifecycle_state}\n"
        for idx, k in enumerate(api_keys, start=1))
@plugin_function('mds.create.userApiKey')
def create_user_api_key(user_name=None, key_path="~/.oci/", user_id=None,
                        key_file_prefix="oci_api_key", config=None,
                        write_to_disk=True, return_object=False,
                        interactive=True):
    """Creates an API key for a user

    Generates a new 2048 bit RSA key pair, optionally writes it to disk and,
    if a user was selected, uploads the public key for that user.

    Args:
        user_name (str): The name of the user
        key_path (str): The path where the key files should be created
        user_id (str): The OCID of the user.
        key_file_prefix (str): A key_file_prefix to use
        config (dict): An OCI config object or None.
        write_to_disk (bool): Whether the keys should be written to disk
        return_object (bool): Whether the keys should be returned
        interactive (bool): Whether to query the user for input

    Returns:
        A dict with private_key, public_key and fingerprint or None
    """

    # A user is only looked up when running interactively or when the
    # caller identified one
    needs_user = (interactive or user_name is not None
                  or user_id is not None)

    # Get the current config
    try:
        if needs_user:
            config = configuration.get_current_config(config=config)
    except ValueError as e:
        print(e)
        return

    import oci.identity
    import os
    import time
    from pathlib import Path

    # Get the user object
    user = None
    if needs_user:
        user = get_user(user_name=user_name, user_id=user_id, config=config)
        if user is None:
            print("User not found.")
            return

    # Generate the API Keys
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives.asymmetric import rsa
    key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend()
    )
    private_key = key.private_bytes(
        serialization.Encoding.PEM,
        serialization.PrivateFormat.TraditionalOpenSSL,
        serialization.NoEncryption())
    public_key = key.public_key().public_bytes(
        serialization.Encoding.PEM,
        serialization.PublicFormat.SubjectPublicKeyInfo
    )

    # Write out keys
    if write_to_disk is True:
        # Expand '~' and normalize the path for the current platform. The
        # directory is only created when there is something to write.
        key_dir = os.path.abspath(os.path.expanduser(key_path))
        Path(key_dir).mkdir(mode=0o700, parents=True, exist_ok=True)

        # Create filenames for keys
        user_key_file_caption = f'_{user.name}' if user else ''
        private_key_path = os.path.join(
            key_dir, f"{key_file_prefix}{user_key_file_caption}.pem")
        public_key_path = os.path.join(
            key_dir, f"{key_file_prefix}{user_key_file_caption}_public.pem")

        # The private key is unencrypted, so restrict the file to the
        # owner instead of relying on the process umask
        fd = os.open(private_key_path,
                     os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, mode='wb') as file:
            file.write(private_key)
        # Also tighten permissions of a file that existed before
        os.chmod(private_key_path, 0o600)

        with open(public_key_path, mode='wb') as file:
            file.write(public_key)

    # Get the fingerprint for the key
    fingerprint = get_fingerprint(public_key)

    # If a user was selected, upload the key
    if user:
        # Initialize the identity client
        identity = core.get_oci_identity_client(config=config)

        # Check to see if this key is already associated with the user.
        if is_key_already_uploaded(identity.list_api_keys(
                user.id).data, fingerprint):
            print(f"Key with fingerprint {fingerprint} has already been "
                  f"added to the user")
            return

        # Initialize the CreateApiKeyDetails model
        key_details = oci.identity.models.CreateApiKeyDetails(
            key=public_key.decode())

        # Upload the key
        identity.upload_api_key(user.id, key_details)

        # Poll until the key shows up in the key list of the user,
        # at most 30 times with 2 seconds between the checks
        key_uploaded = False
        for _ in range(30):
            key_uploaded = is_key_already_uploaded(identity.list_api_keys(
                user.id).data, fingerprint)
            if key_uploaded:
                break
            time.sleep(2)

        if not key_uploaded:
            print("The key upload failed.")
            return

    if return_object:
        return {
            "private_key": private_key,
            "public_key": public_key,
            "fingerprint": fingerprint}
@plugin_function('mds.delete.userApiKey')
def delete_user_api_key(user_name=None, user_id=None, key_path="~/.oci/",
                        key_file_prefix="oci_api_key", key_id=None,
                        fingerprint=None, delete_from_disk=False,
                        config=None, interactive=True):
    """Deletes an API key for a user

    The key is selected by fingerprint, by key_id or, in interactive mode,
    by choosing from the list of the user's keys.

    Args:
        user_name (str): The name of the user
        user_id (str): The OCID of the user.
        key_path (str): The path where the key files are stored
        key_file_prefix (str): A key_file_prefix to use
        key_id (str): The key identifier to delete
        fingerprint (str): The fingerprint of the key to delete
        delete_from_disk (bool): Whether to delete the key files from disk
        config (dict): An OCI config object or None.
        interactive (bool): Whether to query the user for input

    Returns:
        True if the key was deleted, else False
    """

    needs_user = (interactive or user_name is not None
                  or user_id is not None)

    # Get the current config
    try:
        if needs_user:
            config = configuration.get_current_config(config=config)
    except ValueError as e:
        print(e)
        return False

    import os
    import oci.exceptions

    # Get the user object. A key always belongs to a user, so there is
    # nothing to delete when no user could be determined.
    user = None
    if needs_user:
        user = get_user(user_name=user_name, user_id=user_id, config=config)
    if user is None:
        print("User not found.")
        return False

    try:
        fingerprint_to_delete = fingerprint
        if fingerprint_to_delete is None:
            # Address the user by OCID and reuse the resolved config so the
            # listing cannot pick a different user or profile
            keys = list_user_api_keys(
                user_id=user.id, config=config, interactive=interactive,
                return_formatted=False)
            if not keys:
                print("No API keys found for this user.")
                return False

            if key_id is not None:
                # An explicitly given key_id wins over prompting.
                # NOTE(review): the dicts are expected to expose the key
                # identifier as 'key_id'; 'id' is kept as a fallback.
                for key in keys:
                    if key.get('key_id', key.get('id')) == key_id:
                        fingerprint_to_delete = key['fingerprint']
                        break
            elif interactive:
                print(list_user_api_keys(
                    user_id=user.id, config=config, interactive=interactive,
                    return_formatted=True))
                selected_key = core.prompt_for_list_item(
                    item_list=keys,
                    prompt_caption=("Please enter the name or index "
                                    "of the key to delete: "),
                    item_name_property="key")
                # Nothing selected means the prompt was cancelled
                if selected_key is not None:
                    fingerprint_to_delete = selected_key['fingerprint']

        if fingerprint_to_delete is None:
            print('Invalid key fingerprint')
            return False

        # Initialize the identity client
        identity = core.get_oci_identity_client(config=config)

        # Only delete keys that are actually registered for the user
        if not is_key_already_uploaded(
                identity.list_api_keys(user.id).data, fingerprint_to_delete):
            print(f"No API key with fingerprint {fingerprint_to_delete} "
                  f"found for user {user.name}.")
            return False

        identity.delete_api_key(user_id=user.id,
                                fingerprint=fingerprint_to_delete)
    except oci.exceptions.ServiceError as e:
        if not interactive:
            raise
        print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
        return False

    # Remove the local key files only after the key was deleted in OCI,
    # so a failed deletion never leaves the user without the key files
    if delete_from_disk:
        key_dir = os.path.abspath(os.path.expanduser(key_path))
        user_key_file_caption = f'_{user.name}'
        for suffix in (".pem", "_public.pem"):
            key_file = os.path.join(
                key_dir, f"{key_file_prefix}{user_key_file_caption}{suffix}")
            try:
                os.remove(key_file)
            except FileNotFoundError:
                # The file is already gone, which is the desired state
                pass

    return True
@plugin_function('mds.create.group')
def create_group(group_name=None, description=None, config=None,
                 return_object=False):
    """Creates a new group

    A missing group name or description is prompted for.

    Args:
        group_name (str): The name of the group.
        description (str): A description used for the new group.
        config (dict): An OCI config dict
        return_object (bool): If the object should be returned

    Returns:
        The group object or None
    """

    # Get the current config
    try:
        config = configuration.get_current_config(config=config)
    except ValueError as e:
        print(e)
        return

    import oci.identity
    import mysqlsh

    # Initialize the identity client
    identity = core.get_oci_identity_client(config=config)

    if group_name is None:
        group_name = mysqlsh.globals.shell.prompt(
            "Please enter a name for the new group: ",
            {'defaultValue': ''}).strip()
    if group_name == '':
        print("Operation cancelled.")
        return

    if description is None:
        description = mysqlsh.globals.shell.prompt(
            "Please enter a description for the new group [-]: ",
            {'defaultValue': '-'}).strip()

    # Setup the group details, groups live on the tenancy level
    group_details = oci.identity.models.CreateGroupDetails(
        compartment_id=config.get("tenancy"),
        name=group_name,
        description=description if description is not None else "-"
    )

    # Create the group
    try:
        group = identity.create_group(group_details).data
    except oci.exceptions.ServiceError as e:
        print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
        return
    except (ValueError, oci.exceptions.ClientError) as e:
        print(f'ERROR: {e}')
        # Stop here, there is no group object to report or return
        return

    # Return the response
    if return_object:
        return group
    print(f"Group {group_name} created.")
@plugin_function('mds.create.userGroupMembership')
def add_user_to_group(user_name=None, group_name=None, user_id=None,
                      group_id=None, config=None, return_object=False):
    """Adds a user to a group

    Args:
        user_name (str): The name of the user.
        group_name (str): The name of the group.
        user_id (str): The OCID of the user
        group_id (str): The OCID of the group
        config (object): An OCI config object or None.
        return_object (bool): If the object should be returned

    Returns:
        The membership object or None
    """

    # Get the current config
    try:
        config = configuration.get_current_config(config=config)
    except ValueError as e:
        print(e)
        return

    import oci.identity

    # Get user
    user = get_user(user_name=user_name, user_id=user_id, config=config)
    if user is None:
        print("Operation cancelled.")
        return

    # Get group
    group = get_group(name=group_name, group_id=group_id, config=config)
    if group is None:
        print("Operation cancelled.")
        return

    # Create Details
    details = oci.identity.models.AddUserToGroupDetails(
        group_id=group.id, user_id=user.id
    )

    # Initialize the identity client
    identity = core.get_oci_identity_client(config=config)

    # Add the user to the group
    try:
        membership = identity.add_user_to_group(details).data
    except oci.exceptions.ServiceError as e:
        print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
        return
    except (ValueError, oci.exceptions.ClientError) as e:
        print(f'ERROR: {e}')
        # Stop here, there is no membership to report or return
        return

    # Return the response
    if return_object:
        return membership
    print(f"User {user.name} was added to {group.name}.")
@plugin_function('mds.delete.userGroupMembership')
def delete_user_group_membership(user_name=None, group_name=None,
                                 user_id=None, group_id=None, config=None,
                                 interactive=True):
    """Removes a user from a group

    Args:
        user_name (str): The name of the user.
        group_name (str): The name of the group.
        user_id (str): The OCID of the user
        group_id (str): The OCID of the group
        config (object): An OCI config object or None.
        interactive (bool): If set to false, true is returned on success

    Returns:
        True on success if interactive is false, else None
    """

    # Get the current config
    try:
        config = configuration.get_current_config(config=config)
    except ValueError as e:
        print(e)
        return

    import oci.identity

    # Get user
    user = get_user(user_name=user_name, user_id=user_id, config=config)
    if user is None:
        print("Operation cancelled.")
        return

    # Get group
    group = get_group(name=group_name, group_id=group_id, config=config)
    if group is None:
        print("Operation cancelled.")
        return

    # Initialize the identity client
    identity = core.get_oci_identity_client(config=config)

    try:
        # List the group memberships of the given user
        memberships = identity.list_user_group_memberships(
            compartment_id=config.get("tenancy"), user_id=user.id).data

        # Find the membership that links the user to the given group
        membership = next(
            (m for m in memberships if m.group_id == group.id), None)
        if membership is None:
            if interactive:
                print(f"User {user.name} is not a member of {group.name}.")
            return

        identity.remove_user_from_group(membership.id)
    except oci.exceptions.ServiceError as e:
        print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
        return
    except (ValueError, oci.exceptions.ClientError) as e:
        print(f'ERROR: {e}')
        # Stop here so that no success is reported for a failed removal
        return

    # Return the response
    if interactive:
        print(f"User {user.name} was removed from {group.name}.")
    else:
        return True
@plugin_function('mds.create.policy')
def create_policy(policy_name=None, description=None, statements=None,
                  compartment_id=None, config=None, return_object=False):
    """Creates a new policy

    A missing policy name, description or statement list is prompted for.

    Args:
        policy_name (str): The name of the policy.
        description (str): A description used for the new policy.
        statements (str): A list of statements separated by line breaks
        compartment_id (str): The OCID of the compartment
        config (dict): An OCI config dict
        return_object (bool): If the object should be returned

    Returns:
        The policy object or None
    """

    # Get the active config and compartment
    try:
        config = configuration.get_current_config(config=config)
        compartment_id = configuration.get_current_compartment_id(
            compartment_id=compartment_id, config=config)
    except ValueError as e:
        print(f"ERROR: {str(e)}")
        return

    import oci.identity
    import mysqlsh

    if policy_name is None:
        policy_name = mysqlsh.globals.shell.prompt(
            "Please enter a name for the new policy: ",
            {'defaultValue': ''}).strip()
    if policy_name == '':
        print("Operation cancelled.")
        return

    if description is None:
        description = mysqlsh.globals.shell.prompt(
            "Please enter a description for the new policy [-]: ",
            {'defaultValue': '-'}).strip()

    if statements is None:
        statements = ""
        print("Please enter the policy statements.\nPress [Enter] after each "
              "statement and leave a statement empty to finish the input.\n")
        stmt = "-"
        while stmt:
            stmt = mysqlsh.globals.shell.prompt(
                "Statement: ", {'defaultValue': ''}).strip()
            if stmt:
                statements += stmt + '\n'

    # One statement per line. Blank lines are skipped, so a trailing line
    # break is optional and the last statement is never dropped.
    statement_list = [
        s.strip() for s in statements.splitlines() if s.strip()]
    if not statement_list:
        print("Operation cancelled.")
        return

    # Initialize the identity client
    identity = core.get_oci_identity_client(config=config)

    # Setup the policy details
    policy_details = oci.identity.models.CreatePolicyDetails(
        compartment_id=compartment_id,
        name=policy_name,
        description=description if description is not None else "-",
        statements=statement_list
    )

    # Create the policy
    try:
        policy = identity.create_policy(policy_details).data
    except oci.exceptions.ServiceError as e:
        print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
        return
    except (ValueError, oci.exceptions.ClientError) as e:
        print(f'ERROR: {e}')
        # Stop here, there is no policy object to report or return
        return

    # Return the response
    if return_object:
        return policy
    print(f"Policy {policy_name} created.")


@plugin_function('mds.list.policies')
def list_policies(compartment_id=None, config=None, interactive=True,
                  return_formatted=True):
    """Lists policies

    Lists all policies of the given compartment

    Args:
        compartment_id (str): OCID of the compartment
        config (object): An OCI config object or None.
        interactive (bool): If set to false exceptions are raised
        return_formatted (bool): If true, a formatted string is returned.

    Returns:
        The formatted policy listing or a list of policy dicts
    """

    # Get the active config and compartment
    try:
        config = configuration.get_current_config(config=config)
        compartment_id = configuration.get_current_compartment_id(
            compartment_id=compartment_id, config=config)
    except ValueError as e:
        print(f"ERROR: {str(e)}")
        return

    import oci.identity

    # Initialize the identity client
    identity = core.get_oci_identity_client(config=config)

    try:
        # List the policies
        data = identity.list_policies(compartment_id=compartment_id).data
    except oci.exceptions.ServiceError as e:
        if not interactive:
            raise
        print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
        return
    except Exception as e:
        if not interactive:
            raise
        print(f'ERROR: {e}')
        return

    if return_formatted:
        return format_policy_listing(data)
    return oci.util.to_dict(data)
Returns: The policy object or None """ # Get the current config try: config = configuration.get_current_config(config=config) compartment_id = configuration.get_current_compartment_id( compartment_id=compartment_id, config=config) except ValueError as e: print(e) return import oci.identity import mysqlsh # Initialize the identity client identity = core.get_oci_identity_client(config=config) # If an policy_id is given, look up the user directly if policy_id is not None: data = identity.get_policy(policy_id=policy_id).data return data try: # List the users data = identity.list_policies(compartment_id=compartment_id).data except oci.exceptions.ServiceError as e: print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})') return except Exception as e: print(f'ERROR: {e}') return # Filter out all deleted compartments data = [u for u in data if u.lifecycle_state != "DELETED"] # If an name was given not given, print the user list if policy_name is None: item_list = format_policy_listing(data) print(f"Policies:\n{item_list}") # Let the user choose from the list policy = core.prompt_for_list_item( item_list=data, prompt_caption=("Please enter the name or index " "of the policy: "), item_name_property="name", given_value=policy_name) return policy @plugin_function('mds.get.policyId') def get_policy_id(policy_name=None, compartment_id=None, config=None): """Get the policy id Args: policy_name (str): The name of the user. compartment_id (str): The OCID of the compartment config (object): An OCI config object or None. 
Returns: The OCID of the policy """ # Get the current config try: config = configuration.get_current_config(config=config) compartment_id = configuration.get_current_compartment_id( compartment_id=compartment_id, config=config) except ValueError as e: print(e) return # Get user policy = get_policy( policy_name=policy_name, compartment_id=compartment_id, config=config) return None if policy is None else policy.id @plugin_function('mds.delete.policy') def delete_policy(policy_name=None, policy_id=None, compartment_id=None, config=None, interactive=True): """Deletes a policy Args: policy_name (str): The name of the policy. policy_id (str): The OCID of the policy. compartment_id (str): The OCID of the compartment config (object): An OCI config object or None. interactive (bool): If set to false, function returns true on success Returns: None """ # Get the current config try: config = configuration.get_current_config(config=config) compartment_id = configuration.get_current_compartment_id( compartment_id=compartment_id, config=config) import oci.identity import re import mysqlsh # Get policy if policy_name != "*": policy = get_policy(policy_name=policy_name, policy_id=policy_id, config=config) if policy is None: print("The policy was not found.") return if interactive: if policy_name != "*": what_to_delete = f"the policy {policy.name}" else: what_to_delete = "all policies" prompt = mysqlsh.globals.shell.prompt( f"Are you sure you want to delete {what_to_delete} " f"[yes/NO]: ", {'defaultValue': 'no'}).strip().lower() if prompt != "yes": print("Deletion aborted.\n") return # Initialize the identity client identity = core.get_oci_identity_client(config=config) if policy_name == "*": # List the policies data = identity.list_policies(compartment_id=compartment_id).data for p in data: identity.delete_policy(p.id) else: identity.delete_policy(policy.id) if interactive: # cSpell:ignore Polic print(f"Polic{'y' if policy_name != '*' else 'ies'} deleted.") else: return True 
except oci.exceptions.ServiceError as e: print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})') return except Exception as e: print(f'ERROR: {e}') return @plugin_function('mds.create.dynamicGroup') def create_dynamic_group(name=None, **kwargs): """Creates a new dynamic group Args: name (str): The name of the dynamic group. **kwargs: Additional options Keyword Args: description (str): A description used for the new dynamic group. defined_tags (dict): The defined_tags of the dynamic group. freeform_tags (dict): The freeform_tags of the dynamic group config (dict): An OCI config dict. compartment_id (str): An OCID of the compartment to use. interactive (bool): Whether user interaction should be performed. raise_exceptions (bool): If set to true exceptions are raised. return_object (bool): If the object should be returned. Returns: The group object or None """ description = kwargs.get("description") matching_rule = kwargs.get("matching_rule") defined_tags = kwargs.get("defined_tags") # Manual conversion from Shell Dict type until this is automatically done if defined_tags: defined_tags = dict(defined_tags) freeform_tags = kwargs.get("freeform_tags") # Manual conversion from Shell Dict type until this is automatically done if freeform_tags: freeform_tags = dict(freeform_tags) config = kwargs.get("config") compartment_id = kwargs.get("compartment_id") interactive = kwargs.get("interactive", True) raise_exceptions = kwargs.get("raise_exceptions", False) return_object = kwargs.get("return_object", False) # Get the current config try: config = configuration.get_current_config(config=config) compartment_id = configuration.get_current_compartment_id( compartment_id=compartment_id, config=config) import oci.identity import mysqlsh from mds_plugin import compartment, compute # Initialize the identity client identity = core.get_oci_identity_client(config=config) if name is None and interactive: name = mysqlsh.globals.shell.prompt( "Please enter a name for the new 
dynamic group: ", {'defaultValue': ''}).strip() if name == '': print("No name given. Operation cancelled.") return if description is None and interactive: description = mysqlsh.globals.shell.prompt( "Please enter a description for the new dynamic group [-]: ", {'defaultValue': '-'}).strip() # If no matching_rule was given, start a simple rule builder if matching_rule is None and interactive: current_path = compartment.get_compartment_full_path( compartment_id=compartment_id, config=config, interactive=interactive) print("Please specify which compute instances should be included " "in this dynamic group.\n") match_list = [ f"All instances of the current compartment '{current_path}'", "All instances of the current compartment with a special tag", "All instances of the tenancy with a special tag", "One specific instance" ] matching_rule = core.prompt_for_list_item( item_list=match_list, prompt_caption="Please enter an index or a custom matching rule: ", prompt_default_value='', print_list=True) if not matching_rule: print("Operation cancelled.") return if matching_rule == match_list[1] or matching_rule == match_list[2]: print("Please use the following format to specify the tag to " "match against.\n" " <tagnamespace>.<tagkey> " "Example:department.operations\nor\n" " <tagnamespace>.<tagkey>=<value> " "Example:department.operations=45\n") prompt = mysqlsh.globals.shell.prompt( "Please enter the tag to match against: ", {'defaultValue': ''}).strip().lower() # Get tag into proper format if "=" in prompt: tag = "tag." 
+ prompt.replace("=", ".value='") + "'" else: tag = f"tag.{prompt}.value" if matching_rule == match_list[1]: matching_rule = ("All {" f"instance.compartment.id = '{compartment_id}', {tag}" "}") else: matching_rule = tag elif matching_rule == match_list[0]: # Match the current compartment matching_rule = f"instance.compartment.id = '{compartment_id}'" elif matching_rule == match_list[3]: # Match a specific instance instance_id = compute.get_instance_id( compartment_id=compartment_id, config=config, interactive=interactive) if not instance_id: print("Operation cancelled.") return matching_rule = f"instance.id = '{instance_id}'" if matching_rule is None: print("No matching_rule given. Operation cancelled.") return # Setup the details details = oci.identity.models.CreateDynamicGroupDetails( compartment_id=config.get("tenancy"), name=name, description=description if description else "-", matching_rule=matching_rule, defined_tags=defined_tags, freeform_tags=freeform_tags ) # Create the group group = identity.create_dynamic_group(details).data # Return the response if return_object: return group else: print(f"Dynamic group {name} created.") except oci.exceptions.ServiceError as e: if raise_exceptions: raise print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})') return except (ValueError, oci.exceptions.ClientError) as e: if raise_exceptions: raise print(f'ERROR: {e}') @plugin_function('mds.list.dynamicGroups') def list_dynamic_groups(compartment_id=None, config=None, raise_exceptions=False, return_formatted=True): """Lists dynamic groups Lists all dynamic groups of the compartment Args: compartment_id (str): An OCID of the compartment to use. config (object): An OCI config object or None. raise_exceptions (bool): If set to true exceptions are raised. return_formatted (bool): If set to true a list object is returned. 
Returns: A list of groups """ # Get the current config try: config = configuration.get_current_config(config=config) import oci.identity # Initialize the identity client identity = core.get_oci_identity_client(config=config) # List the groups data = identity.list_dynamic_groups( compartment_id=config.get("tenancy")).data if return_formatted: return format_dynamic_group_listing(data) else: return oci.util.to_dict(data) except oci.exceptions.ServiceError as e: if raise_exceptions: raise print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})') return except Exception as e: if raise_exceptions: raise print(f'ERROR: {e}') return @plugin_function('mds.get.dynamicGroup') def get_dynamic_group(name=None, dynamic_group_id=None, config=None): """Get dynamic group Gets a dynamic group Args: name (str): The name of the dynamic group dynamic_group_id (str): The OCID of the dynamic group config (object): An OCI config object or None Returns: The user object or None """ # Get the current config try: config = configuration.get_current_config(config=config) import oci.identity import mysqlsh # Initialize the identity client identity = core.get_oci_identity_client(config=config) # If an dynamic_group_id is given, look up the dynamic_group directly if dynamic_group_id is not None: data = identity.get_dynamic_group( dynamic_group_id=dynamic_group_id).data return data # List the groups data = identity.list_dynamic_groups( compartment_id=config.get("tenancy")).data # Filter out all deleted compartments data = [u for u in data if u.lifecycle_state != "DELETED"] # If an name was given not given, print the list if name is None: group_list = format_dynamic_group_listing(data) print(f"Groups:\n{group_list}") # Let the user choose from the list group = core.prompt_for_list_item( item_list=data, prompt_caption=("Please enter the name or index " "of the dynamic group: "), item_name_property="name", given_value=name) return group except oci.exceptions.ServiceError as e: print(f'ERROR: 
{e.message}. (Code: {e.code}; Status: {e.status})') return except Exception as e: print(f'ERROR: {e}') return @plugin_function('mds.get.dynamicGroupId') def get_dynamic_group_id(name=None, config=None): """Get group id Args: name (str): The name of the group. config (object): An OCI config object or None. Returns: The OCID of the user """ # Get the current config try: config = configuration.get_current_config(config=config) # Get group group = get_dynamic_group(name=name, config=config) return None if group is None else group.id except ValueError as e: print(e) return @plugin_function('mds.delete.dynamicGroup') def delete_dynamic_group(name=None, dynamic_group_id=None, config=None, interactive=True): """Deletes a user Args: name (str): The name of the group. dynamic_group_id (str): The OCID of the group. config (object): An OCI config object or None. interactive (bool): If set to false, function returns true on success Returns: None """ # Get the current config try: config = configuration.get_current_config(config=config) except ValueError as e: print(e) return import oci.identity import re import mysqlsh # Get user group = get_dynamic_group( name=name, dynamic_group_id=dynamic_group_id, config=config) if group is None: print("The group was not found.") return if interactive: # Prompt the user for confirmation prompt = mysqlsh.globals.shell.prompt( f"Are you sure you want to delete the dynamic group '{group.name}' " f"[yes/NO]: ", {'defaultValue': 'no'}).strip().lower() if prompt != "yes": print("Deletion aborted.\n") return # Initialize the identity client identity = core.get_oci_identity_client(config=config) try: identity.delete_dynamic_group(group.id) except oci.exceptions.ServiceError as e: print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})') return except Exception as e: print(f'ERROR: {e}') return if interactive: print("Dynamic group deleted.") else: return True