mds_plugin/compartment.py (612 lines of code) (raw):

# Copyright (c) 2021, 2024, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0,
# as published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms, as
# designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an additional
# permission to link the program and your derivative works with the
# separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

"""Sub-Module for supporting OCI Compartments"""

import random
import re

from mysqlsh.plugin_manager import plugin_function
from mds_plugin import core, configuration

# Line break characters are replaced by blanks in all text listings
_LINE_BREAKS = re.compile(r'[\n\r]')


def _shorten(text, max_len):
    """Shortens text to max_len characters and removes line breaks

    Text longer than max_len is cut to max_len - 2 characters and gets '..'
    appended.

    Args:
        text (str): The text to shorten.
        max_len (int): The maximum length of the returned text.

    Returns:
        The shortened text with line breaks replaced by blanks
    """
    if len(text) > max_len:
        text = text[:max_len - 2] + '..'
    return _LINE_BREAKS.sub(' ', text)


def _list_active_compartments(identity, compartment_id, in_subtree=False,
                              **list_kwargs):
    """Lists all compartments below compartment_id that are not DELETED

    All result pages are fetched, so compartments beyond the first page of
    the service response are included as well.

    Args:
        identity (object): An OCI identity client.
        compartment_id (str): OCID of the compartment to list.
        in_subtree (bool): Whether the whole subtree should be listed.
        **list_kwargs: Additional arguments passed to list_compartments.

    Returns:
        A list of compartment objects
    """
    import oci.pagination

    data = oci.pagination.list_call_get_all_results(
        identity.list_compartments,
        compartment_id=compartment_id,
        compartment_id_in_subtree=in_subtree,
        **list_kwargs).data

    return [c for c in data if c.lifecycle_state != "DELETED"]


def get_compartment_by_id(compartment_id, config, interactive=True):
    """Returns a compartment object for the given id

    Args:
        compartment_id (str): OCID of the compartment.
        config (object): An OCI config object or None.
        interactive (bool): If set to false exceptions are raised.

    Returns:
        The compartment or tenancy object with the given id, None if an error
        was printed in interactive mode
    """
    # Import before the try block, the except clauses below need the oci name
    # to be bound even if the very first statement in the try block fails
    import oci.exceptions
    import oci.identity

    try:
        # Get the right config, either the one passed in or, if that one is
        # None, the global one
        config = configuration.get_current_config(config=config)

        # Initialize the identity client
        identity = core.get_oci_identity_client(config=config)

        tenancy_id = config.get("tenancy")
        if compartment_id == tenancy_id:
            return identity.get_tenancy(tenancy_id).data

        return identity.get_compartment(compartment_id).data
    except oci.exceptions.ServiceError as e:
        if not interactive:
            raise
        print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
        return
    except (ValueError, oci.exceptions.ClientError) as e:
        if not interactive:
            raise
        print(f'ERROR: {e}')
        return


def get_compartment_id_by_path(compartment_path, compartment_id=None,
                               config=None):
    """Gets a compartment OCID by path

    Args:
        compartment_path (str): The path, either absolute starting with /
            or . or ..
        compartment_id (str): OCID of the parent compartment.
        config (object): An OCI config object or None.

    Returns:
        The OCID of the matching compartment or None if there is no match
    """
    compartment = get_compartment_by_path(
        compartment_path=compartment_path, compartment_id=compartment_id,
        config=config)

    return None if compartment is None else compartment.id


