def create_session()

in mds_plugin/bastion.py [0:0]


def create_session(**kwargs):
    """Creates a Bastion Session for the given bastion_id

    If no id is given, it will prompt the user for the id.

    Args:
        **kwargs: Additional options

    Keyword Args:
        bastion_name (str): The new name of the compartment.
        bastion_id (str): OCID of the Bastion.
        fallback_to_any_in_compartment (bool): Fallback to first bastion in
            the given compartment
        session_type (str): The type of the session, either MANAGED_SSH or
            PORT_FORWARDING
        session_name (str): The name of the session.
        target_id (str): OCID of the session target.
        target_ip (str): The TCP/IP address of the session target.
        target_port (str): The TCP/IP Port of the session target.
        target_user (str): The user account on the session target.
        ttl_in_seconds (int): Time to live for the session, max 10800.
        public_key_file (str): The filename of a public SSH key.
        public_key_content (str): The public SSH key.
        await_creation (bool): Whether to wait till the bastion reaches
            lifecycle state ACTIVE
        compartment_id (str): OCID of the compartment.
        config (dict): An OCI config object or None.
        config_profile (str): The name of an OCI config profile
        ignore_current (bool): Whether the current Bastion should be ignored
        interactive (bool): Whether exceptions are raised
        return_type (str): "STR" will return a formatted string, "DICT" will
            return the object converted to a dict structure and "OBJ" will
            return the OCI Python object for internal plugin usage
        raise_exceptions (bool): If set to true exceptions are raised

    Returns:
       The id of the Bastion Session, None in interactive mode
    """

    bastion_name = kwargs.get("bastion_name")
    bastion_id = kwargs.get("bastion_id")
    session_type = kwargs.get("session_type")
    session_name = kwargs.get("session_name")
    target_id = kwargs.get("target_id")
    target_ip = kwargs.get("target_ip")
    target_port = kwargs.get("target_port")
    target_user = kwargs.get("target_user")
    ttl_in_seconds = kwargs.get("ttl_in_seconds", 10800)
    public_key_file = kwargs.get("public_key_file")
    public_key_content = kwargs.get("public_key_content")

    await_creation = kwargs.get("await_creation")

    compartment_id = kwargs.get("compartment_id")
    config = kwargs.get("config")
    config_profile = kwargs.get("config_profile")
    ignore_current = kwargs.get("ignore_current", False)
    fallback_to_any_in_compartment = kwargs.get(
        "fallback_to_any_in_compartment", False)

    interactive = kwargs.get("interactive", core.get_interactive_default())
    return_type = kwargs.get(
        "return_type",  # In interactive mode, default to formatted str return
        core.RETURN_STR if interactive else core.RETURN_DICT)
    raise_exceptions = kwargs.get(
        "raise_exceptions",  # On internal call (RETURN_OBJ), raise exceptions
        True if return_type == core.RETURN_OBJ else not interactive)

    from datetime import datetime

    # Get the active config and compartment
    try:
        config = configuration.get_current_config(
            config=config,
            config_profile=config_profile, interactive=interactive)
        compartment_id = configuration.get_current_compartment_id(
            compartment_id=compartment_id, config=config)
        if not ignore_current and not bastion_name:
            bastion_id = configuration.get_current_bastion_id(
                bastion_id=bastion_id, config=config)

        # Initialize the Bastion client
        bastion_client = core.get_oci_bastion_client(config=config)

        if not bastion_id and (bastion_name or interactive):
            bastion_id = get_bastion(
                bastion_name=bastion_name,
                fallback_to_any_in_compartment=fallback_to_any_in_compartment,
                compartment_id=compartment_id,
                config=config, ignore_current=ignore_current,
                interactive=interactive, return_type=core.RETURN_OBJ).id

        if not bastion_id:
            raise ValueError("No bastion_id or bastion_name specified.")

        if session_type == "MANAGED_SSH":
            default_name = f'COMPUTE-{datetime.now():%Y%m%d-%H%M%S}'
        else:
            default_name = f'MDS-{datetime.now():%Y%m%d-%H%M%S}'

        if not session_name and interactive:
            session_name = core.prompt(
                "Please specify a name for the Bastion Session "
                f"({default_name}): ",
                {'defaultValue': default_name}).strip()

        if not session_type and interactive:
            session_type = core.prompt_for_list_item(
                item_list=["MANAGED_SSH", "PORT_FORWARDING"],
                prompt_caption=(
                    "Please select the Bastion Session type "
                    "(PORT_FORWARDING): "),
                print_list=True,
                prompt_default_value="PORT_FORWARDING")

        if not session_type:
            raise ValueError("No session_type given. "
                             "Operation cancelled.")

        if session_type == "MANAGED_SSH":
            if not target_id and interactive:
                instance = compute.get_instance(
                    compartment_id=compartment_id, config=config,
                    interactive=interactive,
                    return_python_object=True)
                if not instance:
                    raise Exception("No target_id specified."
                                    "Cancelling operation")

                target_id = instance.id

            if target_id and not target_ip:
                vnics = compute.list_vnics(
                    instance_id=target_id,
                    compartment_id=compartment_id, config=config,
                    interactive=False,
                    return_python_object=True)
                for vnic in vnics:
                    if vnic.private_ip:
                        target_ip = vnic.private_ip
                        break
                if not target_ip:
                    raise ValueError(
                        'No private IP found for this compute instance.')

            if not target_port:
                target_port = 22

            if not target_user:
                target_user = "opc"
        else:
            if not target_ip and interactive:
                db_system = mysql_database_service.get_db_system(
                    compartment_id=compartment_id, config=config,
                    return_python_object=True)
                if not db_system:
                    raise Exception("No target_id specified."
                                    "Cancelling operation")

                endpoint = db_system.endpoints[0]
                target_ip = endpoint.ip_address
                if not target_port:
                    target_port = endpoint.port

        if not target_ip:
            raise Exception("No target_ip specified."
                            "Cancelling operation")

        if not target_port:
            raise Exception("No target_port specified."
                            "Cancelling operation")

        if session_type == "MANAGED_SSH":
            if not target_id:
                raise Exception("No target_id specified."
                                "Cancelling operation")
            if not target_user:
                raise Exception("No target_user specified."
                                "Cancelling operation")

        # Convert Unix path to Windows
        import os.path
        ssh_key_location = os.path.abspath(os.path.expanduser("~/.ssh"))

        if not public_key_file and not public_key_content and interactive:
            from os import listdir

            files = [f for f in listdir(ssh_key_location)
                     if os.path.isfile(os.path.join(ssh_key_location, f)) and
                     f.lower().endswith(".pub")]

            public_key_file = core.prompt_for_list_item(
                item_list=files, print_list=True,
                prompt_caption="Please select a public SSH Key: ")
            if not public_key_file:
                raise Exception("No public SSH Key specified."
                                "Cancelling operation")

        default_key_file = os.path.join(ssh_key_location, "id_rsa_oci_tunnel.pub")
        if not public_key_file and os.path.exists(default_key_file):
            public_key_file = default_key_file

        if public_key_file and not public_key_content:
            with open(os.path.join(
                    ssh_key_location, public_key_file), 'r') as file:
                public_key_content = file.read()

        if not public_key_content:
            raise Exception("No public SSH Key specified. "
                            "Cancelling operation")

        import oci.identity
        import oci.bastion
        import hashlib
        try:
            if session_type == "MANAGED_SSH":
                target_details = oci.bastion.models.\
                    CreateManagedSshSessionTargetResourceDetails(
                        target_resource_id=target_id,
                        target_resource_port=target_port,
                        target_resource_private_ip_address=target_ip,
                        target_resource_operating_system_user_name=target_user)
            else:
                target_details = oci.bastion.models.\
                    CreatePortForwardingSessionTargetResourceDetails(
                        target_resource_id=target_id,
                        target_resource_port=target_port,
                        target_resource_private_ip_address=target_ip)

            if not session_name:
                # Calculate unique fingerprint based on all params
                params = (
                    target_id + bastion_id +
                    config_profile + public_key_content +
                    f'{target_ip}:{target_port}')

                fingerprint = hashlib.md5(params.encode('utf-8')).hexdigest()
                session_name = f'MDS-{fingerprint}'

            # Check if a session with this fingerprinted name already exists
            sessions = bastion_client.list_sessions(
                bastion_id=bastion_id,
                display_name=session_name,
                session_lifecycle_state='ACTIVE').data
            if len(sessions) == 1:
                # If so, return that session
                return core.oci_object(
                    oci_object=sessions[0],
                    return_type=return_type,
                    format_function=format_session_listing)

            if session_type == "MANAGED_SSH":
                # Check if Bastion Plugin is already enabled
                ia_client = core.get_oci_instance_agent_client(config)
                # cSpell:ignore instanceagent
                bastion_plugin = ia_client.get_instance_agent_plugin(
                    instanceagent_id=target_id,
                    compartment_id=compartment_id,
                    plugin_name="Bastion").data
                if bastion_plugin.status != "RUNNING":
                    # TODO: use UpdateInstanceDetails to enabled the bastion
                    # service
                    raise Exception(
                        'Please enabled the Bastion plugin on '
                        'this instance.')

            session_details = oci.bastion.models.CreateSessionDetails(
                bastion_id=bastion_id,
                display_name=session_name,
                key_details=oci.bastion.models.PublicKeyDetails(
                    public_key_content=public_key_content),
                key_type="PUB",
                session_ttl_in_seconds=ttl_in_seconds,
                target_resource_details=target_details
            )
            new_session = bastion_client.create_session(session_details).data

            if await_creation:
                import time
                # Wait for the Bastion Session to be ACTIVE
                cycles = 0
                while cycles < 24:
                    bastion_session = bastion_client.get_session(
                        session_id=new_session.id).data
                    if bastion_session.lifecycle_state == "ACTIVE":
                        # TODO: Report bug to the Bastion dev team.
                        # Ask them to only switch lifecycle_state to ACTIVE
                        # if the Bastion Session can actually accept connections
                        time.sleep(5)
                        break
                    else:
                        time.sleep(5)
                        s = "." * (cycles + 1)
                        if interactive:
                            print(f'Waiting for Bastion Session to become '
                                  f'active...{s}')
                    cycles += 1

                bastion_session = bastion_client.get_session(
                    session_id=new_session.id).data
                if bastion_session.lifecycle_state != "ACTIVE":
                    raise Exception("Bastion Session did not reach ACTIVE "
                                    "state within 2 minutes.")

            return core.oci_object(
                oci_object=new_session,
                return_type=return_type,
                format_function=lambda s: print(
                    f"Bastion Session {s.display_name} is being\n"
                    f" created. Use mds.list.bastionSessions() to check "
                    "it's provisioning state.\n"))

        except oci.exceptions.ServiceError as e:
            if raise_exceptions:
                raise
            print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
    except Exception as e:
        if raise_exceptions:
            raise
        print(f'ERROR: {e}')