in mds_plugin/bastion.py [0:0]
def create_session(**kwargs):
    """Creates a Bastion Session for the given bastion_id

    If no bastion id or name is given and interactive mode is enabled, the
    user is prompted to select the bastion. Missing session parameters are
    prompted for in interactive mode as well.

    If no session_name is given (and none was entered interactively), a
    deterministic name "MDS-<md5 fingerprint>" is derived from the target,
    bastion, config profile and public key. If exactly one ACTIVE session
    with the resulting name already exists on the bastion, that session is
    returned instead of creating a new one.

    Args:
        **kwargs: Additional options

    Keyword Args:
        bastion_name (str): The name of the Bastion to use.
        bastion_id (str): OCID of the Bastion.
        fallback_to_any_in_compartment (bool): Fallback to first bastion in
            the given compartment
        session_type (str): The type of the session, either MANAGED_SSH or
            PORT_FORWARDING
        session_name (str): The name of the session.
        target_id (str): OCID of the session target. Required for
            MANAGED_SSH sessions.
        target_ip (str): The TCP/IP address of the session target.
        target_port (str): The TCP/IP Port of the session target. Defaults
            to 22 for MANAGED_SSH sessions.
        target_user (str): The user account on the session target. Defaults
            to "opc" for MANAGED_SSH sessions.
        ttl_in_seconds (int): Time to live for the session, max 10800.
            Defaults to 10800.
        public_key_file (str): The filename of a public SSH key. Relative
            names are resolved against the ~/.ssh directory.
        public_key_content (str): The public SSH key.
        await_creation (bool): Whether to wait till the bastion session
            reaches lifecycle state ACTIVE
        compartment_id (str): OCID of the compartment.
        config (dict): An OCI config object or None.
        config_profile (str): The name of an OCI config profile
        ignore_current (bool): Whether the current Bastion should be ignored
        interactive (bool): Whether the user should be prompted for missing
            values and progress should be printed
        return_type (str): "STR" will return a formatted string, "DICT" will
            return the object converted to a dict structure and "OBJ" will
            return the OCI Python object for internal plugin usage
        raise_exceptions (bool): If set to true exceptions are raised

    Returns:
        The Bastion Session as produced by core.oci_object() for the
        requested return_type. None if an error occurred and
        raise_exceptions is not set (the error is printed instead).

    Raises:
        ValueError: If no bastion, no session_type or no private IP could
            be determined (only when raise_exceptions is set).
        Exception: If another required parameter is missing, the Bastion
            plugin is not running on the target instance or the session did
            not become ACTIVE in time (only when raise_exceptions is set).
    """

    bastion_name = kwargs.get("bastion_name")
    bastion_id = kwargs.get("bastion_id")
    session_type = kwargs.get("session_type")
    session_name = kwargs.get("session_name")
    target_id = kwargs.get("target_id")
    target_ip = kwargs.get("target_ip")
    target_port = kwargs.get("target_port")
    target_user = kwargs.get("target_user")
    ttl_in_seconds = kwargs.get("ttl_in_seconds", 10800)
    public_key_file = kwargs.get("public_key_file")
    public_key_content = kwargs.get("public_key_content")

    await_creation = kwargs.get("await_creation")

    compartment_id = kwargs.get("compartment_id")
    config = kwargs.get("config")
    config_profile = kwargs.get("config_profile")
    ignore_current = kwargs.get("ignore_current", False)
    fallback_to_any_in_compartment = kwargs.get(
        "fallback_to_any_in_compartment", False)

    interactive = kwargs.get("interactive", core.get_interactive_default())
    return_type = kwargs.get(
        "return_type",  # In interactive mode, default to formatted str return
        core.RETURN_STR if interactive else core.RETURN_DICT)
    raise_exceptions = kwargs.get(
        "raise_exceptions",  # On internal call (RETURN_OBJ), raise exceptions
        True if return_type == core.RETURN_OBJ else not interactive)

    from datetime import datetime

    # Get the active config and compartment
    try:
        config = configuration.get_current_config(
            config=config,
            config_profile=config_profile, interactive=interactive)
        compartment_id = configuration.get_current_compartment_id(
            compartment_id=compartment_id, config=config)
        if not ignore_current and not bastion_name:
            bastion_id = configuration.get_current_bastion_id(
                bastion_id=bastion_id, config=config)

        # Initialize the Bastion client
        bastion_client = core.get_oci_bastion_client(config=config)

        if not bastion_id and (bastion_name or interactive):
            bastion = get_bastion(
                bastion_name=bastion_name,
                fallback_to_any_in_compartment=fallback_to_any_in_compartment,
                compartment_id=compartment_id,
                config=config, ignore_current=ignore_current,
                interactive=interactive, return_type=core.RETURN_OBJ)
            # Guard against get_bastion() not returning an object, so the
            # explicit error below is reported instead of an AttributeError
            if bastion is not None:
                bastion_id = bastion.id

        if not bastion_id:
            raise ValueError("No bastion_id or bastion_name specified.")

        # The default session name offered in the interactive prompt
        if session_type == "MANAGED_SSH":
            default_name = f'COMPUTE-{datetime.now():%Y%m%d-%H%M%S}'
        else:
            default_name = f'MDS-{datetime.now():%Y%m%d-%H%M%S}'

        if not session_name and interactive:
            session_name = core.prompt(
                "Please specify a name for the Bastion Session "
                f"({default_name}): ",
                {'defaultValue': default_name}).strip()

        if not session_type and interactive:
            session_type = core.prompt_for_list_item(
                item_list=["MANAGED_SSH", "PORT_FORWARDING"],
                prompt_caption=(
                    "Please select the Bastion Session type "
                    "(PORT_FORWARDING): "),
                print_list=True,
                prompt_default_value="PORT_FORWARDING")

        if not session_type:
            raise ValueError("No session_type given. "
                             "Operation cancelled.")

        if session_type == "MANAGED_SSH":
            # Let the user pick a compute instance if no target was given
            if not target_id and interactive:
                instance = compute.get_instance(
                    compartment_id=compartment_id, config=config,
                    interactive=interactive,
                    return_python_object=True)
                if not instance:
                    raise Exception("No target_id specified. "
                                    "Cancelling operation")

                target_id = instance.id

            # Use the first private IP found on the instance's VNICs
            if target_id and not target_ip:
                vnics = compute.list_vnics(
                    instance_id=target_id,
                    compartment_id=compartment_id, config=config,
                    interactive=False,
                    return_python_object=True)
                for vnic in vnics:
                    if vnic.private_ip:
                        target_ip = vnic.private_ip
                        break
                if not target_ip:
                    raise ValueError(
                        'No private IP found for this compute instance.')

            if not target_port:
                target_port = 22
            if not target_user:
                target_user = "opc"
        else:
            # Let the user pick a DB System and use its first endpoint
            if not target_ip and interactive:
                db_system = mysql_database_service.get_db_system(
                    compartment_id=compartment_id, config=config,
                    return_python_object=True)
                if not db_system:
                    raise Exception("No target_id specified. "
                                    "Cancelling operation")

                endpoint = db_system.endpoints[0]
                target_ip = endpoint.ip_address
                if not target_port:
                    target_port = endpoint.port

        if not target_ip:
            raise Exception("No target_ip specified. "
                            "Cancelling operation")

        if not target_port:
            raise Exception("No target_port specified. "
                            "Cancelling operation")

        if session_type == "MANAGED_SSH":
            if not target_id:
                raise Exception("No target_id specified. "
                                "Cancelling operation")

            if not target_user:
                raise Exception("No target_user specified. "
                                "Cancelling operation")

        # Convert Unix path to Windows
        import os.path
        ssh_key_location = os.path.abspath(os.path.expanduser("~/.ssh"))

        # In interactive mode, offer all *.pub files found in ~/.ssh
        if not public_key_file and not public_key_content and interactive:
            from os import listdir

            files = [f for f in listdir(ssh_key_location)
                     if os.path.isfile(os.path.join(ssh_key_location, f)) and
                     f.lower().endswith(".pub")]

            public_key_file = core.prompt_for_list_item(
                item_list=files, print_list=True,
                prompt_caption="Please select a public SSH Key: ")
            if not public_key_file:
                raise Exception("No public SSH Key specified. "
                                "Cancelling operation")

        # Fall back to the default tunnel key if it exists
        default_key_file = os.path.join(
            ssh_key_location, "id_rsa_oci_tunnel.pub")
        if not public_key_file and os.path.exists(default_key_file):
            public_key_file = default_key_file

        if public_key_file and not public_key_content:
            # os.path.join() keeps public_key_file as is if it is absolute
            with open(os.path.join(
                    ssh_key_location, public_key_file), 'r') as file:
                public_key_content = file.read()

        if not public_key_content:
            raise Exception("No public SSH Key specified. "
                            "Cancelling operation")

        import oci.identity
        import oci.bastion
        import hashlib
        try:
            if session_type == "MANAGED_SSH":
                target_details = oci.bastion.models.\
                    CreateManagedSshSessionTargetResourceDetails(
                        target_resource_id=target_id,
                        target_resource_port=target_port,
                        target_resource_private_ip_address=target_ip,
                        target_resource_operating_system_user_name=target_user)
            else:
                target_details = oci.bastion.models.\
                    CreatePortForwardingSessionTargetResourceDetails(
                        target_resource_id=target_id,
                        target_resource_port=target_port,
                        target_resource_private_ip_address=target_ip)

            if not session_name:
                # Calculate unique fingerprint based on all params.
                # target_id (not required for PORT_FORWARDING) and
                # config_profile are optional and may be None; treat them as
                # empty strings so the concatenation cannot raise a
                # TypeError. Non-None values hash exactly as before.
                params = (
                    (target_id or "") + bastion_id +
                    (config_profile or "") + public_key_content +
                    f'{target_ip}:{target_port}')

                fingerprint = hashlib.md5(params.encode('utf-8')).hexdigest()
                session_name = f'MDS-{fingerprint}'

            # Check if a session with this fingerprinted name already exists
            sessions = bastion_client.list_sessions(
                bastion_id=bastion_id,
                display_name=session_name,
                session_lifecycle_state='ACTIVE').data
            if len(sessions) == 1:
                # If so, return that session
                return core.oci_object(
                    oci_object=sessions[0],
                    return_type=return_type,
                    format_function=format_session_listing)

            if session_type == "MANAGED_SSH":
                # Check if Bastion Plugin is already enabled
                ia_client = core.get_oci_instance_agent_client(config)
                # cSpell:ignore instanceagent
                bastion_plugin = ia_client.get_instance_agent_plugin(
                    instanceagent_id=target_id,
                    compartment_id=compartment_id,
                    plugin_name="Bastion").data
                if bastion_plugin.status != "RUNNING":
                    # TODO: use UpdateInstanceDetails to enabled the bastion
                    # service
                    raise Exception(
                        'Please enabled the Bastion plugin on '
                        'this instance.')

            session_details = oci.bastion.models.CreateSessionDetails(
                bastion_id=bastion_id,
                display_name=session_name,
                key_details=oci.bastion.models.PublicKeyDetails(
                    public_key_content=public_key_content),
                key_type="PUB",
                session_ttl_in_seconds=ttl_in_seconds,
                target_resource_details=target_details
            )
            new_session = bastion_client.create_session(session_details).data

            if await_creation:
                import time
                # Wait for the Bastion Session to be ACTIVE, polling every
                # 5 seconds for at most 24 cycles (2 minutes)
                cycles = 0
                while cycles < 24:
                    bastion_session = bastion_client.get_session(
                        session_id=new_session.id).data
                    if bastion_session.lifecycle_state == "ACTIVE":
                        # TODO: Report bug to the Bastion dev team.
                        # Ask them to only switch lifecycle_state to ACTIVE
                        # if the Bastion Session can actually accept
                        # connections
                        time.sleep(5)
                        break
                    else:
                        time.sleep(5)
                        s = "." * (cycles + 1)
                        if interactive:
                            print(f'Waiting for Bastion Session to become '
                                  f'active...{s}')
                    cycles += 1

                # Re-check the state once more after the polling loop ended
                bastion_session = bastion_client.get_session(
                    session_id=new_session.id).data
                if bastion_session.lifecycle_state != "ACTIVE":
                    raise Exception("Bastion Session did not reach ACTIVE "
                                    "state within 2 minutes.")

            return core.oci_object(
                oci_object=new_session,
                return_type=return_type,
                format_function=lambda s: print(
                    f"Bastion Session {s.display_name} is being\n"
                    f" created. Use mds.list.bastionSessions() to check "
                    "it's provisioning state.\n"))

        except oci.exceptions.ServiceError as e:
            if raise_exceptions:
                raise
            print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
    except Exception as e:
        if raise_exceptions:
            raise
        print(f'ERROR: {e}')