def get_compartment_by_path(compartment_path, compartment_id=None,
                            config=None, interactive=True):
    """Gets a compartment by path

    Args:
        compartment_path (str): The path, either absolute starting with /
            or . or .. or the name of a direct sub-compartment.
        compartment_id (str): OCID of the parent compartment.
        config (object): An OCI config object or None.
        interactive (bool): If set to false exceptions are raised.

    Returns:
        The matching compartment object or None if there is no match
    """
    # Import before the try block, the except clauses below need the oci name
    # to be bound even if the very first statement in the try block fails
    import oci.exceptions
    import oci.identity

    try:
        # Get the active config and compartment
        config = configuration.get_current_config(config=config)
        compartment_id = configuration.get_current_compartment_id(
            compartment_id=compartment_id, config=config)
        tenancy_id = config.get("tenancy")

        # If / was given, return the tenancy
        if compartment_path == '/':
            return get_compartment_by_id(
                compartment_id=tenancy_id, config=config,
                interactive=interactive)

        # If . was given, return the current compartment
        if compartment_path == '.':
            return get_compartment_by_id(
                compartment_id=compartment_id, config=config,
                interactive=interactive)

        # If .. was given, return the parent compartment or the tenancy
        # itself if the current compartment is the tenancy
        if compartment_path == '..':
            comp = get_compartment_by_id(
                compartment_id=compartment_id, config=config,
                interactive=interactive)
            # comp is None if the lookup failed and the error was printed
            if comp is None or isinstance(comp, oci.identity.models.Tenancy):
                return comp
            return get_compartment_by_id(
                compartment_id=comp.compartment_id, config=config,
                interactive=interactive)

        # Initialize the identity client
        identity = core.get_oci_identity_client(config=config)

        if compartment_path.startswith("/"):
            # Lookup full path in the full compartment tree list
            candidates = _list_active_compartments(
                identity, tenancy_id, in_subtree=True)

            # Walk the given path segment by segment, starting at the
            # tenancy. Names are compared case-insensitively.
            parent_id = tenancy_id
            match = None
            for segment in compartment_path[1:].lower().split("/"):
                match = next(
                    (c for c in candidates
                     if c.compartment_id == parent_id
                     and c.name.lower() == segment),
                    None)
                if match is None:
                    # A missing segment means the path does not exist
                    return None
                parent_id = match.id

            return match

        # Lookup name in the current compartment's list of sub-compartments
        wanted = compartment_path.lower()
        children = _list_active_compartments(identity, compartment_id)

        return next((c for c in children if c.name.lower() == wanted), None)
    except oci.exceptions.ServiceError as e:
        if not interactive:
            raise
        print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
        return
    except Exception as e:
        if not interactive:
            raise
        print(f'ERROR: {e}')
        return


def get_parent_compartment(compartment, config):
    """Returns the parent compartment object for the given compartment

    Args:
        compartment (object): A compartment object.
        config (object): An OCI config object or None.

    Returns:
        The parent compartment object or None if the compartment is a tenancy
    """
    import oci.identity

    if compartment is None:
        return None

    # The docstring allows None, so fall back to the global config
    if config is None:
        config = configuration.get_current_config(config=config)

    if compartment.id == config.get('tenancy'):
        return None

    # Initialize the identity client
    identity = core.get_oci_identity_client(config=config)

    return identity.get_compartment(compartment.compartment_id).data


