mds_plugin/configuration.py (1,066 lines of code) (raw):
# Copyright (c) 2021, 2024, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0,
# as published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms, as
# designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an additional
# permission to link the program and your derivative works with the
# separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
"""Sub-Module for supporting OCI config profile handling and current objects
OCI Configuration profiles are stored in the ~/.oci/config file by default.
OCIDs of the object used as current ones are stored in the cli_rc_file
which is stored in the ~/.oci/oci_cli_rc file by default.
When using the MDS plugin API the OCI config selected by the user is stored
in getattr(mysqlsh.globals, 'mds_config')
"""
# cSpell:ignore saopaulo, Paulo, chuncheon, Vinhedo
from mysqlsh.plugin_manager import plugin_function
from mds_plugin import core
from os import getenv
OCI_REGION_LIST = [
{"name": "Australia East (Sydney)", "id": "ap-sydney-1", "location": "Sydney, Australia",
"region_key": "SYD", "key_realm": "OC1", "avail_domains": "1"},
{"name": "Australia Southeast (Melbourne)", "id": "ap-melbourne-1", "location": "Melbourne, Australia",
"region_key": "MEL", "key_realm": "OC1", "avail_domains": "1"},
{"name": "Brazil East (Sao Paulo)", "id": "sa-saopaulo-1", "location": "Sao Paulo, Brazil",
"region_key": "GRU", "key_realm": "OC1", "avail_domains": "1"},
{"name": "Brazil Southeast (Vinhedo)", "id": "sa-vinhedo-1", "location": "Vinhedo, Brazil",
"region_key": "VCP", "key_realm": "OC1", "avail_domains": "1"},
{"name": "Canada Southeast (Montreal)", "id": "ca-montreal-1", "location": "Montreal, Canada",
"region_key": "YUL", "key_realm": "OC1", "avail_domains": "1"},
{"name": "Canada Southeast (Toronto)", "id": "ca-toronto-1", "location": "Toronto, Canada",
"region_key": "YYZ", "key_realm": "OC1", "avail_domains": "1"},
{"name": "Chile (Santiago)", "id": "sa-santiago-1", "location": "Santiago, Chile",
"region_key": "SCL", "key_realm": "OC1", "avail_domains": "1"},
{"name": "Germany Central (Frankfurt)", "id": "eu-frankfurt-1", "location": "Frankfurt, Germany",
"region_key": "FRA", "key_realm": "OC1", "avail_domains": "3"},
{"name": "India South (Hyderabad)", "id": "ap-hyderabad-1", "location": "Hyderabad, India",
"region_key": "HYD", "key_realm": "OC1", "avail_domains": "1"},
{"name": "India West (Mumbai)", "id": "ap-mumbai-1", "location": "Mumbai, India",
"region_key": "BOM", "key_realm": "OC1", "avail_domains": "1"},
{"name": "Japan Central (Osaka)", "id": "ap-osaka-1", "location": "Osaka, Japan",
"region_key": "KIX", "key_realm": "OC1", "avail_domains": "1"},
{"name": "Japan East (Tokyo)", "id": "ap-tokyo-1", "location": "Tokyo, Japan",
"region_key": "NRT", "key_realm": "OC1", "avail_domains": "1"},
{"name": "Netherlands Northwest (Amsterdam)", "id": "eu-amsterdam-1", "location": "Amsterdam, Netherlands",
"region_key": "AMS", "key_realm": "OC1", "avail_domains": "1"},
{"name": "Saudi Arabia West (Jeddah)", "id": "me-jeddah-1", "location": "Jeddah, Saudi Arabia",
"region_key": "JED", "key_realm": "OC1", "avail_domains": "1"},
{"name": "South Korea Central (Seoul)", "id": "ap-seoul-1", "location": "Seoul, South Korea",
"region_key": "ICN", "key_realm": "OC1", "avail_domains": "1"},
{"name": "South Korea North (Chuncheon)", "id": "ap-chuncheon-1", "location": "Chuncheon, South Korea",
"region_key": "YNY", "key_realm": "OC1", "avail_domains": "1"},
{"name": "Switzerland North (Zurich)", "id": "eu-zurich-1", "location": "Zurich, Switzerland",
"region_key": "ZRH", "key_realm": "OC1", "avail_domains": "1"},
{"name": "UAE East (Dubai)", "id": "me-dubai-1", "location": "Dubai, UAE",
"region_key": "DXB", "key_realm": "OC1", "avail_domains": "1"},
{"name": "UK South (London)", "id": "uk-london-1", "location": "London, United Kingdom",
"region_key": "LHR", "key_realm": "OC1", "avail_domains": "3"},
{"name": "UK West (Newport)", "id": "uk-cardiff-1", "location": "Newport, United Kingdom",
"region_key": "CWL", "key_realm": "OC1", "avail_domains": "1"},
{"name": "US East (Ashburn)", "id": "us-ashburn-1", "location": "Ashburn, VA",
"region_key": "IAD", "key_realm": "OC1", "avail_domains": "3"},
{"name": "US West (Phoenix)", "id": "us-phoenix-1", "location": "Phoenix, AZ",
"region_key": "PHX", "key_realm": "OC1", "avail_domains": "3"},
{"name": "US West (San Jose)", "id": "us-sanjose-1", "location": "San Jose, CA",
"region_key": "SJC", "key_realm": "OC1", "avail_domains": "1"}
]
def get_default_config_file_path():
import oci.config
config_file_path = getenv("MYSQLSH_OCI_CONFIG_FILE")
if config_file_path is None:
config_file_path = oci.config.DEFAULT_LOCATION
return config_file_path
@plugin_function('mds.get.regions', shell=True, cli=True, web=True)
def get_regions():
"""Returns the list of available OCI regions
Returns:
A list of dicts
"""
return OCI_REGION_LIST
def get_config(profile_name=None, config_file_path=None,
cli_rc_file_path=None, interactive=True,
raise_exceptions=False):
"""Loads an oci config
This function loads an oci config
Args:
profile (str): The name of the OCI profile
config_file_path (str): The file location of the OCI config file
cli_rc_file_path (str): The location of the OCI CLI config file
raise_exceptions (bool): Whether to raise exceptions
Returns:
The config dict or None
"""
import oci.config
import oci.auth
import oci.signer
import oci.exceptions
import mysqlsh
# If no profile is given, look it up in the CLI config file
if profile_name is None:
default_profile_name = get_default_profile()
profile_name = default_profile_name \
if default_profile_name else "DEFAULT"
# If no config_file_path is given, first check the MYSQLSH_OCI_CONFIG_FILE env_var and only then fall back to
# default
if config_file_path is None:
config_file_path = get_default_config_file_path()
# cSpell:ignore instanceprincipal
# If the profile_name matches instanceprincipal, use an Instance Principals
# instead of an actual config
set_global_config = False
if profile_name.lower() == "instanceprincipal":
signer = oci.auth.signers.InstancePrincipalsSecurityTokenSigner()
config = {
"signer": signer,
"tenancy": signer.tenancy_id,
"region": signer.initialize_and_return_region()}
else:
# Load config from file
try:
config = oci.config.from_file(
file_location=config_file_path, profile_name=profile_name)
if profile_name == "DEFAULT" and config.get('tenancy') == None:
profile_name = None
raise oci.exceptions.ProfileNotFound()
oci.config.validate_config(config)
# If running in interactive mode and there is no global
# config set yet, ensure it gets set
if interactive and not 'mds_config' in dir(mysqlsh.globals):
set_global_config = True
except (oci.exceptions.ConfigFileNotFound,
oci.exceptions.ProfileNotFound) as e:
if raise_exceptions:
raise
if interactive:
if profile_name:
print(f"OCI configuration profile '{profile_name}' "
"was not found.")
else:
print("OCI configuration profile not found.")
# Start wizard for new profiles
config = create_config(
profile_name=profile_name,
config_file_path=config_file_path)
if config is None:
return
set_global_config = True
except oci.exceptions.InvalidConfig as e:
raise ValueError(f"The configuration profile '{profile_name}' is "
f"invalid.\n{str(e)}")
# Create default signer based on the config values. This is required
# since the OCI clients use if 'signer' in kwargs: and don't check for
# None
try:
config["signer"] = oci.signer.Signer(
tenancy=config.get("tenancy"),
user=config.get("user"),
fingerprint=config.get("fingerprint"),
private_key_file_location=config.get("key_file"),
pass_phrase=config.get("pass_phrase"),
private_key_content=config.get("key_content"))
except oci.exceptions.MissingPrivateKeyPassphrase:
# If there is a passphrase required, ask for it
pass_phrase = mysqlsh.globals.shell.prompt(
f"Please enter the passphrase for the API key: ",
{'defaultValue': '', 'type': 'password'})
if not pass_phrase:
if raise_exceptions:
raise Exception("No or invalid passphrase for API key.")
return None
# Store the passphrase in the config
config["pass_phrase"] = pass_phrase
try:
config["signer"] = oci.signer.Signer(
tenancy=config.get("tenancy"),
user=config.get("user"),
fingerprint=config.get("fingerprint"),
private_key_file_location=config.get("key_file"),
pass_phrase=config.get("pass_phrase"),
private_key_content=config.get("key_content"))
except oci.exceptions.MissingPrivateKeyPassphrase:
if raise_exceptions:
raise Exception("No or invalid passphrase for API key.")
print("No or invalid passphrase for API key.")
return None
# Set additional config values like profile and current objects
# Add profile name to the config so it can be used later
config["profile"] = profile_name
# Set it as global config object
if set_global_config:
setattr(mysqlsh.globals, 'mds_config', config)
# Load current compartment_id
current_compartment_id = get_current_compartment_id(
profile_name=profile_name,
cli_rc_file_path=cli_rc_file_path)
# If there is no current compartment yet, use the tenancy
if not current_compartment_id:
if "tenancy" in config:
current_compartment_id = config["tenancy"]
# Initialize current compartment in the global config
config["compartment-id"] = current_compartment_id
# Load current instance_id
current_instance_id = get_current_instance_id(
profile_name=profile_name,
cli_rc_file_path=cli_rc_file_path)
if current_instance_id is not None:
# Initialize current compartment in the global config
config["instance-id"] = current_instance_id
# Load current db_system_id
current_db_system_id = get_current_db_system_id(
profile_name=profile_name,
cli_rc_file_path=cli_rc_file_path)
if current_db_system_id is not None:
# Initialize current compartment in the global config
config["db-system-id"] = current_db_system_id
# Load current bucket_name
current_bucket_name = get_current_bucket_name(
profile_name=profile_name,
cli_rc_file_path=cli_rc_file_path)
if current_bucket_name is not None:
# Initialize current compartment in the global config
config["bucket-name"] = current_bucket_name
# Load current network
current_network_id = get_current_network_id(
profile_name=profile_name,
cli_rc_file_path=cli_rc_file_path)
if current_network_id is not None:
# Initialize current compartment in the global config
config["network-id"] = current_network_id
# Load current endpoint
current_endpoint = get_current_endpoint(
profile_name=profile_name,
cli_rc_file_path=cli_rc_file_path)
if current_endpoint is not None:
# Initialize current compartment in the global config
config["endpoint"] = current_endpoint
# Load current bastion_id
current_bastion_id = get_current_bastion_id(
profile_name=profile_name,
cli_rc_file_path=cli_rc_file_path)
if current_bastion_id:
# Initialize current compartment in the global config
config["bastion-id"] = current_bastion_id
# Update global config object
if set_global_config:
setattr(mysqlsh.globals, 'mds_config', config)
return config
def get_config_profile_dict_from_parser(config, profile):
"""Returns the given profile as a dict
Args:
config (object): The configparser.ConfigParser() instance with the
config loaded
profile (str): The name of the profile
Returns:
The profile as Dict
"""
return {
'profile': profile,
'user': config[profile].get('user', None),
'fingerprint': config[profile]['fingerprint'],
'key_file': config[profile]['key_file'],
'tenancy': config[profile]['tenancy'],
'region': config[profile]['region'],
'is_current': profile == get_default_profile(),
'security_token_file': config[profile].get('security_token_file', None)
}
@plugin_function('mds.list.configProfiles', shell=True, cli=True, web=True)
def list_config_profiles(**kwargs):
"""Lists all available profiles
Args:
**kwargs: Additional options
Keyword Args:
config_file_path (str): The file location of the OCI config file
interactive (bool): Indicates whether to execute in interactive mode
raise_exceptions (bool): If set to true exceptions are raised
return_formatted (bool): If set to true, a list object is returned
Returns:
None
"""
# If no config_file_path is given, first check the MYSQLSH_OCI_CONFIG_FILE env_var and only then fall back to
# default
config_file_path = kwargs.get(
"config_file_path", get_default_config_file_path())
interactive = kwargs.get("interactive", core.get_interactive_default())
raise_exceptions = kwargs.get("raise_exceptions", not interactive)
return_formatted = kwargs.get("return_formatted", interactive)
try:
import configparser
import os.path
# Convert Unix path to Windows
config_file_path = os.path.abspath(
os.path.expanduser(config_file_path))
# Read config
config = configparser.ConfigParser()
config.read(config_file_path)
# Add special handling for the DEFAULT section that is not listed
# in the config.sections() call
config_profiles = [
get_config_profile_dict_from_parser(config, 'DEFAULT')] \
if 'tenancy' in config['DEFAULT'] else []
for section in config.sections():
try:
config_profiles.append(
get_config_profile_dict_from_parser(config, section))
except Exception as e:
raise Exception(
f"The OCI configuration file is corrupted. {e}")
if interactive:
print("The following configuration profiles are available:\n")
if return_formatted:
result = ""
i = 1
for p in config_profiles:
result += f"{i:>4} {p.get('profile'):25} {p.get('region')}\n"
i += 1
return result
else:
return config_profiles
except Exception as e:
if raise_exceptions:
raise
print(f"Error: {str(e)}")
@plugin_function('mds.create.configProfile')
def create_config(**kwargs):
"""Creates and stores a new OCI config
Args:
**kwargs: Optional parameters
Keyword Args:
profile_name (str): The name of the OCI profile
config_file_path (str): The file path of the OCI config file
tenancy (str): OCID of the tenancy
region (str): The name of the region
user_id (str): OCID of the user
key_file (str): The filename of the OCI API key
fingerprint (str): The fingerprint of the OCI API key
interactive (bool): Whether user interaction should be performed
Returns:
The new config or None
If profile_name is omitted, the DEFAULT profile is used.
If the config_file_path is omitted, ~/.oci/config is used.
"""
import os.path
import pathlib
import configparser
import oci.config
import mysqlsh
from mds_plugin import user
profile_name = kwargs.get("profile_name")
# If no config_file_path is given, first check the MYSQLSH_OCI_CONFIG_FILE env_var and only then fall back to
# default
config_file_path = kwargs.get(
"config_file_path", get_default_config_file_path())
tenancy = kwargs.get("tenancy")
region = kwargs.get("region")
user_id = kwargs.get("user_id")
key_file = kwargs.get("key_file")
fingerprint = kwargs.get("fingerprint")
interactive = kwargs.get("interactive", True)
# Get full path
config_file_path = os.path.expanduser(config_file_path)
if key_file:
key_file = os.path.expanduser(key_file)
parser = configparser.ConfigParser(interpolation=None)
if os.path.isfile(config_file_path):
parser.read(config_file_path)
# If no profile name was given, query the user for one
if profile_name is None and interactive:
profile_name = mysqlsh.globals.shell.prompt(
"Please enter a name for the new OCI profile: ").strip()
if not profile_name:
print("Operation cancelled.")
return None
# Ensure profile_name only uses alphanumeric chars
profile_name = ''.join(e for e in profile_name if e.isalnum())
if profile_name in parser and profile_name.upper() != 'DEFAULT':
print(f"The profile '{profile_name}' already exists.")
return
# cSpell:ignore OCIDs, sdkconfig
print(f"Creating a new OCI API configuration profile '{profile_name}'..."
"\n\nThe creation of a new OCI API configuration profile requires\n"
"the following steps.\n"
"1. Identify the geographical OCI region to use. Each region has\n"
" its own end point and therefore it is important to chose the\n"
" right one for the OCI API \n"
"2. The Oracle Cloud Identifier (OCID) of the OCI tenancy and\n"
" OCI user account need to be looked up on the OCI web console.\n"
"3. An API Signing Key needs to be generated or an existing key file"
"\n needs to be reused.\n"
"4. The API Signing Key needs to be uploaded to the OCI web console."
"\n\nPlease follow the instructions outlined by this wizard or read\n"
"about the details of how to get the recquired OCIDs here:\n"
"- https://docs.cloud.oracle.com/en-us/iaas/Content/API/Concepts/"
"sdkconfig.htm\n")
# Wait for enter
mysqlsh.globals.shell.prompt(
"Please hit the enter key to continue...\n")
if region is None or region == '':
print("OCI Region\n"
"==========\n"
"Please select a region from this list or provide the exact name "
"of a new region.")
region_list = []
for r in OCI_REGION_LIST:
region_list.append(f"{r.get('id'):20} {r.get('location')}")
region = core.prompt_for_list_item(
item_list=region_list,
prompt_caption="Please enter the index or name of a region: ",
prompt_default_value='',
print_list=True)
if not region:
print("Operation cancelled.")
return None
# Remove the trailing caption
region = region.split(' ')[0]
# If no tenancy or user was given, query the user for one
if not tenancy or not user_id:
print("\nOCID of the OCI Tenancy and User Account\n"
"========================================\n"
"Please open the OCI web console by following the link below.\n"
"Enter the name of the tenancy and then log in using your\n"
"OCI user name and web console password or identity provider.\n"
f"- https://console.{region}.oraclecloud.com/\n\n"
"In the OCI web console please click on the user profile icon\n"
"top right and choose 'Tenancy'. On the Tenancy page find the\n"
"<Copy> link next to the Tenancy OCID value and use it to copy\n"
"the value to the clipboard.\n\n"
"For the User OCID value click the user profile icon top right\n"
"and choose 'User Settings'. Again, locate the User OCID value\n"
"click the <Copy> link to copy the value to the clipboard.\n")
if not tenancy:
tenancy = mysqlsh.globals.shell.prompt(
"Please enter the OCID of the tenancy: ").strip()
if not tenancy:
return None
# Check if the user actually entered the tenancy OCID
if not tenancy.startswith('ocid1.tenancy.oc1..'):
# If not, give him another chance to enter the right one
print("ERROR: The OCID of the tenancy needs to start with "
"'ocid1.tenancy.oc1..'.")
tenancy = mysqlsh.globals.shell.prompt(
"Please enter the OCID of the tenancy: ").strip()
if not tenancy:
return None
if not user_id:
user_id = mysqlsh.globals.shell.prompt(
"Please enter the OCID of the user account: ").strip()
if not user_id:
return None
# Check if the user actually entered the user OCID
if not user_id.startswith('ocid1.user.oc1..'):
# If not, give him another chance to enter the right one
print("ERROR: The OCID of the user needs to start with "
"'ocid1.user.oc1..'.")
user_id = mysqlsh.globals.shell.prompt(
"Please enter the OCID of the user account: ").strip()
if not user_id:
return None
def get_public_key_file(key_file):
"""Get the public key based on the private key file path
Try to locate public_key_file by appending _public.pem to the
path + basename of the private key file
Args:
key_file (str): The private key file path
Returns:
The public key file path or None
"""
public_key_file = None
if '.' in key_file:
public_key_file = os.path.splitext(key_file)[0] + "_public.pem"
if not public_key_file or not os.path.isfile(public_key_file):
public_key_file = mysqlsh.globals.shell.prompt(
"Please enter the full path to the private key file "
"[~/.oci/oci_api_key.pem]: ").strip()
if not os.path.isfile(public_key_file):
print(f"Key file '{public_key_file}' not found.\n"
"Operation cancelled.")
return None
return public_key_file
# If no key_file was given, let the user choose
if interactive and (key_file is None or not os.path.isfile(key_file)):
print("\nAPI Signing Key\n"
"===============\n"
"In order to access the OCI APIs an API Signing Key is required.\n"
"The API Signing Key is a special RSA key in PEM format.\n\n"
"Please select one of the following options.")
key_option = core.prompt_for_list_item(
item_list=[
"Create a new API Signing Key",
"Reuse an existing API Signing Key"],
prompt_caption="Please enter the index of the key option to use: ",
prompt_default_value='',
print_list=True)
if key_option == '':
print("Operation cancelled.")
return None
if key_option.startswith('Reuse'):
key_file = mysqlsh.globals.shell.prompt(
"Please enter the full path to the private key file "
"[~/.oci/oci_api_key.pem]: ",
options={
'defaultValue': '~/.oci/oci_api_key.pem'}).strip()
if not key_file:
print("Operation cancelled.")
return None
key_file = os.path.expanduser(key_file)
if not os.path.isfile(key_file):
print(f"Key file '{key_file}' not found. Operation cancelled.")
return None
# Get public key based on key_file
public_key_file = get_public_key_file(key_file)
if not public_key_file:
return None
else:
print("Generating API key files...")
key_info = user.create_user_api_key(
key_path="~/.oci/",
key_file_prefix=f"oci_api_key_{profile_name}",
write_to_disk=True,
return_object=True,
interactive=False)
fingerprint = key_info["fingerprint"]
key_file = os.path.expanduser(
f"~/.oci/oci_api_key_{profile_name}.pem")
public_key_file = os.path.splitext(key_file)[0] + "_public.pem"
if interactive:
print("\nUploading the API Signing Key to the OCI web console\n"
"====================================================\n"
"In the OCI web console please click on the user profile "
"icon\ntop right and choose 'User Settings'.\n\n"
"On the lower left side in the Resources list select "
"'API Keys'.\n"
"Click on [Add Public Key] and choose the public key file \n"
f"\n {public_key_file}\n\n"
"and click [Add] to complete the upload.\n")
# Wait for enter
mysqlsh.globals.shell.prompt(
"Please hit the enter key once the public key file has been "
"uploaded...\n")
if not key_file or not os.path.isfile(key_file):
print("No API Signing Key file given or found. Operation cancelled.")
return None
if not fingerprint:
# Get public key based on key_file
public_key_file = get_public_key_file(key_file)
if not public_key_file:
return None
# Read the public key
with open(public_key_file, mode='rb') as binary_file:
public_key = binary_file.read()
# Generate fingerprint
fingerprint = user.get_fingerprint(public_key)
# Setup Config
config = {
"user": user_id,
"fingerprint": fingerprint,
"key_file": key_file,
"tenancy": tenancy,
"region": region
}
try:
oci.config.validate_config(config)
except oci.exceptions.InvalidConfig as e:
print(f"ERROR: The config is invalid: {str(e)}")
return
# Update config parser
parser[profile_name] = config
print(f"Storing profile '{profile_name}' in config file ...")
# Ensure path exists
pathlib.Path(os.path.dirname(config_file_path)).mkdir(
parents=True, exist_ok=True)
# Write the change to disk
with open(config_file_path, 'w') as configfile:
parser.write(configfile)
# Add profile name and default values
config["profile"] = profile_name
config = {**oci.config.DEFAULT_CONFIG, **config}
# Set it as global config object
setattr(mysqlsh.globals, 'mds_config', config)
# Set the new profile as the default profile
load_profile_as_current(profile_name)
if not interactive:
return config
def get_current_config(config=None, config_profile=None, interactive=None):
"""Gets the active config dict
If no config dict is given as parameter, the global config dict will be used
Args:
config (dict): The config to be used or None
config_profile (str): The name of a config profile
interactive (bool): Indicates whether to execute in interactive mode
Returns:
The active config dict
"""
import mysqlsh
if config_profile:
try:
# Check if the current global config matches the config profile
# name passed in
global_config = None
if 'mds_config' in dir(mysqlsh.globals):
global_config = getattr(mysqlsh.globals, 'mds_config')
if global_config and global_config.get('profile') == config_profile:
config = global_config
else:
# If there is no global config yet or a different config
# profile name was passed in, load that
load_profile_as_current(
profile_name=config_profile,
print_current_objects=False,
interactive=interactive)
# Check if global object 'mds_config' now has been registered
if 'mds_config' in dir(mysqlsh.globals):
config = getattr(mysqlsh.globals, 'mds_config')
else:
raise Exception("No OCI configuration set.")
except Exception as e:
raise e
elif config is None:
# Check if global object 'mds_config' has already been registered
if 'mds_config' in dir(mysqlsh.globals):
config = getattr(mysqlsh.globals, 'mds_config')
else:
try:
# Load the config profile
load_profile_as_current(
profile_name=config_profile,
print_current_objects=False)
# Check if global object 'mds_config' now has been registered
if 'mds_config' in dir(mysqlsh.globals):
config = getattr(mysqlsh.globals, 'mds_config')
else:
raise Exception("No OCI configuration set.")
except Exception as e:
raise ValueError(str(e))
return config
def load_profile_as_current(profile_name=None, config_file_path=None,
print_current_objects=True, interactive=True):
"""Loads an oci config
If profile_name is omitted, the DEFAULT profile is used.
If the config_file_path is omitted, ~/.oci/config is used.
Args:
profile_name (str): The name of the OCI profile or InstancePrincipal
when running on a compute instance that is part of a dynamic group.
config_file_path (str): The file path of the OCI config file
print_current_objects (bool): Whether to print information about current
objects and core cloud functions
interactive (bool): Whether information should be printed
Returns:
None
"""
# cSpell:ignore timeit
from timeit import default_timer as timer
# Take the time it takes to load the OCI module
if interactive:
print("Loading OCI library... ", end="")
start = timer()
import oci.identity
if interactive:
end = timer()
print(f'(Loaded in {end - start:.2f} seconds)')
import mysqlsh
# If the config file exists, load the requested profile_name
try:
if interactive:
print("Loading OCI configuration ...")
try:
delattr(mysqlsh.globals, 'mds_config')
except:
pass
# Try to load config with the given profile name
config = get_config(
profile_name=profile_name, config_file_path=config_file_path,
interactive=interactive, raise_exceptions=not interactive)
if not config:
return
# Set it as global config object
setattr(mysqlsh.globals, 'mds_config', config)
if interactive:
print(f"\nOCI profile '{config.get('profile')}' loaded.\n")
except (oci.exceptions.ConfigFileNotFound,
oci.exceptions.ProfileNotFound,
oci.exceptions.InvalidConfig) as e:
if interactive:
print("OCI configuration profile not found. Starting OCI "
"Configuration Profile Wizard...\n")
# Start wizard for new profiles
# config = configuration.create_config(
# profile_name=profile_name, config_file_path=config_file_path)
# if config is None:
# return
# # Try to load config with the given profile name
# config = configuration.get_config(
# profile_name=profile_name, config_file_path=config_file_path)
# List the current objects
if interactive and print_current_objects:
list_current_objects(config=config, profile_name=profile_name)
@plugin_function('mds.list.currentObjects')
def list_current_objects(config=None, profile_name=None,
cli_rc_file_path=None):
"""Lists the current objects
Args:
config (dict): The config to be used, None defaults to global config
profile_name (str): The profile_name is already defined
cli_rc_file_path (str): The location of the OCI CLI config file
Returns:
None
"""
config = get_current_config(config=config)
import oci.exceptions
from mds_plugin import compartment, compute, network
from mds_plugin import mysql_database_service, bastion
# Get current compartment information
try:
comp_id = get_current_compartment_id(
config=config, profile_name=profile_name,
cli_rc_file_path=cli_rc_file_path)
if comp_id == config.get('tenancy'):
# Initialize the identity client
identity = core.get_oci_identity_client(config=config)
# Get tenancy name
comp_name = identity.get_tenancy(config.get("tenancy")).data.name
else:
comp_name = compartment.get_compartment_full_path(
compartment_id=comp_id, config=config)
print(f"Current compartment set to '{comp_name}'.")
except oci.exceptions.ServiceError as e:
if e.status == 404:
set_current_compartment(
compartment_path='/', config=config)
else:
print(f"Could not fetch information about the compartment "
f"'{comp_id}'.\n"
f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
# Get current instance information
try:
instance_id = get_current_instance_id(
config=config, profile_name=profile_name,
cli_rc_file_path=cli_rc_file_path)
if instance_id is not None:
instance = compute.get_instance_by_id(instance_id=instance_id,
config=config)
if instance is None or instance.lifecycle_state == "TERMINATED":
config.pop("instance-id", None)
else:
print(f"Current Compute Instance set to "
f"'{instance.display_name}'.")
except oci.exceptions.ServiceError as e:
if e.status == 404:
# Remove current instance
set_current_instance(
instance_id="", config=config, interactive=False)
else:
print(f"Could not fetch information about the instance {comp_id}.\n"
f'ERROR: {e.message}.\n(Code: {e.code}; Status: {e.status})')
# Get current DB System information
try:
db_system_id = get_current_db_system_id(
config=config, profile_name=profile_name,
cli_rc_file_path=cli_rc_file_path)
if db_system_id is not None:
db_system = mysql_database_service.get_db_system_by_id(
db_system_id=db_system_id, config=config)
if db_system is None:
config.pop("db-system-id", None)
else:
print(f"Current MySQL DB System set to "
f"'{db_system.display_name}'.")
except oci.exceptions.ServiceError as e:
if e.status == 404:
# Remove current db_system
set_current_db_system(
db_system_id='', config=config, interactive=False)
else:
print(f"Could not fetch information about the DB System {comp_id}."
f'\nERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
# TODO check if bucket actually exists
# Get current bucket_name
current_bucket_name = get_current_bucket_name(
config=config, profile_name=profile_name,
cli_rc_file_path=cli_rc_file_path)
if current_bucket_name is not None:
print(f"Current Object Store bucket set to "
f"'{current_bucket_name}'.")
# Get current bastion information
try:
bastion_id = get_current_bastion_id(
config=config, profile_name=profile_name,
cli_rc_file_path=cli_rc_file_path)
if bastion_id is not None:
bastion = bastion.get_bastion(
bastion_id=bastion_id, config=config,
interactive=False, return_type=core.RETURN_OBJ)
if bastion is None or bastion.lifecycle_state == "DELETED":
config.pop("bastion-id", None)
else:
print(f"Current Bastion set to "
f"'{bastion.name}'.")
except oci.exceptions.ServiceError as e:
if e.status == 404:
# Remove current bastion
set_current_bastion(
bastion_id="", config=config, interactive=False)
else:
print(
f"Could not fetch information about the bastion {bastion_id}.\n"
f'ERROR: {e.message}.\n(Code: {e.code}; Status: {e.status})')
# Get current network information
try:
network_id = get_current_network_id(
config=config, profile_name=profile_name,
cli_rc_file_path=cli_rc_file_path)
if network_id is not None:
network = network.get_network(
network_id=network_id, config=config)
if network is None or network.lifecycle_state == "DELETED":
config.pop("network-id", None)
else:
print(f"Current Bastion set to "
f"'{bastion.name}'.")
except oci.exceptions.ServiceError as e:
if e.status == 404:
# Remove current instance
set_current_network(
network_id="", config=config, interactive=False)
else:
print(
f"Could not fetch information about the network {network_id}.\n"
f'ERROR: {e.message}.\n(Code: {e.code}; Status: {e.status})')
@plugin_function('mds.set.currentConfigProfile', shell=True, cli=False, web=False)
def set_current_profile(profile_name=None, config_file_path=None,
interactive=True):
"""Sets the current OCI config Profile
If the config_file_path is omitted, the MYSQLSH_OCI_CONFIG_FILE env_var is used when set,
otherwise ~/.oci/config is used.
Args:
profile_name (str): The name of the OCI profile or InstancePrincipal
when running on a compute instance that is part of a dynamic group.
config_file_path (str): The file path of the OCI config file
interactive (bool): Whether information should be printed
Returns:
None
"""
if not profile_name:
import configparser
import os.path
# If no config_file_path is given, first check the MYSQLSH_OCI_CONFIG_FILE env_var and only then fall back to
# default
if config_file_path is None:
config_file_path = get_default_config_file_path()
# Convert Unix path to Windows
config_file_path = os.path.abspath(
os.path.expanduser(config_file_path))
# Read config
config = configparser.ConfigParser()
config.read(config_file_path)
profile_name = core.prompt_for_list_item(
item_list=config.sections(),
prompt_caption="Please select a config profile to activate it: ",
print_list=True)
if not profile_name:
print("Operation cancelled.")
else:
load_profile_as_current(profile_name=profile_name,
config_file_path=config_file_path)
@plugin_function('mds.set.defaultConfigProfile', cli=True, web=True)
def set_default_profile(profile_name=None,
config_file_path=None,
cli_rc_file_path=None):
"""Sets the default profile
Args:
profile_name (str): The name of the profile currently in use
config_file_path (str): The location of the OCI config file
cli_rc_file_path (str): The location of the OCI CLI config file
Returns:
None
"""
import configparser
import os.path
import pathlib
from mds_plugin import general
# If no config_file_path is given, first check the MYSQLSH_OCI_CONFIG_FILE env_var and only then fall back to
# default
if config_file_path is None:
config_file_path = get_default_config_file_path()
# If no cli_rc_file_path is given, first check the MYSQLSH_OCI_RC_FILE env_var and only then fall back to default
if cli_rc_file_path is None:
cli_rc_file_path = getenv("MYSQLSH_OCI_RC_FILE")
if cli_rc_file_path is None:
cli_rc_file_path = "~/.oci/oci_cli_rc"
# Convert Unix path to Windows
config_file_path = os.path.abspath(
os.path.expanduser(config_file_path))
if not os.path.exists(config_file_path):
# TODO Start the wizard to create a new OCI profile
print(f"No OCI config file found at {config_file_path}")
return
# Read config
configs = configparser.ConfigParser()
if os.path.exists(config_file_path):
configs.read(config_file_path)
# If no profile was given, let the user select one
if profile_name is None or profile_name == "":
print("The following configuration profiles are available:\n")
profile_name = core.prompt_for_list_item(
item_list=configs.sections(),
prompt_caption="Please enter the profile name or index: ",
print_list=True)
if profile_name is None or profile_name == "":
print("Operation cancelled.")
return
# Convert Unix path to Windows
cli_rc_file_path = os.path.abspath(
os.path.expanduser(cli_rc_file_path))
cli_config = configparser.ConfigParser()
# Load CLI config file
if os.path.exists(cli_rc_file_path):
cli_config.read(cli_rc_file_path)
# Ensure that there is a section with the name of profile_name
if "DEFAULT" not in cli_config.sections():
cli_config["DEFAULT"] = {}
# Set the default profile
cli_config["DEFAULT"]["profile"] = profile_name
# Ensure path exists
pathlib.Path(os.path.dirname(cli_rc_file_path)).mkdir(
parents=True, exist_ok=True)
# Write the change to disk
with open(cli_rc_file_path, 'w') as config_file:
cli_config.write(config_file)
# Print out that the current compartment was changed
print(f"Default profile changed to '{profile_name}'.\n")
@plugin_function('mds.get.defaultConfigProfile', shell=True, cli=True, web=True)
def get_default_profile(cli_rc_file_path=None):
"""Gets the default profile if stored in the CLI config file
Args:
cli_rc_file_path (str): The location of the OCI CLI config file
Returns:
None
"""
import configparser
import os.path
# If the MYSQLSH_OCI_PROFILE env_var has been set, use this as default
if getenv("MYSQLSH_OCI_PROFILE"):
return getenv("MYSQLSH_OCI_PROFILE")
# If no cli_rc_file_path is given, first check the MYSQLSH_OCI_RC_FILE env_var and only then fall back to default
if cli_rc_file_path is None:
cli_rc_file_path = getenv("MYSQLSH_OCI_RC_FILE")
if cli_rc_file_path is None:
cli_rc_file_path = "~/.oci/oci_cli_rc"
# Convert Unix path to Windows
cli_rc_file_path = os.path.abspath(
os.path.expanduser(cli_rc_file_path))
config = configparser.ConfigParser()
if os.path.exists(cli_rc_file_path):
config.read(cli_rc_file_path)
if "DEFAULT" in config and "profile" in config["DEFAULT"]:
return config["DEFAULT"]["profile"]
else:
return None
@plugin_function('mds.get.currentConfigProfile')
def get_current_profile(profile_name=None):
"""Returns the current profile_name
Args:
profile_name (str): If specified it will be returned as is, otherwise
the profile_name from the current config will be returned
Returns:
None
"""
import mysqlsh
# If profile_name was not given, check if there is a config global
if profile_name is None and 'mds_config' in dir(mysqlsh.globals):
config = getattr(mysqlsh.globals, 'mds_config')
# Take the profile from the global config
profile_name = config.get("profile")
return profile_name
def get_current_value(
value_name, passthrough_value=None, config=None,
profile_name=None, cli_rc_file_path=None):
"""Gets the current value
Either from the global config or the OCI CLI config file
Args:
value_name (str): The current value to get
passthrough_value (str): If specified, returned instead of the current
config (dict): The config to be used or None, then global config dict
will be used
profile_name (str): The profile_name
cli_rc_file_path (str): The location of the OCI CLI config file
Returns:
The current value
"""
import configparser
import os.path
import mysqlsh
# If passthrough_value is specified, returned it instead of the current
if passthrough_value:
return passthrough_value
# If no config is given, check the global one
if not profile_name:
if not config:
if 'mds_config' in dir(mysqlsh.globals):
config = getattr(mysqlsh.globals, 'mds_config')
else:
config = get_config(profile_name=profile_name)
# Check if current value is already in the config, if so, return that
if config and value_name in config:
return config[value_name]
# If no cli_rc_file_path is given, first check the MYSQLSH_OCI_RC_FILE env_var and only then fall back to default
if cli_rc_file_path is None:
cli_rc_file_path = getenv("MYSQLSH_OCI_RC_FILE")
if cli_rc_file_path is None:
cli_rc_file_path = "~/.oci/oci_cli_rc"
# Convert Unix path to Windows
cli_rc_file_path = os.path.abspath(os.path.expanduser(cli_rc_file_path))
config = configparser.ConfigParser()
if os.path.exists(cli_rc_file_path):
config.read(cli_rc_file_path)
# Get the current profile_name
profile_name = get_current_profile(profile_name=profile_name)
if profile_name in config and value_name in config[profile_name] and \
config[profile_name][value_name] != "":
return config[profile_name][value_name]
else:
return None
def set_current_value(value_name, value, profile_name=None,
cli_rc_file_path=None):
"""Sets the current value for a given value_name
It is set in the global config and stored in the OCI CLI rc file
Args:
value_name (str): The name of the value to set, e.g. compartment-id
value (str): The value to set
profile_name (str): The name of the profile currently in use
cli_rc_file_path (str): The location of the OCI CLI config file
Returns:
None
"""
import configparser
import os.path
import pathlib
import mysqlsh
# Get the current profile_name
profile_name = get_current_profile(profile_name=profile_name)
# If no cli_rc_file_path is given, first check the MYSQLSH_OCI_RC_FILE env_var and only then fall back to default
if cli_rc_file_path is None:
cli_rc_file_path = getenv("MYSQLSH_OCI_RC_FILE")
if cli_rc_file_path is None:
cli_rc_file_path = "~/.oci/oci_cli_rc"
# Convert Unix path to Windows
cli_rc_file_path = os.path.abspath(os.path.expanduser(cli_rc_file_path))
# Load CLI config file
cli_config = configparser.ConfigParser()
if os.path.exists(cli_rc_file_path):
cli_config.read(cli_rc_file_path)
# Ensure that there is a section with the name of profile_name
if profile_name not in cli_config.sections():
cli_config[profile_name] = {}
# Set the db-system-id property that this plugin uses as current object
cli_config[profile_name][value_name] = value
# Ensure path exists
pathlib.Path(os.path.dirname(cli_rc_file_path)).mkdir(
parents=True, exist_ok=True)
with open(cli_rc_file_path, 'w') as configfile:
cli_config.write(configfile)
# Update the global config
if 'mds_config' in dir(mysqlsh.globals):
config = getattr(mysqlsh.globals, 'mds_config')
config[value_name] = value
@plugin_function('mds.set.currentCompartment', shell=True, cli=True, web=True)
def set_current_compartment(**kwargs):
"""Sets the current compartment
Args:
**kwargs: Optional parameters
Keyword Args:
compartment_path (str): The path of the compartment
compartment_id (str): The OCID of the compartment
config (dict): The config dict to use
config_profile (str): The name of the profile currently
profile_name (str): The profile_name is already defined
cli_rc_file_path (str): The location of the OCI CLI config file
interactive (bool): Whether information should be printed
raise_exceptions (bool): Whether exceptions should be raised
Returns:
None
"""
compartment_path = kwargs.get('compartment_path')
compartment_id = kwargs.get('compartment_id')
config = kwargs.get('config')
config_profile = kwargs.get('config_profile')
profile_name = kwargs.get('profile_name')
cli_rc_file_path = kwargs.get('cli_rc_file_path')
interactive = kwargs.get("interactive", core.get_interactive_default())
raise_exceptions = kwargs.get("raise_exceptions", not interactive)
# Get the active config
try:
config = get_current_config(
config=config, config_profile=config_profile,
interactive=interactive)
from mds_plugin import compartment
if compartment_id is None and interactive:
if compartment_path is None:
print("Please specify the compartment to be used as current.\n")
try:
compartment_id = compartment.get_compartment_id(
compartment_path=compartment_path,
compartment_id=compartment_id,
config=config)
except Exception as e:
if raise_exceptions:
raise Exception(
f"ERROR: Could not get compartment_id. ({e})")
print(f"ERROR: Could not get compartment_id. ({e})")
return
if compartment_id is None:
print("Operation cancelled.")
return
# Store the new current value in the global config and in the conf file
set_current_value(
value_name="compartment-id", value=compartment_id,
profile_name=profile_name, cli_rc_file_path=cli_rc_file_path)
# Print out that the current compartment was changed
if interactive:
if compartment_path is not None:
print(f"Current compartment changed to {compartment_path}\n")
else:
try:
path = compartment.get_compartment_full_path(
compartment_id=compartment_id,
config=config, interactive=False)
print(f"Current compartment changed to {path}\n")
except:
print(f"Current compartment changed to {compartment_id}\n")
except Exception as e:
if raise_exceptions:
raise
print(f"ERROR: {str(e)}")
@plugin_function('mds.get.currentCompartmentId', shell=True, cli=True, web=True)
def get_current_compartment_id(**kwargs):
"""Gets the current compartment_id
Args:
**kwargs: Optional parameters
Keyword Args:
compartment_id (str): If specified, returned instead of the current
config (dict): The config to be used, None defaults to global config
profile_name (str): Name of the config profile
cli_rc_file_path (str): The location of the OCI CLI config file
Returns:
The current compartment_id
"""
compartment_id = kwargs.get('compartment_id')
config = kwargs.get('config')
profile_name = kwargs.get('profile_name')
cli_rc_file_path = kwargs.get('cli_rc_file_path')
return get_current_value(
value_name="compartment-id", passthrough_value=compartment_id,
config=config, profile_name=profile_name,
cli_rc_file_path=cli_rc_file_path)
@plugin_function('mds.set.currentComputeInstance')
def set_current_instance(**kwargs):
"""Makes the given compute instance the current object
Args:
**kwargs: Optional parameters
Keyword Args:
instance_name (str): The name of the instance.
instance_id (str): The OCID of the instance
compartment_id (str): OCID of the compartment.
config (object): An OCI config object or None.
profile_name (str): The name of the profile currently in use
cli_rc_file_path (str): The location of the OCI CLI config file
raise_exceptions (bool): Whether exceptions should be raised
interactive (bool): Whether information should be printed
Returns:
None
"""
instance_name = kwargs.get('instance_name')
instance_id = kwargs.get('instance_id')
compartment_id = kwargs.get('compartment_id')
config = kwargs.get('config')
profile_name = kwargs.get('profile_name')
cli_rc_file_path = kwargs.get('cli_rc_file_path')
raise_exceptions = kwargs.get('raise_exceptions', False)
interactive = kwargs.get('interactive', True)
# Get the active config and compartment
try:
config = get_current_config(config=config)
compartment_id = get_current_compartment_id(
compartment_id=compartment_id, config=config)
from mds_plugin import compute
# Get instance, but also allow setting of an empty instance_id to clean
# the current instance
instance = None
if instance_id != "":
instance = compute.get_instance(
instance_name=instance_name, instance_id=instance_id,
compartment_id=compartment_id, config=config,
ignore_current=True,
interactive=False)
instance_id = instance.id if instance else ""
# Store the new current value in the global config and the config file
set_current_value(
value_name="instance-id", value=instance_id,
profile_name=profile_name, cli_rc_file_path=cli_rc_file_path)
if interactive:
if instance:
# Print out that the current compartment was changed
print(f"Current compute instance changed to "
f"{instance.display_name}.\n")
else:
print(f"Current compute instance cleared.\n")
except Exception as e:
if raise_exceptions:
raise
print(f"ERROR: {str(e)}")
@plugin_function('mds.get.currentComputeInstanceId')
def get_current_instance_id(instance_id=None, config=None, profile_name=None,
cli_rc_file_path=None):
"""Gets the current instance_id
Args:
instance_id (str): If specified, returned instead of the current
config (dict): The config to be used, None defaults to global config
profile_name (str): The profile_name is already defined
cli_rc_file_path (str): The location of the OCI CLI config file
Returns:
None
"""
return get_current_value(
value_name="instance-id", passthrough_value=instance_id,
config=config, profile_name=profile_name,
cli_rc_file_path=cli_rc_file_path)
@plugin_function('mds.set.currentMysqlDbSystem')
def set_current_db_system(**kwargs):
"""Makes the given DB System the current object
Args:
**kwargs: Optional parameters
Keyword Args:
db_system_name (str): The name of the DB System.
db_system_id (str): The OCID of the DB System
compartment_id (str): OCID of the compartment.
config (object): An OCI config object or None.
profile_name (str): The name of the profile currently in use
cli_rc_file_path (str): The location of the OCI CLI config file
raise_exceptions (bool): Whether exceptions should be raised
interactive (bool): Whether information should be printed
Returns:
None
"""
db_system_name = kwargs.get('db_system_name')
db_system_id = kwargs.get('db_system_id')
compartment_id = kwargs.get('compartment_id')
config = kwargs.get('config')
profile_name = kwargs.get('profile_name')
cli_rc_file_path = kwargs.get('cli_rc_file_path')
raise_exceptions = kwargs.get('raise_exceptions', False)
interactive = kwargs.get('interactive', True)
# Get the active config and compartment
try:
config = get_current_config(config=config)
compartment_id = get_current_compartment_id(
compartment_id=compartment_id, config=config)
from mds_plugin import mysql_database_service
# Get db_system, but also allow setting of an empty db_system_id to
# clean the current db_system
if db_system_id != '':
db_system = mysql_database_service.get_db_system(
db_system_name=db_system_name, db_system_id=db_system_id,
compartment_id=compartment_id, config=config,
ignore_current=True, interactive=interactive,
return_python_object=True)
db_system_id = db_system.id if db_system is not None else ""
# Store the new current value in the global config and the config file
set_current_value(
value_name="db-system-id", value=db_system_id,
profile_name=profile_name, cli_rc_file_path=cli_rc_file_path)
# Print out that the current DB System was changed
if interactive:
if db_system is not None:
print(f"Current MySQL DB System changed to "
f"{db_system.display_name}.\n")
else:
print(f"Current MySQL DB System cleared.\n")
except Exception as e:
if raise_exceptions:
raise
print(f"ERROR: {str(e)}")
@plugin_function('mds.get.currentMysqlDbSystemId')
def get_current_db_system_id(db_system_id=None, config=None, profile_name=None,
cli_rc_file_path=None):
"""Gets the current db_system_id
Args:
db_system_id (str): If specified, returned instead of the current
config (dict): The config to be used, None defaults to global config
profile_name (str): The profile_name is already defined
cli_rc_file_path (str): The location of the OCI CLI config file
Returns:
The current DB System ID
"""
return get_current_value(
value_name="db-system-id", passthrough_value=db_system_id,
config=config, profile_name=profile_name,
cli_rc_file_path=cli_rc_file_path)
@plugin_function('mds.set.currentBastion', shell=True, cli=True, web=True)
def set_current_bastion(**kwargs):
"""Makes the given Bastion the current object
Args:
**kwargs: Optional parameters
Keyword Args:
bastion_name (str): The name of the bastion
bastion_id (str): The OCID of the bastion
compartment_id (str): OCID of the compartment.
config (object): An OCI config object or None.
config_profile (str): The name of the profile currently
profile_name (str): The name of the profile currently in use
cli_rc_file_path (str): The location of the OCI CLI config file
raise_exceptions (bool): Whether exceptions should be raised
interactive (bool): Whether information should be printed
Returns:
None
"""
bastion_name = kwargs.get('bastion_name')
bastion_id = kwargs.get('bastion_id')
compartment_id = kwargs.get('compartment_id')
config = kwargs.get('config')
config_profile = kwargs.get('config_profile')
profile_name = kwargs.get('profile_name')
cli_rc_file_path = kwargs.get('cli_rc_file_path')
raise_exceptions = kwargs.get('raise_exceptions', False)
interactive = kwargs.get("interactive", core.get_interactive_default())
# Get the active config and compartment
try:
config = get_current_config(
config=config, config_profile=config_profile,
interactive=interactive)
compartment_id = get_current_compartment_id(
compartment_id=compartment_id, config=config)
from mds_plugin import bastion
# Get bastion, but also allow setting of an empty bastion_id to
# clean the current bastion
current_bastion = None
if bastion_id != '':
current_bastion = bastion.get_bastion(
bastion_name=bastion_name, bastion_id=bastion_id,
compartment_id=compartment_id, config=config,
ignore_current=True, interactive=interactive,
return_type=core.RETURN_OBJ)
bastion_id = current_bastion.id if current_bastion else ""
# Store the new current value in the global config and the config file
set_current_value(
value_name="bastion-id", value=bastion_id,
profile_name=profile_name, cli_rc_file_path=cli_rc_file_path)
# Print out that the current bastion was changed
if interactive:
if current_bastion:
print(f"Current Bastion changed to "
f"{current_bastion.name}.\n")
else:
print(f"Current Bastion cleared.\n")
except Exception as e:
if raise_exceptions:
raise
print(f"ERROR: {str(e)}")
@plugin_function('mds.get.currentBastionId')
def get_current_bastion_id(bastion_id=None, config=None, profile_name=None,
cli_rc_file_path=None):
"""Gets the current bastion_id
Args:
bastion_id (str): If specified, returned instead of the current
config (dict): The config to be used, None defaults to global config
profile_name (str): Name of the config profile
cli_rc_file_path (str): The location of the OCI CLI config file
Returns:
The active bastion_id
"""
return get_current_value(
value_name="bastion-id", passthrough_value=bastion_id,
config=config, profile_name=profile_name,
cli_rc_file_path=cli_rc_file_path)
@plugin_function('mds.set.currentBucket')
def set_current_bucket(**kwargs):
"""Makes the given DB System the current object
Args:
**kwargs: Optional parameters
Keyword Args:
bucket_name (str): The name of the bucket
compartment_id (str): OCID of the compartment
config (object): An OCI config object or None
profile_name (str): The name of the profile currently in use
cli_rc_file_path (str): The location of the OCI CLI config file
raise_exceptions (bool): Whether exceptions should be raised
interactive (bool): Whether information should be printed
Returns:
None
"""
bucket_name = kwargs.get('bucket_name')
compartment_id = kwargs.get('compartment_id')
config = kwargs.get('config')
profile_name = kwargs.get('profile_name')
cli_rc_file_path = kwargs.get('cli_rc_file_path')
raise_exceptions = kwargs.get('raise_exceptions', False)
interactive = kwargs.get('interactive', True)
# Get the active config and compartment
try:
config = get_current_config(config=config)
compartment_id = get_current_compartment_id(
compartment_id=compartment_id, config=config)
from mds_plugin import object_store
# Get bucket
bucket = object_store.get_bucket(
bucket_name=bucket_name,
compartment_id=compartment_id, config=config,
ignore_current=True)
bucket_name = bucket.name if bucket is not None else ""
# Store the new current value in the global config and in the conf file
set_current_value(
value_name="bucket-name", value=bucket_name,
profile_name=profile_name, cli_rc_file_path=cli_rc_file_path)
# Print out that the current DB System was changed
if interactive:
if bucket_name != "":
print(
f"Current object store bucket changed to {bucket_name}.\n")
else:
print(f"Current object store bucket cleared.\n")
except ValueError as e:
if raise_exceptions:
raise
print(f"ERROR: {str(e)}")
@plugin_function('mds.get.currentBucket')
def get_current_bucket_name(bucket_name=None, config=None, profile_name=None,
cli_rc_file_path=None):
"""Gets the current bucket
Args:
bucket_name (str): If specified, returned instead of the current
config (dict): The config to be used, None defaults to global config
profile_name (str): The profile_name is already defined
cli_rc_file_path (str): The location of the OCI CLI config file
Returns:
The current bucket
"""
return get_current_value(
value_name="bucket-name", passthrough_value=bucket_name,
config=config, profile_name=profile_name,
cli_rc_file_path=cli_rc_file_path)
@plugin_function('mds.set.currentNetwork')
def set_current_network(**kwargs):
"""Makes the given Network the current object
Args:
**kwargs: Optional parameters
Keyword Args:
network_name (str): The name of the network
compartment_id (str): OCID of the compartment
config (object): An OCI config object or None
profile_name (str): The name of the profile currently in use
cli_rc_file_path (str): The location of the OCI CLI config file
raise_exceptions (bool): Whether exceptions should be raised
interactive (bool): Whether information should be printed
Returns:
None
"""
network_name = kwargs.get('network_name')
compartment_id = kwargs.get('compartment_id')
config = kwargs.get('config')
profile_name = kwargs.get('profile_name')
cli_rc_file_path = kwargs.get('cli_rc_file_path')
raise_exceptions = kwargs.get('raise_exceptions', False)
interactive = kwargs.get('interactive', True)
# Get the active config and compartment
try:
config = get_current_config(config=config)
compartment_id = get_current_compartment_id(
compartment_id=compartment_id, config=config)
from mds_plugin import network
# Get bucket
network = network.get_network(
network_name=network_name,
compartment_id=compartment_id, config=config,
ignore_current=True)
network_id = network.id if network else ""
network_name = network.display_name if network and network.display_name \
else network_id
# Store the new current value in the global config and in the config file
set_current_value(
value_name="network-id", value=network_id,
profile_name=profile_name, cli_rc_file_path=cli_rc_file_path)
# Print out that the current DB System was changed
if interactive:
if network_id != "":
print(f"Current network changed to {network_name}.\n")
else:
print(f"Current network cleared.\n")
except Exception as e:
if raise_exceptions:
raise
print(f"ERROR: {str(e)}")
@plugin_function('mds.get.currentNetworkId')
def get_current_network_id(network_id=None, config=None, profile_name=None,
cli_rc_file_path=None):
"""Gets the current network id
Args:
network_id (str): If specified, returned instead of the current
config (dict): The config to be used, None defaults to global config
profile_name (str): The profile_name is already defined
cli_rc_file_path (str): The location of the OCI CLI config file
Returns:
The current network_id
"""
return get_current_value(
value_name="network-id", passthrough_value=network_id,
config=config, profile_name=profile_name,
cli_rc_file_path=cli_rc_file_path)
@plugin_function('mds.set.currentEndpoint')
def set_current_endpoint(endpoint='', **kwargs):
"""Makes the given url the current endpoint
Args:
endpoint (str): The URI of the endpoint
**kwargs: Optional parameters
Keyword Args:
config (object): An OCI config object or None
profile_name (str): The name of the profile currently in use
cli_rc_file_path (str): The location of the OCI CLI config file
raise_exceptions (bool): Whether exceptions should be raised
interactive (bool): Whether information should be printed
Returns:
None
"""
config = kwargs.get('config')
profile_name = kwargs.get('profile_name')
cli_rc_file_path = kwargs.get('cli_rc_file_path')
raise_exceptions = kwargs.get('raise_exceptions', False)
interactive = kwargs.get('interactive', True)
# Get the active config and compartment
try:
config = get_current_config(config=config)
# Store the new current value in the global config and in the config file
set_current_value(
value_name="endpoint", value=endpoint,
profile_name=profile_name, cli_rc_file_path=cli_rc_file_path)
# Print out that the current DB System was changed
if interactive:
if endpoint != "":
print(f"Current endpoint changed to {endpoint}.\n")
else:
print(f"Current endpoint cleared.\n")
except Exception as e:
if raise_exceptions:
raise
print(f"ERROR: {str(e)}")
@plugin_function('mds.get.currentEndpoint')
def get_current_endpoint(endpoint=None, config=None, profile_name=None,
cli_rc_file_path=None):
"""Gets the current endpoint
Args:
endpoint (str): If specified, returned instead of the current
config (dict): The config to be used, None defaults to global config
profile_name (str): The profile_name is already defined
cli_rc_file_path (str): The location of the OCI CLI config file
Returns:
The current endpoint
"""
return get_current_value(
value_name="endpoint", passthrough_value=endpoint,
config=config, profile_name=profile_name,
cli_rc_file_path=cli_rc_file_path)