mds_plugin/bastion.py (833 lines of code) (raw):
# Copyright (c) 2021, 2024, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0,
# as published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms, as
# designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an additional
# permission to link the program and your derivative works with the
# separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
"""Sub-Module to manage OCI Bastions"""
from code import interact
from mysqlsh.plugin_manager import plugin_function
from mds_plugin import core, configuration, compute, mysql_database_service
from mds_plugin import network
# cSpell:ignore vnics, vnic
def format_bastion_listing(items, current=None) -> str:
"""Formats a given list of objects in a human readable form
Args:
items: Either a list of objects or a single object
current (str): OCID of the current item
Returns:
The db_systems formatted as str
"""
# If a single db_system was given, wrap it in a list
if not type(items) is list:
items = [items]
# return objects in READABLE text output
out = ""
id = 1
for i in items:
index = f"*{id:>3} " if current == i.id else f"{id:>4} "
out += (index +
core.fixed_len(i.name, 24, ' ', True) +
core.fixed_len(i.lifecycle_state, 8, ' ') +
core.fixed_len(f"{i.time_created:%Y-%m-%d %H:%M}", 16, '\n'))
id += 1
return out
def format_session_listing(items, current=None) -> str:
"""Formats a given list of objects in a human readable form
Args:
items: Either a list of objects or a single object
current (str): OCID of the current item
Returns:
The db_systems formatted as str
"""
# If a single db_system was given, wrap it in a list
if not type(items) is list:
items = [items]
# return objects in READABLE text output
out = ""
id = 1
for i in items:
index = f"*{id:>3} " if current == i.id else f"{id:>4} "
s_type = i.target_resource_details.session_type
s_port = str(i.target_resource_details.target_resource_port)
s_ip = i.target_resource_details.target_resource_private_ip_address
s_time = f'{i.time_created:%Y%m%d-%H%M%S}'
out += (index +
core.fixed_len(i.display_name, 24, ' ', True) +
core.fixed_len(i.lifecycle_state, 8, ' ') +
core.fixed_len(s_type, 15, ' ') +
core.fixed_len(s_ip, 15, ' ') +
core.fixed_len(s_port, 15, ' ') +
core.fixed_len(s_time, 15, '\n'))
id += 1
# try:
# out += " " + i.ssh_metadata.get("command") + "\n"
# except:
# pass
return out
@plugin_function('mds.list.bastions', shell=True, cli=True, web=True)
def list_bastions(**kwargs):
"""Lists bastions
This function will list all bastions of the compartment with the
given compartment_id.
Args:
**kwargs: Optional parameters
Keyword Args:
compartment_id (str): OCID of the parent compartment
valid_for_db_system_id (str): OCID of the db_system_id the bastions
needs to be valid for and therefore are in the same subnet
config (dict): An OCI config object or None
config_profile (str): The name of an OCI config profile
interactive (bool): Indicates whether to execute in interactive mode
return_type (str): "STR" will return a formatted string, "DICT" will
return the object converted to a dict structure and "OBJ" will
return the OCI Python object for internal plugin usage
raise_exceptions (bool): If set to true exceptions are raised
Returns:
Based on return_type
"""
compartment_id = kwargs.get("compartment_id")
valid_for_db_system_id = kwargs.get("valid_for_db_system_id")
config = kwargs.get("config")
config_profile = kwargs.get("config_profile")
interactive = kwargs.get("interactive", core.get_interactive_default())
return_type = kwargs.get(
"return_type", # In interactive mode, default to formatted str return
core.RETURN_STR if interactive else core.RETURN_DICT)
raise_exceptions = kwargs.get(
"raise_exceptions", # On internal call (RETURN_OBJ), raise exceptions
True if return_type == core.RETURN_OBJ else not interactive)
try:
config = configuration.get_current_config(
config=config, config_profile=config_profile,
interactive=interactive)
compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
current_bastion_id = configuration.get_current_bastion_id(
config=config)
import oci.exceptions
try:
# Initialize the Object Store client
bastion_client = core.get_oci_bastion_client(config=config)
# List the bastions
bastions = bastion_client.list_bastions(
compartment_id=compartment_id).data
# Filter out all deleted bastions
bastions = [b for b in bastions if b.lifecycle_state != "DELETED"]
# Filter out all bastions that are not valid for the given DbSystem
if valid_for_db_system_id:
# Just consider active bastions here
bastions = [
b for b in bastions if b.lifecycle_state == "ACTIVE"]
valid_bastions = []
db_system = mysql_database_service.get_db_system(
db_system_id=valid_for_db_system_id,
config=config, interactive=False,
return_python_object=True)
for b in bastions:
bastion = bastion_client.get_bastion(bastion_id=b.id).data
if bastion.target_subnet_id == db_system.subnet_id:
valid_bastions.append(bastion)
bastions = valid_bastions
# Add the is_current field
current_bastion_id = configuration.get_current_bastion_id(
config=config)
for b in bastions:
# Add the is_current field to the object, also adding it to
# the swagger_types so oci.util.to_dict() does include it
setattr(b, "is_current", b.id == current_bastion_id)
b.swagger_types["is_current"] = "bool"
# bastions[i] = b
if len(bastions) < 1 and interactive:
print("This compartment contains no bastions.")
bastions = None
return core.oci_object(
oci_object=bastions,
return_type=return_type,
format_function=format_bastion_listing,
current=current_bastion_id)
except oci.exceptions.ServiceError as e:
if raise_exceptions:
raise
print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
except Exception as e:
if raise_exceptions:
raise
print(f'ERROR: {e}')
@plugin_function('mds.get.bastion', shell=True, cli=True, web=True)
def get_bastion(**kwargs):
"""Gets a Bastion with the given id
If no id is given, it will prompt the user for the id.
Args:
**kwargs: Optional parameters
Keyword Args:
bastion_name (str): The new name bastion
bastion_id (str): OCID of the bastion.
await_state (str): Await the given lifecycle state, ACTIVE, DELETED, ..
ignore_current (bool): Whether the current bastion should be ignored
fallback_to_any_in_compartment (bool): Whether to lookup bastion in
compartment
compartment_id (str): OCID of the compartment.
config (dict): An OCI config object or None.
config_profile (str): The name of an OCI config profile
interactive (bool): Indicates whether to execute in interactive mode
return_type (str): "STR" will return a formatted string, "DICT" will
return the object converted to a dict structure and "OBJ" will
return the OCI Python object for internal plugin usage
raise_exceptions (bool): If set to true exceptions are raised
Returns:
None
"""
bastion_name = kwargs.get("bastion_name")
bastion_id = kwargs.get("bastion_id")
await_state = kwargs.get("await_state")
ignore_current = kwargs.get("ignore_current", False)
fallback_to_any_in_compartment = kwargs.get(
"fallback_to_any_in_compartment", False)
compartment_id = kwargs.get("compartment_id")
config = kwargs.get("config")
config_profile = kwargs.get("config_profile")
interactive = kwargs.get("interactive", core.get_interactive_default())
return_type = kwargs.get("return_type", core.RETURN_DICT)
raise_exceptions = kwargs.get(
"raise_exceptions", # On internal call (RETURN_OBJ), raise exceptions
True if return_type == core.RETURN_OBJ else not interactive)
# Get the active config and compartment
try:
config = configuration.get_current_config(
config=config, config_profile=config_profile,
interactive=interactive)
if not ignore_current and bastion_name is None:
bastion_id = configuration.get_current_bastion_id(
bastion_id=bastion_id, config=config)
import oci.exceptions
try:
# Initialize the Bastion client
bastion_client = core.get_oci_bastion_client(config=config)
if not interactive and not bastion_id and not bastion_name:
raise ValueError('No bastion_id nor bastion_name given.')
bastion = None
if bastion_id:
bastion = bastion_client.get_bastion(
bastion_id=bastion_id).data
if not bastion:
raise ValueError('The bastion with the given OCID '
f'{bastion_id} was not found.')
if not bastion and (bastion_name or interactive):
# List the Bastion of the current compartment
bastions = bastion_client.list_bastions(
compartment_id=compartment_id).data
# Filter out all deleted compartments
bastions = [
u for u in bastions if u.lifecycle_state != "DELETED"]
if len(bastions) == 0:
if interactive:
print("No Bastions available in this compartment.")
return
# If a bastion name was given, look it up
if bastion_name:
for b in bastions:
if b.name == bastion_name:
bastion = bastion_client.get_bastion(
bastion_id=b.id).data
break
if bastion is None and not interactive:
raise ValueError(f"Bastion {bastion_name} was not "
"found in this compartment.")
# Fallback to first in compartment
if fallback_to_any_in_compartment:
bastion = bastion_client.get_bastion(
bastion_id=bastions[0].id).data
if not bastion and interactive:
# If the user_name was not given or found, print out the list
bastion_list = format_bastion_listing(items=bastions)
if bastion_name is None:
print(bastion_list)
# Let the user choose from the list
selected_bastion = core.prompt_for_list_item(
item_list=bastions,
prompt_caption=("Please enter the name or index "
"of the Bastion: "),
item_name_property="name", given_value=bastion_name)
if selected_bastion:
bastion = bastion_client.get_bastion(
bastion_id=selected_bastion.id).data
if bastion and await_state:
import time
if interactive:
print(f'Waiting for Bastion to reach '
f'{await_state} state...')
bastion_id = bastion.id
# Wait for the Bastion Session to reach state await_state
cycles = 0
while cycles < 48:
bastion = bastion_client.get_bastion(
bastion_id=bastion_id).data
if bastion.lifecycle_state == await_state:
break
else:
time.sleep(5)
s = "." * (cycles + 1)
if interactive:
print(f'Waiting for Bastion to reach '
f'{await_state} state...{s}')
cycles += 1
if bastion.lifecycle_state != await_state:
raise Exception("Bastion did not reach the state "
f"{await_state} within 4 minutes.")
return core.oci_object(
oci_object=bastion,
return_type=return_type,
format_function=format_bastion_listing)
except oci.exceptions.ServiceError as e:
if interactive:
raise
print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
return
except Exception as e:
if raise_exceptions:
raise
print(f'ERROR: {e}')
@plugin_function('mds.create.bastion', shell=True, cli=True, web=True)
def create_bastion(**kwargs):
"""Creates a Bastion
Args:
**kwargs: Additional options
Keyword Args:
bastion_name (str): The new name of the compartment.
db_system_id (str): OCID of the DbSystem.
client_cidr (str): The client CIDR, defaults to "0.0.0.0/0"
max_session_ttl_in_seconds (int): The maximum amount of time that any
session on the bastion can remain active, defaults to 10800
target_subnet_id (str): The OCID of the subnet, defaults to the
subnet of the db_system if db_system_id is given
await_active_state (bool): Await the ACTIVE lifecycle state before
returning
compartment_id (str): OCID of the compartment.
config (dict): An OCI config object or None.
config_profile (str): The name of an OCI config profile
ignore_current (bool): Whether the current DbSystem should be ignored
interactive (bool): Whether exceptions are raised
return_type (str): "STR" will return a formatted string, "DICT" will
return the object converted to a dict structure and "OBJ" will
return the OCI Python object for internal plugin usage
raise_exceptions (bool): If set to true exceptions are raised
Returns:
The id of the Bastion Session, None in interactive mode
"""
bastion_name = kwargs.get("bastion_name")
db_system_id = kwargs.get("db_system_id")
client_cidr = kwargs.get("client_cidr", "0.0.0.0/0")
max_session_ttl_in_seconds = kwargs.get(
"max_session_ttl_in_seconds", 10800)
target_subnet_id = kwargs.get("target_subnet_id")
await_active_state = kwargs.get("await_active_state", False)
compartment_id = kwargs.get("compartment_id")
config = kwargs.get("config")
config_profile = kwargs.get("config_profile")
ignore_current = kwargs.get("ignore_current", False)
interactive = kwargs.get("interactive", core.get_interactive_default())
return_type = kwargs.get(
"return_type", # In interactive mode, default to formatted str return
core.RETURN_STR if interactive else core.RETURN_DICT)
raise_exceptions = kwargs.get(
"raise_exceptions", # On internal call (RETURN_OBJ), raise exceptions
True if return_type == core.RETURN_OBJ else not interactive)
# Get the active config and compartment
try:
config = configuration.get_current_config(
config=config, config_profile=config_profile,
interactive=interactive)
current_compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
if not ignore_current:
db_system_id = configuration.get_current_db_system_id(
db_system_id=db_system_id, config=config)
import oci.bastion.models
import oci.mysql.models
import oci.exceptions
try:
# Initialize the Bastion client
bastion_client = core.get_oci_bastion_client(config=config)
db_system = None
if db_system_id:
db_system = mysql_database_service.get_db_system(
db_system_id=db_system_id,
config=config, interactive=False,
return_python_object=True)
if not db_system:
raise ValueError("No db_system found with the given id. "
"Operation cancelled.")
elif interactive:
for_db_system = core.prompt(
"Should the new Bastion be used to connect to "
"a MySQL DB System? [Y/n]: ",
options={'defaultValue': 'y'})
if not for_db_system:
raise ValueError("Operation cancelled.")
if for_db_system.lower() == 'y':
db_system = mysql_database_service.get_db_system(
compartment_id=current_compartment_id,
config=config, interactive=interactive,
return_python_object=True)
if not db_system:
raise ValueError("Operation cancelled.")
else:
raise ValueError("No db_system_id given. "
"Operation cancelled.")
# Check if the db_system already has a Bastion set in the
# freeform_tags
# if db_system and db_system.freeform_tags.get('bastion_id'):
# bastion = None
# # Check if that bastion still exists
# try:
# print("Check if that bastion still exists")
# bastion = get_bastion(
# bastion_id=db_system.freeform_tags.get('bastion_id'),
# return_type="OBJ",
# config=config, interactive=False)
# except ValueError:
# # If not, remove that old bastion id from the freeform_tags
# db_system.freeform_tags.pop('bastion_id', None)
# mysql_database_service.update_db_system(
# db_system_id=db_system.id,
# new_freeform_tags=db_system.freeform_tags,
# config=config, interactive=False)
# # If the assigned bastion does exist, error out
# if bastion and bastion.lifecycle_state == \
# oci.bastion.models.Bastion.LIFECYCLE_STATE_ACTIVE:
# raise ValueError(
# "The given MySQL DB System already has a Bastion "
# "assigned. Please remove 'bastion_id' from the "
# "freeform_tags to create a new Bastion for this "
# "DB System. Operation cancelled.")
# If a db_system was given, take the compartment_id from there
if not compartment_id and db_system:
compartment_id = db_system.compartment_id
elif not compartment_id:
compartment_id = current_compartment_id
if not bastion_name:
if db_system:
from datetime import datetime
bastion_name = (
"Bastion" +
datetime.now().strftime("%y%m%d%H%M"))
elif interactive:
bastion_name = core.prompt(
'Please enter a name for this new Bastion: ')
if not bastion_name:
raise ValueError("Operation cancelled.")
if not bastion_name:
raise ValueError("No bastion_name given. "
"Operation cancelled.")
if not target_subnet_id:
if db_system:
target_subnet_id = db_system.subnet_id
elif interactive:
# Get private subnet
subnet = network.get_subnet(
public_subnet=False,
compartment_id=compartment_id, config=config,
interactive=interactive)
if subnet is None:
print("Operation cancelled.")
return
target_subnet_id = subnet.id
else:
raise ValueError("No target_subnet_id given. "
"Operation cancelled.")
bastion_details = oci.bastion.models.CreateBastionDetails(
bastion_type="standard",
client_cidr_block_allow_list=[client_cidr],
compartment_id=compartment_id,
max_session_ttl_in_seconds=max_session_ttl_in_seconds,
name=bastion_name,
target_subnet_id=target_subnet_id
)
# Create the new bastion
new_bastion = bastion_client.create_bastion(
create_bastion_details=bastion_details).data
# Update the db_system freeform_tags to hold the assigned bastion
# if db_system:
# print("Update the db_system freeform_tags to hold the assigned bastion ")
# db_system.freeform_tags["bastion_id"] = new_bastion.id
# mysql_database_service.update_db_system(
# db_system_id=db_system.id,
# new_freeform_tags=db_system.freeform_tags,
# config=config, interactive=False)
if new_bastion and await_active_state:
import time
if interactive:
print(f'Waiting for Bastion to reach '
f'ACTIVE state...')
bastion_id = new_bastion.id
# Wait for the Bastion Session to reach state await_state
cycles = 0
while cycles < 60:
bastion = bastion_client.get_bastion(
bastion_id=bastion_id).data
if bastion.lifecycle_state == "ACTIVE":
break
else:
time.sleep(5)
s = "." * (cycles + 1)
if interactive:
print(f'Waiting for Bastion to reach '
f'ACTIVE state...{s}')
cycles += 1
if bastion.lifecycle_state != "ACTIVE":
raise Exception("Bastion did not reach the state "
f"ACTIVE within 5 minutes.")
return core.oci_object(
oci_object=bastion,
return_type=return_type,
format_function=lambda b: print(
f"Bastion {b.name} has been created."))
else:
return core.oci_object(
oci_object=new_bastion,
return_type=return_type,
format_function=lambda b: print(
f"Bastion {b.name} is being "
f"created. Use mds.list.bastions() to check "
"it's provisioning state.\n"))
except oci.exceptions.ServiceError as e:
if raise_exceptions:
raise
print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
except Exception as e:
if raise_exceptions:
raise
print(f'ERROR: {e}')
@plugin_function('mds.delete.bastion', shell=True, cli=True, web=True)
def delete_bastion(**kwargs):
"""Deletes a Bastion with the given id
If no id is given, it will prompt the user for the id.
Args:
**kwargs: Optional parameters
Keyword Args:
bastion_name (str): The new name bastion
bastion_id (str): OCID of the bastion.
await_deletion (bool): Whether to wait till the bastion reaches
lifecycle state DELETED
compartment_id (str): OCID of the compartment.
config (dict): An OCI config object or None.
config_profile (str): The name of an OCI config profile
ignore_current (bool): Whether the current bastion should be ignored
interactive (bool): Indicates whether to execute in interactive mode
raise_exceptions (bool): If set to true exceptions are raised
Returns:
None
"""
bastion_name = kwargs.get("bastion_name")
bastion_id = kwargs.get("bastion_id")
await_deletion = kwargs.get("await_deletion")
compartment_id = kwargs.get("compartment_id")
ignore_current = kwargs.get("ignore_current", True)
config = kwargs.get("config")
config_profile = kwargs.get("config_profile")
interactive = kwargs.get("interactive", core.get_interactive_default())
raise_exceptions = kwargs.get("raise_exceptions", not interactive)
import mysqlsh
# Get the active config and compartment
try:
config = configuration.get_current_config(
config=config, config_profile=config_profile,
interactive=interactive)
compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
if not ignore_current and bastion_name is None:
bastion_id = configuration.get_current_bastion_id(
bastion_id=bastion_id, config=config)
import oci.identity
import oci.mysql
try:
bastion = get_bastion(
bastion_name=bastion_name,
bastion_id=bastion_id, compartment_id=compartment_id,
config=config, ignore_current=ignore_current,
interactive=interactive, return_type=core.RETURN_OBJ)
if not bastion:
raise ValueError(
"No bastion given or found. "
"Operation cancelled.")
bastion_id = bastion.id
if interactive:
# Prompt the user for specifying a compartment
prompt = mysqlsh.globals.shell.prompt(
f"Are you sure you want to delete the Bastion "
f"{bastion.name} [yes/NO]: ",
{'defaultValue': 'no'}).strip().lower()
if prompt != "yes":
raise Exception("Deletion aborted.\n")
# Initialize the Bastion client
bastion_client = core.get_oci_bastion_client(config=config)
bastion_client.delete_bastion(bastion_id=bastion_id)
# Get db_system client
db_system_client = core.get_oci_db_system_client(config=config)
# Update db_systems in the compartment and remove the bastion_id
# db_systems = mysql_database_service.list_db_systems(
# compartment_id=bastion.compartment_id,
# config=config, interactive=False)
# for db_system in db_systems:
# if "bastion_id" in db_system.get("freeform_tags"):
# if db_system.get("freeform_tags").get("bastion_id") == \
# bastion_id:
# # Remove the "bastion_id" key from the dict
# db_system.get("freeform_tags").pop("bastion_id")
# # Update the db_system
# update_details = oci.mysql.models.UpdateDbSystemDetails(
# freeform_tags=db_system.get("freeform_tags"))
# db_system_client.update_db_system(
# db_system.get("id"), update_details)
# If the current bastion has been deleted, clear it
if configuration.get_current_bastion_id(
config=config) == bastion_id:
configuration.set_current_bastion(bastion_id='')
# If the function should wait till the bastion reaches the DELETED
# lifecycle state
if await_deletion:
import time
if interactive:
print('Waiting for Bastion to be deleted...')
# Wait for the Bastion Session to be ACTIVE
cycles = 0
while cycles < 48:
bastion = bastion_client.get_bastion(
bastion_id=bastion_id).data
if bastion.lifecycle_state == "DELETED":
break
else:
time.sleep(5)
s = "." * (cycles + 1)
if interactive:
print(f'Waiting for Bastion to be deleted...{s}')
cycles += 1
if bastion.lifecycle_state != "DELETED":
raise Exception("Bastion did not reach the DELETED "
"state within 4 minutes.")
if interactive:
print(
f"Bastion '{bastion.name}' was deleted successfully.")
elif interactive:
print(f"Bastion '{bastion.name}' is being deleted.")
except oci.exceptions.ServiceError as e:
if interactive:
raise
print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
except (Exception) as e:
if raise_exceptions:
raise
print(f'ERROR: {e}')
@plugin_function('mds.list.bastionSessions', shell=True, cli=True, web=True)
def list_sessions(**kwargs):
"""Lists bastion sessions
Args:
**kwargs: Optional parameters
Keyword Args:
bastion_id (str): OCID of the bastion.
ignore_current (bool): Whether to not default to the current bastion.
compartment_id (str): OCID of the parent compartment.
config (object): An OCI config object or None.
config_profile (str): The name of an OCI config profile
interactive (bool): Whether output is more descriptive
return_type (str): "STR" will return a formatted string, "DICT" will
return the object converted to a dict structure and "OBJ" will
return the OCI Python object for internal plugin usage
raise_exceptions (bool): If set to true exceptions are raised
Returns:
A list of dicts representing the bastions
"""
bastion_id = kwargs.get("bastion_id")
ignore_current = kwargs.get("ignore_current", False)
compartment_id = kwargs.get("compartment_id")
config = kwargs.get("config")
config_profile = kwargs.get("config_profile")
interactive = kwargs.get("interactive", core.get_interactive_default())
return_type = kwargs.get(
"return_type", # In interactive mode, default to formatted str return
core.RETURN_STR if interactive else core.RETURN_DICT)
raise_exceptions = kwargs.get(
"raise_exceptions", # On internal call (RETURN_OBJ), raise exceptions
True if return_type == core.RETURN_OBJ else not interactive)
# Get the active config and compartment
try:
config = configuration.get_current_config(
config=config, config_profile=config_profile,
interactive=interactive)
compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
current_bastion_id = configuration.get_current_bastion_id(
config=config)
import oci.exceptions
try:
# Initialize the Object Store client
bastion_client = core.get_oci_bastion_client(config=config)
if not bastion_id and not ignore_current:
bastion_id = current_bastion_id
bastion = get_bastion(
bastion_id=bastion_id,
compartment_id=compartment_id, config=config,
ignore_current=ignore_current, interactive=interactive,
return_type=core.RETURN_OBJ)
sessions = bastion_client.list_sessions(bastion_id=bastion.id).data
# Filter out all deleted sessions
# bastions = [s for s in sessions if s.lifecycle_state != "DELETED"]
ext_sessions = []
for s in sessions:
if s.lifecycle_state != "DELETED":
ext_sessions.append(
bastion_client.get_session(session_id=s.id).data)
else:
ext_sessions.append(s)
if len(ext_sessions) < 1 and interactive:
print("This bastion contains no sessions.")
return core.oci_object(
oci_object=ext_sessions,
return_type=return_type,
format_function=format_session_listing)
except oci.exceptions.ServiceError as e:
if raise_exceptions:
raise
print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
except Exception as e:
if raise_exceptions:
raise
print(f'ERROR: {e}')
@plugin_function('mds.get.bastionSession', shell=True, cli=True, web=True)
def get_session(**kwargs):
"""Gets a Bastion Session with the given id
If no id is given, it will prompt the user for the id.
Args:
**kwargs: Optional parameters
Keyword Args:
session_name (str): The name of the session.
session_id (str): OCID of the session.
bastion_id (str): OCID of the bastion.
compartment_id (str): OCID of the compartment.
config (dict): An OCI config object or None.
config_profile (str): The name of an OCI config profile
ignore_current (bool): Whether the current bastion should be ignored
interactive (bool): Whether exceptions are raised
return_type (str): "STR" will return a formatted string, "DICT" will
return the object converted to a dict structure and "OBJ" will
return the OCI Python object for internal plugin usage
raise_exceptions (bool): If set to true exceptions are raised
Returns:
None
"""
session_name = kwargs.get("session_name")
session_id = kwargs.get("session_id")
bastion_id = kwargs.get("bastion_id")
compartment_id = kwargs.get("compartment_id")
ignore_current = kwargs.get("ignore_current", False)
config = kwargs.get("config")
config_profile = kwargs.get("config_profile")
interactive = kwargs.get("interactive", core.get_interactive_default())
return_type = kwargs.get("return_type", core.RETURN_DICT)
raise_exceptions = kwargs.get(
"raise_exceptions", # On internal call (RETURN_OBJ), raise exceptions
True if return_type == core.RETURN_OBJ else not interactive)
# Get the active config and compartment
try:
config = configuration.get_current_config(
config=config, config_profile=config_profile,
interactive=interactive)
compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
import oci.exceptions
try:
# Initialize the Bastion client
bastion_client = core.get_oci_bastion_client(config=config)
if session_id:
return bastion_client.get_session(session_id=session_id).data
if not bastion_id and interactive:
bastion_id = get_bastion(
compartment_id=compartment_id,
config=config, ignore_current=ignore_current,
interactive=interactive, return_type=core.RETURN_OBJ).id
if not bastion_id:
raise Exception("No bastion_id given. "
"Cancelling operation.")
# List the Bastion of the current compartment
sessions = bastion_client.list_sessions(bastion_id=bastion_id).data
if len(sessions) == 0:
print("No Bastions available in this compartment.")
return
# If the user_name was not given or found, print out the list
sessions_list = format_session_listing(sessions)
if session_name is None:
print(sessions_list)
# Let the user choose from the list
selected_session = core.prompt_for_list_item(
item_list=sessions,
prompt_caption=("Please enter the name or index "
"of the Bastion Session: "),
item_name_property="display_name", given_value=session_name)
session = bastion_client.get_session(
session_id=selected_session.id).data
return core.oci_object(
oci_object=session,
return_type=return_type,
format_function=format_session_listing)
except oci.exceptions.ServiceError as e:
if raise_exceptions:
raise
print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
except (ValueError, oci.exceptions.ClientError) as e:
if raise_exceptions:
raise
print(f'ERROR: {e}')
@plugin_function('mds.create.bastionSession', shell=True, cli=True, web=True)
def create_session(**kwargs):
"""Creates a Bastion Session for the given bastion_id
If no id is given, it will prompt the user for the id.
Args:
**kwargs: Additional options
Keyword Args:
bastion_name (str): The new name of the compartment.
bastion_id (str): OCID of the Bastion.
fallback_to_any_in_compartment (bool): Fallback to first bastion in
the given compartment
session_type (str): The type of the session, either MANAGED_SSH or
PORT_FORWARDING
session_name (str): The name of the session.
target_id (str): OCID of the session target.
target_ip (str): The TCP/IP address of the session target.
target_port (str): The TCP/IP Port of the session target.
target_user (str): The user account on the session target.
ttl_in_seconds (int): Time to live for the session, max 10800.
public_key_file (str): The filename of a public SSH key.
public_key_content (str): The public SSH key.
await_creation (bool): Whether to wait till the bastion reaches
lifecycle state ACTIVE
compartment_id (str): OCID of the compartment.
config (dict): An OCI config object or None.
config_profile (str): The name of an OCI config profile
ignore_current (bool): Whether the current Bastion should be ignored
interactive (bool): Whether exceptions are raised
return_type (str): "STR" will return a formatted string, "DICT" will
return the object converted to a dict structure and "OBJ" will
return the OCI Python object for internal plugin usage
raise_exceptions (bool): If set to true exceptions are raised
Returns:
The id of the Bastion Session, None in interactive mode
"""
bastion_name = kwargs.get("bastion_name")
bastion_id = kwargs.get("bastion_id")
session_type = kwargs.get("session_type")
session_name = kwargs.get("session_name")
target_id = kwargs.get("target_id")
target_ip = kwargs.get("target_ip")
target_port = kwargs.get("target_port")
target_user = kwargs.get("target_user")
ttl_in_seconds = kwargs.get("ttl_in_seconds", 10800)
public_key_file = kwargs.get("public_key_file")
public_key_content = kwargs.get("public_key_content")
await_creation = kwargs.get("await_creation")
compartment_id = kwargs.get("compartment_id")
config = kwargs.get("config")
config_profile = kwargs.get("config_profile")
ignore_current = kwargs.get("ignore_current", False)
fallback_to_any_in_compartment = kwargs.get(
"fallback_to_any_in_compartment", False)
interactive = kwargs.get("interactive", core.get_interactive_default())
return_type = kwargs.get(
"return_type", # In interactive mode, default to formatted str return
core.RETURN_STR if interactive else core.RETURN_DICT)
raise_exceptions = kwargs.get(
"raise_exceptions", # On internal call (RETURN_OBJ), raise exceptions
True if return_type == core.RETURN_OBJ else not interactive)
from datetime import datetime
# Get the active config and compartment
try:
config = configuration.get_current_config(
config=config,
config_profile=config_profile, interactive=interactive)
compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
if not ignore_current and not bastion_name:
bastion_id = configuration.get_current_bastion_id(
bastion_id=bastion_id, config=config)
# Initialize the Bastion client
bastion_client = core.get_oci_bastion_client(config=config)
if not bastion_id and (bastion_name or interactive):
bastion_id = get_bastion(
bastion_name=bastion_name,
fallback_to_any_in_compartment=fallback_to_any_in_compartment,
compartment_id=compartment_id,
config=config, ignore_current=ignore_current,
interactive=interactive, return_type=core.RETURN_OBJ).id
if not bastion_id:
raise ValueError("No bastion_id or bastion_name specified.")
if session_type == "MANAGED_SSH":
default_name = f'COMPUTE-{datetime.now():%Y%m%d-%H%M%S}'
else:
default_name = f'MDS-{datetime.now():%Y%m%d-%H%M%S}'
if not session_name and interactive:
session_name = core.prompt(
"Please specify a name for the Bastion Session "
f"({default_name}): ",
{'defaultValue': default_name}).strip()
if not session_type and interactive:
session_type = core.prompt_for_list_item(
item_list=["MANAGED_SSH", "PORT_FORWARDING"],
prompt_caption=(
"Please select the Bastion Session type "
"(PORT_FORWARDING): "),
print_list=True,
prompt_default_value="PORT_FORWARDING")
if not session_type:
raise ValueError("No session_type given. "
"Operation cancelled.")
if session_type == "MANAGED_SSH":
if not target_id and interactive:
instance = compute.get_instance(
compartment_id=compartment_id, config=config,
interactive=interactive,
return_python_object=True)
if not instance:
raise Exception("No target_id specified."
"Cancelling operation")
target_id = instance.id
if target_id and not target_ip:
vnics = compute.list_vnics(
instance_id=target_id,
compartment_id=compartment_id, config=config,
interactive=False,
return_python_object=True)
for vnic in vnics:
if vnic.private_ip:
target_ip = vnic.private_ip
break
if not target_ip:
raise ValueError(
'No private IP found for this compute instance.')
if not target_port:
target_port = 22
if not target_user:
target_user = "opc"
else:
if not target_ip and interactive:
db_system = mysql_database_service.get_db_system(
compartment_id=compartment_id, config=config,
return_python_object=True)
if not db_system:
raise Exception("No target_id specified."
"Cancelling operation")
endpoint = db_system.endpoints[0]
target_ip = endpoint.ip_address
if not target_port:
target_port = endpoint.port
if not target_ip:
raise Exception("No target_ip specified."
"Cancelling operation")
if not target_port:
raise Exception("No target_port specified."
"Cancelling operation")
if session_type == "MANAGED_SSH":
if not target_id:
raise Exception("No target_id specified."
"Cancelling operation")
if not target_user:
raise Exception("No target_user specified."
"Cancelling operation")
# Convert Unix path to Windows
import os.path
ssh_key_location = os.path.abspath(os.path.expanduser("~/.ssh"))
if not public_key_file and not public_key_content and interactive:
from os import listdir
files = [f for f in listdir(ssh_key_location)
if os.path.isfile(os.path.join(ssh_key_location, f)) and
f.lower().endswith(".pub")]
public_key_file = core.prompt_for_list_item(
item_list=files, print_list=True,
prompt_caption="Please select a public SSH Key: ")
if not public_key_file:
raise Exception("No public SSH Key specified."
"Cancelling operation")
default_key_file = os.path.join(ssh_key_location, "id_rsa_oci_tunnel.pub")
if not public_key_file and os.path.exists(default_key_file):
public_key_file = default_key_file
if public_key_file and not public_key_content:
with open(os.path.join(
ssh_key_location, public_key_file), 'r') as file:
public_key_content = file.read()
if not public_key_content:
raise Exception("No public SSH Key specified. "
"Cancelling operation")
import oci.identity
import oci.bastion
import hashlib
try:
if session_type == "MANAGED_SSH":
target_details = oci.bastion.models.\
CreateManagedSshSessionTargetResourceDetails(
target_resource_id=target_id,
target_resource_port=target_port,
target_resource_private_ip_address=target_ip,
target_resource_operating_system_user_name=target_user)
else:
target_details = oci.bastion.models.\
CreatePortForwardingSessionTargetResourceDetails(
target_resource_id=target_id,
target_resource_port=target_port,
target_resource_private_ip_address=target_ip)
if not session_name:
# Calculate unique fingerprint based on all params
params = (
target_id + bastion_id +
config_profile + public_key_content +
f'{target_ip}:{target_port}')
fingerprint = hashlib.md5(params.encode('utf-8')).hexdigest()
session_name = f'MDS-{fingerprint}'
# Check if a session with this fingerprinted name already exists
sessions = bastion_client.list_sessions(
bastion_id=bastion_id,
display_name=session_name,
session_lifecycle_state='ACTIVE').data
if len(sessions) == 1:
# If so, return that session
return core.oci_object(
oci_object=sessions[0],
return_type=return_type,
format_function=format_session_listing)
if session_type == "MANAGED_SSH":
# Check if Bastion Plugin is already enabled
ia_client = core.get_oci_instance_agent_client(config)
# cSpell:ignore instanceagent
bastion_plugin = ia_client.get_instance_agent_plugin(
instanceagent_id=target_id,
compartment_id=compartment_id,
plugin_name="Bastion").data
if bastion_plugin.status != "RUNNING":
# TODO: use UpdateInstanceDetails to enabled the bastion
# service
raise Exception(
'Please enabled the Bastion plugin on '
'this instance.')
session_details = oci.bastion.models.CreateSessionDetails(
bastion_id=bastion_id,
display_name=session_name,
key_details=oci.bastion.models.PublicKeyDetails(
public_key_content=public_key_content),
key_type="PUB",
session_ttl_in_seconds=ttl_in_seconds,
target_resource_details=target_details
)
new_session = bastion_client.create_session(session_details).data
if await_creation:
import time
# Wait for the Bastion Session to be ACTIVE
cycles = 0
while cycles < 24:
bastion_session = bastion_client.get_session(
session_id=new_session.id).data
if bastion_session.lifecycle_state == "ACTIVE":
# TODO: Report bug to the Bastion dev team.
# Ask them to only switch lifecycle_state to ACTIVE
# if the Bastion Session can actually accept connections
time.sleep(5)
break
else:
time.sleep(5)
s = "." * (cycles + 1)
if interactive:
print(f'Waiting for Bastion Session to become '
f'active...{s}')
cycles += 1
bastion_session = bastion_client.get_session(
session_id=new_session.id).data
if bastion_session.lifecycle_state != "ACTIVE":
raise Exception("Bastion Session did not reach ACTIVE "
"state within 2 minutes.")
return core.oci_object(
oci_object=new_session,
return_type=return_type,
format_function=lambda s: print(
f"Bastion Session {s.display_name} is being\n"
f" created. Use mds.list.bastionSessions() to check "
"it's provisioning state.\n"))
except oci.exceptions.ServiceError as e:
if raise_exceptions:
raise
print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
except Exception as e:
if raise_exceptions:
raise
print(f'ERROR: {e}')
@plugin_function('mds.delete.bastionSession', shell=True, cli=True, web=True)
def delete_session(**kwargs):
"""Deletes a Bastion Session with the given id
If no id is given, it will prompt the user for the id.
Args:
**kwargs: Optional parameters
Keyword Args:
session_name (str): The name of the bastion session
session_id (str): The id of the bastion session
bastion_name (str): The name of the bastion.
bastion_id (str): OCID of the bastion.
compartment_id (str): OCID of the compartment.
config (dict): An OCI config object or None.
config_profile (str): The name of an OCI config profile
ignore_current (bool): Whether the current bastion should be ignored
interactive (bool): Indicates whether to execute in interactive mode
raise_exceptions (bool): If set to true exceptions are raised
Returns:
None
"""
session_name = kwargs.get("session_name")
session_id = kwargs.get("session_id")
bastion_name = kwargs.get("bastion_name")
bastion_id = kwargs.get("bastion_id")
compartment_id = kwargs.get("compartment_id")
ignore_current = kwargs.get("ignore_current", False)
config = kwargs.get("config")
config_profile = kwargs.get("config_profile")
interactive = kwargs.get("interactive", core.get_interactive_default())
raise_exceptions = kwargs.get("raise_exceptions", not interactive)
import mysqlsh
# Get the active config and compartment
try:
config = configuration.get_current_config(
config=config, config_profile=config_profile,
interactive=interactive)
compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
if not ignore_current and bastion_name is None:
bastion_id = configuration.get_current_bastion_id(
bastion_id=bastion_id, config=config)
import oci.exceptions
try:
if not session_id:
bastion = get_bastion(
bastion_name=bastion_name,
bastion_id=bastion_id, compartment_id=compartment_id,
config=config, ignore_current=ignore_current,
interactive=interactive, return_type=core.RETURN_OBJ)
if not bastion:
raise ValueError(
"No bastion given or found. "
"Operation cancelled.")
session = get_session(
session_name=session_name, session_id=session_id,
bastion_id=bastion.id, compartment_id=compartment_id,
config=config, ignore_current=ignore_current,
interactive=interactive, return_type=core.RETURN_OBJ)
if not session:
raise ValueError(
"No bastion session given or found. "
"Operation cancelled.")
session_id = session.id
if interactive:
# Prompt the user for specifying a compartment
prompt = mysqlsh.globals.shell.prompt(
f"Are you sure you want to delete the bastion session? "
f"[yes/NO]: ",
{'defaultValue': 'no'}).strip().lower()
if prompt != "yes":
raise Exception("Deletion aborted.\n")
# Initialize the Bastion client
bastion_client = core.get_oci_bastion_client(config=config)
bastion_client.delete_session(session_id=session_id)
if interactive:
print(f"The bastion session is being deleted.")
except oci.exceptions.ServiceError as e:
if interactive:
raise
print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
except (Exception) as e:
if raise_exceptions:
raise
print(f'ERROR: {e}')