def get_compartment_full_path(compartment_id, config, interactive=True):
    """Returns the full path of a given compartment id

    Path separator is /. Each path element is shortened to 24 characters.

    Args:
        compartment_id (str): A OCID of the compartment.
        config (object): An OCI config object or None.
        interactive (bool): If set to false exceptions are raised.

    Returns:
        The full path of the compartment, None if the compartments could not
        be listed and the error was printed in interactive mode
    """
    import oci.exceptions
    import oci.identity
    import oci.pagination

    # The docstring allows None, so fall back to the global config
    if config is None:
        config = configuration.get_current_config(config=config)
    tenancy_id = config.get('tenancy')

    # If the given compartment_id is the OCID of the tenancy return /
    if not compartment_id or compartment_id == tenancy_id:
        return "/"

    # Initialize the identity client
    identity = core.get_oci_identity_client(config=config)

    # Get the full compartment tree item list of the tenancy
    try:
        comp_list = oci.pagination.list_call_get_all_results(
            identity.list_compartments,
            compartment_id=tenancy_id,
            compartment_id_in_subtree=True).data
    except oci.exceptions.ServiceError as e:
        if not interactive:
            raise
        print("Could not list all compartments.\n"
              f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
        return
    except Exception as e:
        if not interactive:
            raise
        print(f'ERROR: {e}')
        return

    compartments_by_id = {c.id: c for c in comp_list}

    full_path = ""
    comp_id = compartment_id

    # Walk up the tree until the tenancy is reached
    while comp_id and comp_id != tenancy_id:
        comp = compartments_by_id.get(comp_id)
        if comp is None:
            if not interactive:
                raise ValueError(f"Compartment with id {comp_id} not found.")
            print(f"ERROR: Compartment with id {comp_id} not found.")
            break

        # Prepend the compartment name and continue with its parent
        full_path = f"/{_shorten(comp.name, 24)}{full_path}"
        comp_id = comp.compartment_id

    return full_path


def format_compartment_listing(data, current_compartment_id=None):
    """Returns a formatted list of compartments.

    If current_compartment_id is given, the matching compartment is marked
    with a * in front of its index.

    Args:
        data (list): A list of compartment objects.
        current_compartment_id (str): OCID of the current compartment.

    Returns:
        The formatted list as string
    """
    lines = []

    for i, c in enumerate(data, start=1):
        # Shorten to 24 chars max, remove linebreaks
        name = _shorten(c.name, 24)
        # Shorten to 54 chars max, remove linebreaks
        description = _shorten(c.description or "", 54)
        # Tolerate objects without a lifecycle_state attribute
        # NOTE(review): presumably needed for tenancy objects - confirm
        state = getattr(c, "lifecycle_state", "")

        index = f"*{i:>3}" if current_compartment_id == c.id else f"{i:>4}"
        lines.append(f"{index} {name:24} {description:54} {state}\n")

    return "".join(lines)


def format_availability_domain_listing(data):
    """Returns a formatted list of availability domains

    Args:
        data (list): A list of availability domain objects.

    Returns:
        The formatted list as string
    """
    # Names are shortened to 64 chars max, linebreaks are removed
    return "".join(
        f"{i:>4} {_shorten(a.name, 64):64}\n"
        for i, a in enumerate(data, start=1))


def format_availability_domain(items) -> str:
    """Formats a given list of objects in a human readable form

    Args:
        items: Either a list of objects or a single object

    Returns:
        The objects formatted as str
    """
    # If a single object was given, wrap it in a list
    if not isinstance(items, list):
        items = [items]

    return "".join(core.fixed_len(i.name, 32, '\n', True) for i in items)


@plugin_function('mds.get.availabilityDomain', shell=True, cli=True, web=True)
def get_availability_domain(**kwargs):
    """Returns the name of a randomly chosen availability_domain

    If a name is given, it will be checked if that name actually exists
    in the current compartment

    Args:
        **kwargs: Additional options

    Keyword Args:
        availability_domain (str): The name of the availability_domain.
        random_selection (bool): Whether a random selection should be made
        compartment_id (str): OCID of the parent compartment.
        config (dict): An OCI config object or None
        config_profile (str): The name of an OCI config profile
        interactive (bool): Indicates whether to execute in interactive mode
        raise_exceptions (bool): If true exceptions are raised
        return_formatted (bool): If true a human readable string is returned
        return_python_object (bool): Used for internal plugin calls

    Returns:
        The availability_domain
    """
    availability_domain = kwargs.get("availability_domain")
    random_selection = kwargs.get("random_selection", True)

    compartment_id = kwargs.get("compartment_id")
    config = kwargs.get("config")
    config_profile = kwargs.get("config_profile")

    interactive = kwargs.get("interactive", core.get_interactive_default())
    raise_exceptions = kwargs.get("raise_exceptions", not interactive)
    return_formatted = kwargs.get("return_formatted", interactive)
    return_python_object = kwargs.get("return_python_object", False)

    try:
        # Get the active config and compartment
        config = configuration.get_current_config(
            config=config, config_profile=config_profile,
            interactive=interactive)
        compartment_id = configuration.get_current_compartment_id(
            compartment_id=compartment_id, config=config)

        import oci.identity
        import oci.exceptions

        try:
            # Initialize the identity client
            identity = core.get_oci_identity_client(config=config)

            # Get availability domains
            availability_domains = identity.list_availability_domains(
                compartment_id).data

            # Lookup the availability domain object by name
            availability_domain_obj = None
            if availability_domain:
                wanted = availability_domain.lower()
                availability_domain_obj = next(
                    (ad for ad in availability_domains
                     if ad.name.lower() == wanted),
                    None)
                if not availability_domain_obj or \
                        availability_domain_obj.name == "":
                    raise Exception(
                        f"The given availability domain {availability_domain} "
                        "was not found.")

            # If the user did not specify a availability_domain pick one
            if not availability_domain_obj:
                if len(availability_domains) == 0:
                    raise Exception("No availability domain available.")
                elif len(availability_domains) == 1:
                    availability_domain_obj = availability_domains[0]
                elif random_selection:
                    availability_domain_obj = random.choice(
                        availability_domains)
                elif interactive:
                    availability_domain_obj = core.prompt_for_list_item(
                        item_list=availability_domains,
                        prompt_caption=(
                            "Please enter the name or index of the "
                            "availability domain: "),
                        item_name_property="name",
                        print_list=True)

            if not availability_domain_obj:
                raise Exception("No availability domain specified.")

            return core.return_oci_object(
                oci_object=availability_domain_obj,
                return_formatted=return_formatted,
                return_python_object=return_python_object,
                format_function=format_availability_domain)
        except oci.exceptions.ServiceError as e:
            if e.code == "NotAuthorizedOrNotFound":
                error = ('You do not have privileges to list the '
                         'availabilitydomains for this compartment.\n')
            else:
                error = ('Could not list the availability domains for this '
                         'compartment.\n')
            error += (
                f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
            if raise_exceptions:
                # Re-raised as plain Exception, handled by the outer handler
                raise Exception(error) from e
            print(error)
    except Exception as e:
        if raise_exceptions:
            raise
        print('Could not list the availability domains for this '
              f'compartment.\nERROR: {str(e)}')


@plugin_function('mds.list.compartments', shell=True, cli=True, web=True)
def list_compartments(**kwargs):
    """Lists compartments

    This function will list all sub-compartments of the compartment with the
    given compartment_id. If compartment_id is omitted, all compartments of
    the tenancy are listed.

    Args:
        **kwargs: Optional parameters

    Keyword Args:
        compartment_id (str): OCID of the parent compartment
        include_tenancy (bool): Whether to include the tenancy as compartment
        config (dict): An OCI config object or None
        config_profile (str): The name of an OCI config profile
        interactive (bool): Indicates whether to execute in interactive mode
        raise_exceptions (bool): If set to true exceptions are raised
        return_formatted (bool): If set to true, a formatted string is returned

    Returns:
        A list of dicts representing the compartments
    """
    compartment_id = kwargs.get("compartment_id")
    include_tenancy = kwargs.get("include_tenancy", compartment_id is None)

    config = kwargs.get("config")
    config_profile = kwargs.get("config_profile")

    interactive = kwargs.get("interactive", core.get_interactive_default())
    raise_exceptions = kwargs.get("raise_exceptions", not interactive)
    return_formatted = kwargs.get("return_formatted", interactive)

    # Import before the try block, the except clauses below need the oci name
    # to be bound even if the very first statement in the try block fails
    import oci.exceptions
    import oci.identity
    import oci.util

    try:
        config = configuration.get_current_config(
            config=config, config_profile=config_profile,
            interactive=interactive)

        # If no compartment_id is given, return full subtree of the tenancy
        full_subtree = compartment_id is None
        if full_subtree:
            compartment_id = config["tenancy"]

        # Initialize the identity client
        identity = core.get_oci_identity_client(config=config)

        # List the compartments, all deleted compartments are filtered out
        data = _list_active_compartments(
            identity, compartment_id, in_subtree=full_subtree,
            access_level="ANY", limit=1000)

        current_compartment_id = configuration.get_current_compartment_id(
            profile_name=config_profile)

        if include_tenancy:
            # Represent the tenancy as a synthetic root compartment entry
            tenancy = oci.identity.models.Compartment(
                id=config["tenancy"],
                name="/ (Root Compartment)",
                description="The tenancy is the root compartment.",
                freeform_tags={},
                time_created="2020-01-01T00:00:00.000000+00:00"
            )
            tenancy.lifecycle_state = "ACTIVE"
            tenancy.defined_tags = {}
            data.append(tenancy)

        if return_formatted:
            return format_compartment_listing(
                data, current_compartment_id=current_compartment_id)

        compartments = oci.util.to_dict(data)
        for compartment in compartments:
            compartment['is_current'] = (
                compartment['id'] == current_compartment_id)
        return compartments
    except oci.exceptions.ServiceError as e:
        if raise_exceptions:
            raise
        print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
    except Exception as e:
        if raise_exceptions:
            raise
        print(f'ERROR: {e}')


@plugin_function('mds.get.compartmentById', shell=True, cli=True, web=True)
def get_compartment_by_id_func(compartment_id, **kwargs):
    """Gets a compartment by id

    Args:
        compartment_id (str): OCID of the compartment
        **kwargs: Optional parameters

    Keyword Args:
        config (object): An OCI config object or None.
        config_profile (str): The name of an OCI config profile
        interactive (bool): Indicates whether to execute in interactive mode

    Returns:
        The compartment object
    """
    config = kwargs.get("config")
    config_profile = kwargs.get("config_profile")

    interactive = kwargs.get("interactive", core.get_interactive_default())
    raise_exceptions = kwargs.get("raise_exceptions", not interactive)
    return_formatted = kwargs.get("return_formatted", interactive)

    # Import before the try block, the except clauses below need the oci name
    # to be bound even if the very first statement in the try block fails
    import oci.exceptions
    import oci.util

    try:
        # Get the active config
        config = configuration.get_current_config(
            config=config, config_profile=config_profile,
            interactive=interactive)

        compartment = get_compartment_by_id(
            compartment_id=compartment_id, config=config,
            interactive=interactive)

        # The lookup failed and the error has already been printed
        if compartment is None:
            return

        if return_formatted:
            return format_compartment_listing(data=[compartment])

        # return compartment as dict
        return oci.util.to_dict(compartment)
    except oci.exceptions.ServiceError as e:
        if raise_exceptions:
            raise
        print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
        return
    except (ValueError, oci.exceptions.ClientError) as e:
        if raise_exceptions:
            raise
        print(f'ERROR: {e}')


def _resolve_compartment_choice(choice, children, current_id, config):
    """Maps the text entered at the compartment prompt to a compartment

    Args:
        choice (str): The stripped, lower-cased user input, not empty.
        children (list): The listed sub-compartments of the current one.
        current_id (str): OCID of the current compartment.
        config (object): An OCI config object.

    Returns:
        The selected compartment, None if a lookup by id printed an error

    Raises:
        ValueError: If no compartment matches the given name or path.
        IndexError: If a number outside of the listed range was entered.
    """
    if choice == '/':
        # The compartment will be the tenancy
        return get_compartment_by_id(
            compartment_id=config.get('tenancy'), config=config)

    if choice.startswith('/'):
        # Get the compartment by full path
        compartment = get_compartment_by_path(
            compartment_path=choice, compartment_id=current_id,
            config=config)
        if compartment is None:
            raise ValueError
        return compartment

    if choice == '.':
        # The compartment will be the current compartment
        return get_compartment_by_id(
            compartment_id=current_id, config=config)

    if choice == '..':
        # The compartment will be the parent compartment, the tenancy
        # itself has no parent
        compartment = get_parent_compartment(
            get_compartment_by_id(compartment_id=current_id, config=config),
            config=config)
        if compartment is None:
            raise ValueError
        return compartment

    try:
        # If the user provided an index, try to map that to a compartment
        nr = int(choice)
    except ValueError:
        # Not a number, search by name
        for c in children:
            if c.name.lower() == choice:
                return c
        raise ValueError from None

    if 0 < nr <= len(children):
        return children[nr - 1]
    raise IndexError


@plugin_function('mds.get.compartment', shell=True, cli=True, web=True)
def get_compartment(compartment_path=None, **kwargs):
    """Gets a compartment by path

    If the path was not specified or does not match an existing compartment,
    show the user a list to select a compartment

    Args:
        compartment_path (str): The name of the compartment
        **kwargs: Optional parameters

    Keyword Args:
        parent_compartment_id (str): OCID of the parent compartment
        config (object): An OCI config object or None.
        config_profile (str): The name of an OCI config profile
        interactive (bool): Indicates whether to execute in interactive mode

    Returns:
        The compartment object
    """
    parent_compartment_id = kwargs.get("parent_compartment_id")
    config = kwargs.get("config")
    config_profile = kwargs.get("config_profile")

    interactive = kwargs.get("interactive", core.get_interactive_default())
    raise_exceptions = kwargs.get("raise_exceptions", not interactive)

    # Get the active config and compartment
    try:
        config = configuration.get_current_config(
            config=config, config_profile=config_profile,
            interactive=interactive)
        compartment_id = configuration.get_current_compartment_id(
            compartment_id=parent_compartment_id, config=config)
    except ValueError as e:
        if raise_exceptions:
            raise
        print(f"ERROR: {str(e)}")
        return

    import oci.identity
    import mysqlsh

    # If a compartment path was given, look it up
    if compartment_path is not None:
        compartment = get_compartment_by_path(
            compartment_path, compartment_id, config)
        if compartment is not None:
            return compartment

        print(
            f"The compartment with the path {compartment_path} was not found.")
        if not interactive:
            return
        print("Please select another compartment.\n")

    # Initialize the identity client
    identity = core.get_oci_identity_client(config=config)

    # List the sub-compartments of the current compartment, all deleted
    # compartments are filtered out
    data = _list_active_compartments(identity, compartment_id)

    # Print special path descriptions
    full_path = get_compartment_full_path(compartment_id, config)
    is_tenancy = compartment_id == config.get('tenancy')
    print(f"Directory of compartment {full_path}"
          f"{' (the tenancy)' if is_tenancy else ''}\n")
    if not is_tenancy:
        print(" / Root compartment (the tenancy)\n"
              " . Current compartment\n"
              " .. Parent compartment\n")

    # If the compartment_name was not given or found, print out the list
    comp_list = format_compartment_listing(data)
    if comp_list == "":
        comp_list = "Child Compartments:\n - None\n"
    print(comp_list)

    # Let the user choose from the list until a compartment was selected
    compartment = None
    while compartment is None:
        # Prompt the user for specifying a compartment
        prompt = mysqlsh.globals.shell.prompt(
            "Please enter the name, index or path of the compartment: ",
            {'defaultValue': ''}).strip().lower()

        # An empty input cancels the selection
        if prompt == "":
            return

        try:
            compartment = _resolve_compartment_choice(
                prompt, data, compartment_id, config)
        except ValueError:
            print(
                f'The compartment {prompt} was not found. Please try again.\n')
        except IndexError:
            print(f'A compartment with the index {prompt} was not found. '
                  f'Please try again.\n')

    return compartment


@plugin_function('mds.get.compartmentId')
def get_compartment_id(compartment_path=None, **kwargs):
    """Gets a compartment OCID by path

    If the path was not specified or does not match an existing compartment,
    show the user a list to select a compartment

    Args:
        compartment_path (str): The name of the compartment
        **kwargs: Optional parameters

    Keyword Args:
        parent_compartment_id (str): OCID of the parent compartment.
        config (object): An OCI config object or None.

    Returns:
        The OCID of a compartment
    """
    compartment = get_compartment(
        compartment_path=compartment_path,
        parent_compartment_id=kwargs.get("parent_compartment_id"),
        config=kwargs.get("config"))

    return None if compartment is None else compartment.id


@plugin_function('mds.create.compartment')
def create_compartment(**kwargs):
    """Creates a new compartment

    This function will create a new compartment.

    Args:
        **kwargs: Optional parameters

    Keyword Args:
        name (str): The name used for the new compartment.
        description (str): A description used for the new compartment.
        parent_compartment_id (str): OCID of the parent compartment.
        config (object): An OCI config object or None.
        return_object (bool): Whether to return the object
        interactive (bool): Whether exceptions are raised

    Returns:
        The new compartment if return_object is set to true
    """
    name = kwargs.get("name")
    description = kwargs.get("description")
    parent_compartment_id = kwargs.get("parent_compartment_id")
    config = kwargs.get("config")
    return_object = kwargs.get("return_object", False)
    interactive = kwargs.get("interactive", True)

    # Import before the try block, the except clauses below need the oci name
    # to be bound even if the very first statement in the try block fails
    import oci.exceptions
    import oci.identity
    import mysqlsh

    try:
        # Get the active config and compartment
        config = configuration.get_current_config(config=config)
        parent_compartment_id = configuration.get_current_compartment_id(
            compartment_id=parent_compartment_id, config=config)

        # Get a name
        if name is None and interactive:
            name = mysqlsh.globals.shell.prompt(
                "Please enter the name for the new compartment: ",
                {'defaultValue': ''}).strip()
            if name == "":
                print("Operation cancelled.")
                return

        # Fail early with a clear message instead of sending an empty name
        # to the service
        if not name:
            raise ValueError("No name given for the new compartment.")

        # Get a description
        if description is None and interactive:
            description = mysqlsh.globals.shell.prompt(
                "Please enter a description for the new compartment [-]: ",
                {'defaultValue': '-'}).strip()
            if description == "":
                print("Operation cancelled.")
                return

        # Initialize the identity client
        identity = core.get_oci_identity_client(config=config)

        # Setup the compartment details
        compartment_details = oci.identity.models.CreateCompartmentDetails(
            compartment_id=parent_compartment_id,
            name=name,
            description=description if description is not None else "-"
        )

        # Create the compartment
        compartment = identity.create_compartment(compartment_details).data

        print(f"Compartment {name} is being created.\n")

        if return_object:
            return compartment
    # NOTE(review): errors are re-raised when interactive is true and printed
    # otherwise, which is the opposite of the other functions in this module.
    # Kept unchanged for backward compatibility - confirm the intent.
    except oci.exceptions.ServiceError as e:
        if interactive:
            raise
        print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
        return
    except (ValueError, oci.exceptions.ClientError) as e:
        if interactive:
            raise
        print(f'ERROR: {e}')


@plugin_function('mds.delete.compartment')
def delete_compartment(compartment_path=None, **kwargs):
    """Deletes the compartment with the given id

    If no id is given, it will prompt the user for the id.

    Args:
        compartment_path (str): The full path to the compartment
        **kwargs: Optional parameters

    Keyword Args:
        compartment_id (str): OCID of the compartment.
        config (object): An OCI config object or None.
        interactive (bool): Whether exceptions are raised

    Returns:
        None
    """
    compartment_id = kwargs.get("compartment_id")
    config = kwargs.get("config")
    # NOTE(review): interactive has no default here, so the confirmation
    # prompt below is skipped unless interactive is passed explicitly.
    # Kept unchanged for backward compatibility - confirm the intent.
    interactive = kwargs.get("interactive")

    # Import before the try block, the except clauses below need the oci name
    # to be bound even if the very first statement in the try block fails
    import oci.exceptions
    import oci.identity
    import mysqlsh

    try:
        # Get the active config
        config = configuration.get_current_config(config=config)

        if compartment_id is not None and not compartment_path:
            # Use the given OCID directly, get_compartment() would ignore it
            compartment = get_compartment_by_id(
                compartment_id=compartment_id, config=config)
        else:
            compartment = get_compartment(
                compartment_path=compartment_path,
                config=config, interactive=interactive)

        if compartment is None:
            print("Operation cancelled.")
            return False

        # Initialize the identity client
        identity = core.get_oci_identity_client(config=config)

        # Ask the user to confirm the deletion
        if interactive:
            prompt = mysqlsh.globals.shell.prompt(
                "Are you sure you want to delete the compartment "
                f"{compartment.name} [yes/NO]: ",
                {'defaultValue': 'no'}).strip().lower()
            if prompt != "yes":
                print("Deletion aborted.\n")
                return False

        identity.delete_compartment(compartment.id)

        print(f"Compartment {compartment.name} is being deleted.")

        return True
    # NOTE(review): errors are re-raised when interactive is true and printed
    # otherwise, which is the opposite of the other functions in this module.
    # Kept unchanged for backward compatibility - confirm the intent.
    except oci.exceptions.ServiceError as e:
        if interactive:
            raise
        print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
        return False
    except (ValueError, oci.exceptions.ClientError) as e:
        if interactive:
            raise
        print(f'ERROR: {e}')
        return False


@plugin_function('mds.update.compartment')
def update_compartment(compartment_path=None, **kwargs):
    """Updates the compartment with the given id

    If no id is given, it will prompt the user for the id.

    Args:
        compartment_path (str): The name or path of the compartment to update.
        **kwargs: Optional parameters

    Keyword Args:
        name (str): The new name of the compartment.
        description (str): The new description of the compartment.
        compartment_id (str): OCID of the compartment.
        config (object): An OCI config object or None.
        interactive (bool): Whether exceptions are raised

    Returns:
        None
    """
    name = kwargs.get("name")
    description = kwargs.get("description")
    compartment_id = kwargs.get("compartment_id")
    config = kwargs.get("config")
    interactive = kwargs.get("interactive", True)

    # Import before the try block, the except clauses below need the oci name
    # to be bound even if the very first statement in the try block fails
    import oci.exceptions
    import oci.identity
    import mysqlsh

    try:
        # Get the active config
        config = configuration.get_current_config(config=config)

        if compartment_path is not None and compartment_path != "":
            # If a compartment_path was given, look the compartment up.
            # config has to be passed by keyword, the second positional
            # parameter of get_compartment_by_path is the compartment_id
            compartment = get_compartment_by_path(
                compartment_path, config=config)
        elif compartment_id is not None:
            # If the compartment_id was given, use that
            compartment = get_compartment_by_id(compartment_id, config)
        else:
            # Otherwise ask the user to select a compartment
            print("Please specify the compartment that should be updated.\n")
            try:
                compartment = get_compartment(config=config)
            except Exception as e:
                print(f"ERROR: Could not get compartment_id. {e}")
                return

        if compartment is None:
            print("Operation cancelled.")
            return

        # Initialize the identity client
        identity = core.get_oci_identity_client(config=config)

        # Prompt the user for the new values, the current values are the
        # defaults
        if name is None:
            name = mysqlsh.globals.shell.prompt(
                "Please enter a new name for the compartment "
                f"[{compartment.name}]: ",
                {'defaultValue': compartment.name}).strip()
        if description is None:
            description = mysqlsh.globals.shell.prompt(
                "Please enter a new description [current description]: ",
                {'defaultValue': compartment.description}).strip()

        update_details = oci.identity.models.UpdateCompartmentDetails(
            name=name,
            description=description
        )
        identity.update_compartment(compartment.id, update_details)

        print(f"Compartment {compartment.name} is being updated.")
    # NOTE(review): errors are re-raised when interactive is true and printed
    # otherwise, which is the opposite of the other functions in this module.
    # Kept unchanged for backward compatibility - confirm the intent.
    except oci.exceptions.ServiceError as e:
        if interactive:
            raise
        print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
        return
    except (ValueError, oci.exceptions.ClientError) as e:
        if interactive:
            raise
        print(f'ERROR: {e}')


@plugin_function('mds.list.availabilityDomains')
def list_availability_domains(**kwargs):
    """Lists Availability Domains

    This function will list all Availability Domains of the given compartment

    Args:
        **kwargs: Optional parameters

    Keyword Args:
        compartment_id (str): OCID of the parent compartment.
        config (dict): An OCI config object or None.
        interactive (bool): If set to false exceptions are raised
        return_formatted (bool): If set to true, a formatted string is returned.

    Returns:
        A list of Availability Domains
    """
    compartment_id = kwargs.get("compartment_id")
    config = kwargs.get("config")
    interactive = kwargs.get("interactive", True)
    return_formatted = kwargs.get("return_formatted", True)

    # Import before the try block, the except clauses below need the oci name
    # to be bound even if the very first statement in the try block fails
    import oci.exceptions
    import oci.identity
    import oci.util

    try:
        # Get the active config and compartment
        config = configuration.get_current_config(config=config)
        compartment_id = configuration.get_current_compartment_id(
            compartment_id=compartment_id, config=config)

        # Initialize the identity client
        identity = core.get_oci_identity_client(config=config)

        # List availability domains
        data = identity.list_availability_domains(compartment_id).data

        if return_formatted:
            return format_availability_domain_listing(data)

        return oci.util.to_dict(data)
    except oci.exceptions.ServiceError as e:
        if not interactive:
            raise
        print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
        return
    except Exception as e:
        if not interactive:
            raise
        print(f'ERROR: {e}')
        return