mds_plugin/compute.py (1,279 lines of code) (raw):
# Copyright (c) 2021, 2024, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0,
# as published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms, as
# designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an additional
# permission to link the program and your derivative works with the
# separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
"""Sub-Module for supporting OCI Compute Instances, Shapes, Images"""
# cSpell:ignore vnics, vnic, Paramiko, pkey, putfo, getfo, EX_CANTCREAT
# from os import EX_CANTCREAT
from mysqlsh.plugin_manager import plugin_function
from mds_plugin import core, configuration
class SshConnection:
""" A class to handle SSH Connections"""
def __init__(
self, username, host, private_key_file_path="~/.ssh/id_rsa",
private_key_passphrase=None):
""" Opens a ssh connection to the given host
Args:
host (str): The hostname or IP of the host to connect to
private_key_file_path (str): The path to the private key file
private_key_passphrase (str): The pass phrase of the given key
Returns:
A Paramiko client
"""
import os.path
import paramiko
import time
self.client = paramiko.SSHClient()
# Get private key
private_key_file_path = os.path.abspath(
os.path.expanduser(private_key_file_path))
# Assume no passphrase
try:
private_key = paramiko.RSAKey.from_private_key_file(
private_key_file_path, password=private_key_passphrase)
except paramiko.ssh_exception.PasswordRequiredException:
import mysqlsh
private_key_passphrase = mysqlsh.globals.shell.prompt(
"\nPlease enter the passphrase for the given SSH key: ",
options={"type": "password"})
try:
private_key = paramiko.RSAKey.from_private_key_file(
private_key_file_path, password=private_key_passphrase)
except paramiko.ssh_exception.SSHException:
print("Could not decrypt SSH key, wrong password provided.")
raise
except paramiko.ssh_exception.SSHException:
# If reading the file fails, try to add RSA which paramiko requires
import io
with open(private_key_file_path, mode='r') as file:
public_key_content = file.read()
keyfile = io.StringIO(public_key_content.replace(
"BEGIN PRIVATE", "BEGIN RSA PRIVATE"))
private_key = paramiko.RSAKey.from_private_key(keyfile)
keyfile.close()
self.client.set_missing_host_key_policy(
paramiko.AutoAddPolicy())
try:
self.client.connect(host, 22, username, pkey=private_key,
passphrase=private_key_passphrase, timeout=10)
except Exception:
# Wait 5 more seconds
time.sleep(5)
# Try again
self.client.connect(host, 22, username, pkey=private_key,
passphrase=private_key_passphrase, timeout=10)
self.stdin = None
self.stdout = None
self.stderr = None
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
def execute(self, command):
""" Executes the given command
Args:
command (str): The command to execute
Returns:
The stdout output of the command
"""
self.stdin, self.stdout, self.stderr = self.client.exec_command(
command)
output = ""
for line in self.stdout:
output += line
# Close stdin to work around Paramiko issue
self.stdin.close()
return output
def executeAndSendOnStdin(self, command, stdin_text):
import time
success = False
buffer = ""
# Open channel
chan = self.client.get_transport().open_session()
try:
chan.settimeout(timeout=None)
chan.set_combine_stderr(combine=True)
# Execute shell and call import function
chan.exec_command(command)
# Send password to stdin
chan.sendall(f"{stdin_text}\n".encode('utf-8'))
chan.shutdown_write()
# While the command didn't return an exit code yet
read_buffer_size = 1024
while not chan.exit_status_ready():
# Update every 0.1 seconds
time.sleep(0.1)
if chan.recv_ready():
buffer += chan.recv(
read_buffer_size).decode('utf-8')
# Ensure we gobble up all remaining data
while True:
try:
output = chan.recv(
read_buffer_size).decode('utf-8')
if not output and not chan.recv_ready():
break
else:
buffer += output
except Exception:
continue
success = chan.recv_exit_status() == 0
finally:
chan.close()
return success, buffer
def put_local_file(self, local_file_path, remote_file_path):
sftp = self.client.open_sftp()
try:
sftp.put(localpath=local_file_path, remotepath=remote_file_path)
finally:
sftp.close()
def put_local_file_object(self, file_object, remote_file_path):
sftp = self.client.open_sftp()
try:
sftp.putfo(fl=file_object, remotepath=remote_file_path)
finally:
sftp.close()
def get_remote_file(self, remote_file_path, local_file_path):
sftp = self.client.open_sftp()
try:
sftp.get(remotepath=remote_file_path, localpath=local_file_path)
finally:
sftp.close()
def get_remote_file_as_file_object(self, remote_file_path, file_object):
sftp = self.client.open_sftp()
try:
sftp.getfo(remotepath=remote_file_path, fl=file_object)
finally:
sftp.close()
def get_last_error(self):
if self.stderr is None:
return ""
output = ""
for line in self.stderr:
output += line
return output
def close(self):
""" Closes an open connection
Args:
-
Returns:
None
"""
try:
self.client.close()
except Exception as e:
# Close stdin to work around Paramiko issue 1617 on github
if str(e) != "'NoneType' object has no attribute 'time'":
raise
def format_instance_listing(items, current=None, vnics=None,
config=None):
"""Returns a formatted list of instances.
If compartment_id is given, the current and parent compartment
will be listed as well.
Args:
data (list): A list of instance objects.
config (object): An OCI config object or None.
compartment_id (str): The OCID of the current compartment.
Returns:
The formatted list as string
"""
import re
import oci.core.models
# If a single item was given, wrap it in a list
if not type(items) is list:
items = [items]
out = ""
# return compartments in READABLE text output
i = 1
for c in items:
# Shorten to 24 chars max, remove linebreaks
name = re.sub(r'[\n\r]', ' ',
c.display_name[:22] + '..'
if len(c.display_name) > 24
else c.display_name)
# Get public IP
public_ip = ""
private_ip = ""
if vnics and c.lifecycle_state in [
oci.core.models.Instance.LIFECYCLE_STATE_RUNNING,
oci.core.models.Instance.LIFECYCLE_STATE_STARTING,
oci.core.models.Instance.LIFECYCLE_STATE_STOPPED,
oci.core.models.Instance.LIFECYCLE_STATE_STOPPING]:
instance_vnics = [v for v in vnics if v.instance_id == c.id]
if len(instance_vnics) > 0:
virtual_network = core.get_oci_virtual_network_client(
config=config)
try:
vnic = virtual_network.get_vnic(
instance_vnics[0].vnic_id).data
public_ip = vnic.public_ip \
if vnic.public_ip else ''
private_ip = vnic.private_ip \
if vnic.private_ip else ''
except oci.exceptions.ServiceError as e:
# Ignore NotFound error
if e.code == "NotAuthorizedOrNotFound":
pass
index = f"*{i:>3}" if current == c.id else f"{i:>4}"
out += (f"{index} {name:24} {c.shape[:22]:22} {c.fault_domain[:15]:15} "
f"{public_ip if public_ip != '' else private_ip:15} "
f"{c.lifecycle_state}\n")
i += 1
return out
def format_image_list_item(data, index):
img = data[index]
# Shorten to 16 chars max
os = img.operating_system[:14] + '..' \
if len(img.operating_system) > 16 \
else img.operating_system
# Shorten to 16 chars max
os_version = img.operating_system_version[:20] + '..' \
if len(img.operating_system_version) > 22 \
else img.operating_system_version
return (f"{index+1:>3} {os:16} "
f"{os_version:22} {img.display_name[-12:]:12}")
def format_image_listing(data):
"""Returns a formatted list of images.
Args:
data (list): A list of images objects.
Returns:
The formatted list as string
"""
import math
out = ""
# return compartments in READABLE text output
split = math.ceil(len(data)/2)
i = 1
while i <= split:
out += format_image_list_item(data, i-1)
if i+split < len(data):
out += f" {format_image_list_item(data, split+i-1)}\n"
else:
out += "\n"
i += 1
return out
def format_compute_images(items) -> str:
"""Formats a given list of objects in a human readable form
Args:
items: Either a list of objects or a single object
Returns:
The objects formatted as str
"""
# If a single db_system was given, wrap it in a list
if not type(items) is list:
items = [items]
# return objects in READABLE text output
out = ""
id = 1
for i in items:
out += (f"{id:>4} " +
core.fixed_len(i.display_name, 48, ' ', True) +
core.fixed_len(i.operating_system, 16, ' ', True) +
core.fixed_len(i.operating_system_version, 7, ' ') +
core.fixed_len(i.lifecycle_state, 11, '\n'))
id += 1
return out
def get_instance_by_id(**kwargs):
"""Returns an instance object for the given id
Args:
**kwargs: Optional parameters
Keyword Args:
instance_id (str): OCID of the instance.
config (dict): An OCI config object or None
config_profile (str): The name of an OCI config profile
interactive (bool): Indicates whether to execute in interactive mode
raise_exceptions (bool): If true exceptions are raised
return_formatted (bool): If true a human readable string is returned
return_python_object (bool): Used for internal plugin calls
Returns:
The compartment or tenancy object with the given id
"""
instance_id = kwargs.get("instance_id")
config = kwargs.get("config")
config_profile = kwargs.get("config_profile")
interactive = kwargs.get("interactive", core.get_interactive_default())
raise_exceptions = kwargs.get("raise_exceptions", not interactive)
return_formatted = kwargs.get("return_formatted", interactive)
return_python_object = kwargs.get("return_python_object", False)
# Get the right config, either the one passed in or, if that one is None,
# the global one
try:
config = configuration.get_current_config(
config=config, config_profile=config_profile,
interactive=interactive)
current_instance_id = configuration.get_current_instance_id(
config=config)
import oci.exceptions
try:
# Initialize the compute client
compute = core.get_oci_compute_client(config=config)
# Return the instance if found
return core.return_oci_object(
oci_object=compute.get_instance(instance_id).data,
return_formatted=return_formatted,
return_python_object=return_python_object,
format_function=format_instance_listing,
current=current_instance_id)
except oci.exceptions.ServiceError as e:
if raise_exceptions:
raise
print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
return
except Exception as e:
if raise_exceptions:
raise
print(f'ERROR: {e}')
@plugin_function('mds.get.computeInstanceVncSecurityLists')
def get_instance_vcn_security_lists(**kwargs):
"""Returns the vnc_security_lists of an instance
If no name is given, it will prompt the user for the name.
Args:
**kwargs: Optional parameters
Keyword Args:
instance_name (str): The name of the instance.
instance_id (str): The OCID of the instance
compartment_id (str): OCID of the compartment.
config (object): An OCI config object or None.
interactive (bool): Indicates whether to execute in interactive mode
raise_exceptions (bool): If set to true exceptions are raised
return_python_object (bool): Used for internal plugin calls
Returns:
None or a list of vnc_security_lists
"""
instance_name = kwargs.get("instance_name")
instance_id = kwargs.get("instance_id")
compartment_id = kwargs.get("compartment_id")
config = kwargs.get("config")
interactive = kwargs.get("interactive", core.get_interactive_default())
raise_exceptions = kwargs.get("raise_exceptions", not interactive)
return_python_object = kwargs.get("return_python_object", False)
# Get the active config, compartment and instance
try:
config = configuration.get_current_config(config=config)
compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
instance_id = configuration.get_current_instance_id(
instance_id=instance_id, config=config)
import oci.exceptions
try:
instance = get_instance(
instance_name=instance_name, instance_id=instance_id,
compartment_id=compartment_id, config=config,
interactive=interactive, raise_exceptions=raise_exceptions,
return_python_object=True)
if instance is None:
raise Exception("No instance given or found."
"Operation cancelled.")
# Initialize the identity client
compute = core.get_oci_compute_client(config=config)
# Get all VNICs of the instance
attached_vnics = oci.pagination.list_call_get_all_results(
compute.list_vnic_attachments,
instance_id=instance.id,
compartment_id=compartment_id).data
# Find the VNIC with a public IP
if attached_vnics:
virtual_network = core.get_oci_virtual_network_client(
config=config)
for attached_vnic in attached_vnics:
vnic = virtual_network.get_vnic(
attached_vnic.vnic_id).data
public_ip = vnic.public_ip
# If a public IP is found, get it's subnet and the
# corresponsing security lists
if public_ip:
subnet = virtual_network.get_subnet(
vnic.subnet_id).data
if subnet is None:
raise Exception(
"Could not get the subnet of the instance.")
# Get the list of ids of security lists
sec_lists = subnet.security_list_ids
# Get the actual security list objects
sec_lists = [
virtual_network.get_security_list(id).data
for id in sec_lists]
return core.return_oci_object(
oci_object=sec_lists,
return_python_object=return_python_object)
except oci.exceptions.ServiceError as e:
if raise_exceptions:
raise
print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
except (ValueError, oci.exceptions.ClientError) as e:
if raise_exceptions:
raise
print(f'ERROR: {e}')
def add_ingress_port_to_security_lists(**kwargs):
"""Checks if the given ingress port already is a security list,
if not it gets added.
Args:
**kwargs: Optional parameters
Keyword Args:
security_lists (list): A list of security_lists.
port (int): The port to check
description (str): A description for the ingress rule
compartment_id (str): The OCID of the compartment
config (object): An OCI config object or None.
config_profile (str): The name of an OCI config profile
interactive (bool): Indicates whether to execute in interactive mode
raise_exceptions (bool): If true exceptions are raised
Returns:
True on success
"""
security_lists = kwargs.get("security_lists")
port = kwargs.get("port")
description = kwargs.get("description")
compartment_id = kwargs.get("compartment_id")
config = kwargs.get("config")
config_profile = kwargs.get("config_profile")
interactive = kwargs.get("interactive", core.get_interactive_default())
raise_exceptions = kwargs.get("raise_exceptions", not interactive)
if security_lists is None:
raise ValueError("No security_lists given.")
try:
# Get the active config and compartment
config = configuration.get_current_config(
config=config, config_profile=config_profile,
interactive=interactive)
compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
for sec_list in security_lists:
for rule in sec_list.ingress_security_rules:
if rule.tcp_options is not None and \
port >= rule.tcp_options.destination_port_range.min and \
port <= rule.tcp_options.destination_port_range.max and \
rule.protocol == "6" and \
rule.source == "0.0.0.0/0":
return True
if len(security_lists) == 0:
raise Exception("No security list available for this network.")
sec_list = security_lists[0]
import oci.exceptions
try:
network_client = core.get_oci_virtual_network_client(
config=config)
sec_list.ingress_security_rules.append(
oci.core.models.IngressSecurityRule(
protocol="6",
source="0.0.0.0/0",
is_stateless=False,
source_type="CIDR_BLOCK",
tcp_options=oci.core.models.TcpOptions(
destination_port_range=oci.core.models.PortRange(
max=port,
min=port),
source_port_range=None),
udp_options=None,
description=description
)
)
details = oci.core.models.UpdateSecurityListDetails(
defined_tags=sec_list.defined_tags,
display_name=sec_list.display_name,
egress_security_rules=sec_list.egress_security_rules,
freeform_tags=sec_list.freeform_tags,
ingress_security_rules=sec_list.ingress_security_rules
)
network_client.update_security_list(
security_list_id=sec_list.id,
update_security_list_details=details)
return True
except oci.exceptions.ServiceError as e:
if raise_exceptions:
raise
print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
except Exception as e:
if raise_exceptions:
raise
print(f'Could not list the availability domains for this '
f'compartment.\nERROR: {str(e)}')
def format_shape_listing(data):
"""Returns a formatted list of shapes.
Args:
data (list): A list of shape objects.
Returns:
The formatted list as string
"""
out = ""
# return compartments in READABLE text output
i = 1
for s in data:
# cSpell:ignore ocpus
out += f"{i:>4} {s.shape:20} {s.ocpus:5.1f}x " \
f"{s.processor_description[:22]:22} " \
f"{s.memory_in_gbs:5.0f}GB Ram\n"
i += 1
return out
def format_shapes(items) -> str:
"""Formats a given list of objects in a human readable form
Args:
items: Either a list of objects or a single object
Returns:
The objects formatted as str
"""
# If a single db_system was given, wrap it in a list
if not type(items) is list:
items = [items]
# return objects in READABLE text output
# cSpell:ignore gbps
out = ""
id = 1
for i in items:
out += (f"{id:>4} " +
core.fixed_len(i.shape, 26, ' ', True) +
core.fixed_len(str(i.ocpus), 7, ' ', align_right=True) +
core.fixed_len(i.processor_description, 35, ' ') +
core.fixed_len(
str(i.memory_in_gbs), 7, ' ', align_right=True) +
core.fixed_len(
str(i.networking_bandwidth_in_gbps), 7, '\n',
align_right=True))
id += 1
return out
@plugin_function('mds.list.computeInstances', shell=True, cli=True, web=True)
def list_instances(**kwargs):
"""Lists instances
This function will list all instances of the compartment with the
given compartment_id.
Args:
**kwargs: Optional parameters
Keyword Args:
compartment_id (str): OCID of the parent compartment.
config (object): An OCI config object or None.
config_profile (str): The name of an OCI config profile
interactive (bool): Indicates whether to execute in interactive mode
raise_exceptions (bool): If set to true exceptions are raised
return_formatted (bool): If set to true, a list object is returned.
Returns:
A list of dicts representing the compartments
"""
compartment_id = kwargs.get("compartment_id")
config = kwargs.get("config")
config_profile = kwargs.get("config_profile")
interactive = kwargs.get("interactive", core.get_interactive_default())
raise_exceptions = kwargs.get("raise_exceptions", not interactive)
return_formatted = kwargs.get("return_formatted", interactive)
import oci.exceptions
import oci.util
import oci.pagination
try:
config = configuration.get_current_config(
config=config, config_profile=config_profile,
interactive=interactive)
compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
current_instance_id = configuration.get_current_instance_id(
config=config)
# Initialize the identity client
compute = core.get_oci_compute_client(config=config)
# List the compute instances
instances = compute.list_instances(
compartment_id=compartment_id).data
# Filter out all deleted compartments
instances = [c for c in instances if c.lifecycle_state != "DELETED" and
c.lifecycle_state != "TERMINATED"]
if return_formatted:
# Get all VNICs of the compartment
vnics = oci.pagination.list_call_get_all_results(
compute.list_vnic_attachments,
compartment_id=compartment_id).data
return format_instance_listing(
items=instances, vnics=vnics,
current=current_instance_id, config=config)
else:
return oci.util.to_dict(instances)
except oci.exceptions.ServiceError as e:
if raise_exceptions:
raise
print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
return
except Exception as e:
if raise_exceptions:
raise
print(f'ERROR: {e}')
return
@plugin_function('mds.get.computeInstance', shell=True, cli=True, web=True)
def get_instance(**kwargs):
"""Returns an instance object based on instance_name or instance_id
Args:
**kwargs: Optional parameters
Keyword Args:
instance_name (str): The name of the instance
instance_id (str): OCID of the instance
ignore_current (bool): Whether the current instance should be ignored
compartment_id (str): OCID of the compartment
config (dict): An OCI config object or None
config_profile (str): The name of an OCI config profile
interactive (bool): Indicates whether to execute in interactive mode
raise_exceptions (bool): If true exceptions are raised
return_formatted (bool): If true a human readable string is returned
return_python_object (bool): Used for internal plugin calls
Returns:
The compartment or tenancy object or None if not found
"""
instance_name = kwargs.get("instance_name")
instance_id = kwargs.get("instance_id")
ignore_current = kwargs.get("ignore_current", False)
compartment_id = kwargs.get("compartment_id")
config = kwargs.get("config")
config_profile = kwargs.get("config_profile")
interactive = kwargs.get("interactive", core.get_interactive_default())
raise_exceptions = kwargs.get("raise_exceptions", not interactive)
return_formatted = kwargs.get("return_formatted", interactive)
return_python_object = kwargs.get("return_python_object", False)
try:
config = configuration.get_current_config(
config=config, config_profile=config_profile,
interactive=interactive)
compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
current_instance_id = configuration.get_current_instance_id(
config=config)
if not ignore_current and instance_name is None and instance_id is None:
instance_id = current_instance_id
# if the instance_id was given, look it up directly
if instance_id:
return get_instance_by_id(
instance_id=instance_id, config=config,
interactive=interactive, raise_exceptions=raise_exceptions,
return_formatted=return_formatted,
return_python_object=return_python_object)
import oci.util
import oci.exceptions
from mds_plugin import compartment
try:
# Get the full path of this tenancy
full_path = compartment.get_compartment_full_path(
compartment_id, config)
if instance_name is None:
print(f"Directory of {full_path}\n")
# Initialize the identity client
compute = core.get_oci_compute_client(config=config)
# Get the list of instances for this compartment
data = compute.list_instances(
compartment_id=compartment_id).data
# Filter out all deleted instances
data = [c for c in data if c.lifecycle_state != "DELETED"
and c.lifecycle_state != "TERMINATED"
and c.lifecycle_state != "TERMINATING"]
if len(data) < 1:
print("There are no instances in this compartment.\n")
return
# If an instance_name was given not given, print the instance list
if instance_name is None:
instance_list = format_instance_listing(
items=data, config=config)
print(f"Compute Instances:\n{instance_list}")
# Let the user choose from the list
instance = core.prompt_for_list_item(
item_list=data, prompt_caption=("Please enter the name or index "
"of the compute instance: "),
item_name_property="display_name", given_value=instance_name)
return core.return_oci_object(
oci_object=instance,
return_formatted=return_formatted,
return_python_object=return_python_object,
format_function=format_instance_listing,
current=current_instance_id)
except oci.exceptions.ServiceError as e:
if raise_exceptions:
raise
print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
except Exception as e:
if raise_exceptions:
raise
print(f'ERROR: {e}')
@plugin_function('mds.get.computeInstanceId')
def get_instance_id(instance_name=None, compartment_id=None, config=None,
interactive=True):
"""Returns an instance OCID based on instance_name or instance_id
Args:
instance_name (str): The name of the instance
compartment_id (str): OCID of the compartment
config (object): An OCI config object or None
interactive (bool): Whether exceptions are raised
Returns:
The compartment or tenancy object or None if not found
"""
instance = get_instance(
instance_name=instance_name, compartment_id=compartment_id,
config=config, interactive=interactive,
return_python_object=True)
return None if instance is None else instance.id
@plugin_function('mds.get.computeInstancePublicIp')
def get_instance_public_ip(**kwargs):
"""Returns the public ip of an instance
If no name is given, it will prompt the user for the name.
Args:
**kwargs: Optional parameters
Keyword Args:
instance_name (str): The name of the instance.
instance_id (str): The OCID of the instance
private_ip_fallback (bool): Whether the private IP should be returned
if there is no public one
compartment_id (str): OCID of the compartment.
config (object): An OCI config object or None.
interactive (bool): Indicates whether to execute in interactive mode
raise_exceptions (bool): If set to true exceptions are raised
Returns:
The IP as string or None
"""
instance_name = kwargs.get("instance_name")
instance_id = kwargs.get("instance_id")
private_ip_fallback = kwargs.get("private_ip_fallback")
compartment_id = kwargs.get("compartment_id")
config = kwargs.get("config")
config_profile = kwargs.get("config_profile")
interactive = kwargs.get("interactive", core.get_interactive_default())
raise_exceptions = kwargs.get("raise_exceptions", not interactive)
# Get the active config, compartment and instance
try:
config = configuration.get_current_config(
config=config, config_profile=config_profile,
interactive=interactive)
compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
instance_id = configuration.get_current_instance_id(
instance_id=instance_id, config=config)
import oci.exceptions
try:
instance = get_instance(
instance_name=instance_name, instance_id=instance_id,
compartment_id=compartment_id, config=config,
return_python_object=True)
if instance is None:
raise ValueError("No instance given."
"Operation cancelled.")
# Initialize the identity client
compute = core.get_oci_compute_client(config=config)
# Get all VNICs of the instance
try:
attached_vnics = oci.pagination.list_call_get_all_results(
compute.list_vnic_attachments,
instance_id=instance.id,
compartment_id=compartment_id).data
except Exception as e:
raise Exception(
"Cannot get VNICs of the given instance.\n"
f"{str(e)}")
instance_ip = None
if attached_vnics:
virtual_network = core.get_oci_virtual_network_client(
config=config)
for attached_vnic in attached_vnics:
vnic = virtual_network.get_vnic(
attached_vnic.vnic_id).data
instance_ip = vnic.public_ip
if instance_ip:
break
if not instance_ip and private_ip_fallback:
for attached_vnic in attached_vnics:
vnic = virtual_network.get_vnic(
attached_vnic.vnic_id).data
instance_ip = vnic.private_ip
if instance_ip:
break
return instance_ip
except oci.exceptions.ServiceError as e:
if raise_exceptions:
raise
print(f'Could not get the VNIC of {instance.display_name}\n'
f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
except ValueError as e:
if raise_exceptions:
raise
print(f"ERROR: {str(e)}")
@plugin_function('mds.list.computeShapes', shell=True, cli=True, web=True)
def list_shapes(**kwargs):
"""Returns a list of all available compute shapes
This list is specific for the given compartment and availability_domain
Args:
**kwargs: Additional options
Keyword Args:
limit_shapes_to (list): A list of shape names
availability_domain (str): The name of the availability_domain to use
compartment_id (str): OCID of the compartment
config (dict): An OCI config object or None
config_profile (str): The name of an OCI config profile
interactive (bool): Indicates whether to execute in interactive mode
raise_exceptions (bool): If true exceptions are raised
return_formatted (bool): If true a human readable string is returned
return_python_object (bool): Used for internal plugin calls
Returns:
A list of shapes
"""
limit_shapes_to = kwargs.get("limit_shapes_to")
availability_domain = kwargs.get("availability_domain")
compartment_id = kwargs.get("compartment_id")
config = kwargs.get("config")
config_profile = kwargs.get("config_profile")
interactive = kwargs.get("interactive", core.get_interactive_default())
raise_exceptions = kwargs.get("raise_exceptions", not interactive)
return_formatted = kwargs.get("return_formatted", interactive)
return_python_object = kwargs.get("return_python_object", False)
try:
# Get the active config and compartment
config = configuration.get_current_config(
config=config, config_profile=config_profile,
interactive=interactive)
compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
import oci.util
import oci.exceptions
from mds_plugin import compartment
try:
# Get the availability_domain name
availability_domain_obj = compartment.get_availability_domain(
random_selection=not interactive,
compartment_id=compartment_id,
availability_domain=availability_domain,
config=config, interactive=interactive,
raise_exceptions=raise_exceptions,
return_formatted=return_formatted,
return_python_object=True)
if availability_domain_obj:
availability_domain = availability_domain_obj.name
if not availability_domain:
raise ValueError("No availability domain given. "
"Operation cancelled.")
# Initialize the identity client
compute_client = core.get_oci_compute_client(config=config)
# Get list of available shapes
shapes = compute_client.list_shapes(
compartment_id=compartment_id,
availability_domain=availability_domain).data
# If a list of shape names was given, filter according to that list
if limit_shapes_to is not None:
shapes = [s for s in shapes if any(
s.shape in l_s for l_s in limit_shapes_to)]
return core.return_oci_object(
oci_object=shapes,
return_formatted=return_formatted,
return_python_object=return_python_object,
format_function=format_shapes)
except oci.exceptions.ServiceError as e:
if raise_exceptions:
raise e
print(f'Could not list the shapes for this compartment.\n'
f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
except Exception as e:
if raise_exceptions:
raise e
print(f'Could not list the shapes for this compartment.\n'
f'ERROR: {str(e)}')
@plugin_function('mds.get.computeShape')
def get_shape(**kwargs):
"""Gets a certain shape specified by name
The shape is specific for the given compartment and availability_domain
Args:
**kwargs: Additional options
Keyword Args:
shape_name (str): Name of the shape
limit_shapes_to (list): List of strings to limit the shape selection
availability_domain (str): The name of the availability_domain to use
compartment_id (str): OCID of the parent compartment.
config (dict): An OCI config object or None
config_profile (str): The name of an OCI config profile
interactive (bool): Indicates whether to execute in interactive mode
raise_exceptions (bool): If true exceptions are raised
return_formatted (bool): If true a human readable string is returned
return_python_object (bool): Used for internal plugin calls
Returns:
An shape object or None
"""
shape_name = kwargs.get("shape_name")
limit_shapes_to = kwargs.get("limit_shapes_to")
availability_domain = kwargs.get("availability_domain")
compartment_id = kwargs.get("compartment_id")
config = kwargs.get("config")
config_profile = kwargs.get("config_profile")
interactive = kwargs.get("interactive", core.get_interactive_default())
raise_exceptions = kwargs.get("raise_exceptions", not interactive)
return_formatted = kwargs.get("return_formatted", interactive)
return_python_object = kwargs.get("return_python_object", False)
try:
# Get the active config and compartment
config = configuration.get_current_config(
config=config, config_profile=config_profile,
interactive=interactive)
compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
# Get the list of available shapes
shapes = list_shapes(
limit_shapes_to=limit_shapes_to,
compartment_id=compartment_id,
availability_domain=availability_domain,
config=config, interactive=interactive,
raise_exceptions=True,
return_python_object=True)
if not shapes:
raise Exception("No shapes found.")
# Let the user choose from the list
shape = core.prompt_for_list_item(
item_list=shapes,
prompt_caption="Please enter the name or index of the shape: ",
item_name_property="shape", given_value=shape_name,
print_list=True)
return core.return_oci_object(
oci_object=shape,
return_formatted=return_formatted,
return_python_object=return_python_object,
format_function=format_shapes)
except Exception as e:
if raise_exceptions:
raise
print(f'ERROR: {str(e)}')
@plugin_function('mds.get.computeShapeName')
def get_shape_name(**kwargs):
"""Gets a certain shape id specified by name for the given compartment and
availability_domain
Args:
**kwargs: Additional options
Keyword Args:
shape_name (str): Name of the shape
limit_shapes_to (list): List of strings to limit the shape selection
availability_domain (str): The name of the availability_domain to use
compartment_id (str): OCID of the parent compartment.
config (dict): An OCI config object or None
config_profile (str): The name of an OCI config profile
interactive (bool): Indicates whether to execute in interactive mode
raise_exceptions (bool): If true exceptions are raised
Returns:
The shape's name (which is the shape's id) or None
"""
shape_name = kwargs.get("shape_name")
limit_shapes_to = kwargs.get("limit_shapes_to")
availability_domain = kwargs.get("availability_domain")
compartment_id = kwargs.get("compartment_id")
config = kwargs.get("config")
config_profile = kwargs.get("config_profile")
interactive = kwargs.get("interactive", core.get_interactive_default())
raise_exceptions = kwargs.get("raise_exceptions", not interactive)
shape = get_shape(
shape_name=shape_name, limit_shapes_to=limit_shapes_to,
availability_domain=availability_domain,
compartment_id=compartment_id,
config=config, config_profile=config_profile,
interactive=interactive, raise_exceptions=raise_exceptions,
return_python_object=True)
return None if shape is None else shape.shape
@plugin_function('mds.list.computeImages')
def list_images(**kwargs):
"""Gets a compute image
Args:
**kwargs: Additional options
Keyword Args:
operating_system (str): The name of the operating system
operating_system_version (str): The version of the operating system
image_caption (str): The caption of the compute image to use
shape (str): The name of the shape to use.
compartment_id (str): OCID of the parent compartment.
config (dict): An OCI config object or None
config_profile (str): The name of an OCI config profile
interactive (bool): Indicates whether to execute in interactive mode
raise_exceptions (bool): If true exceptions are raised
return_formatted (bool): If true a human readable string is returned
return_python_object (bool): Used for internal plugin calls
Returns:
a compute image object
"""
operating_system = kwargs.get("operating_system")
operating_system_version = kwargs.get("operating_system_version")
image_caption = kwargs.get("image_caption")
shape = kwargs.get("shape")
compartment_id = kwargs.get("compartment_id")
config = kwargs.get("config")
config_profile = kwargs.get("config_profile")
interactive = kwargs.get("interactive", core.get_interactive_default())
raise_exceptions = kwargs.get("raise_exceptions", not interactive)
return_formatted = kwargs.get("return_formatted", interactive)
return_python_object = kwargs.get("return_python_object", False)
# Get the active config and compartment
try:
# Get the active config and compartment
config = configuration.get_current_config(
config=config, config_profile=config_profile,
interactive=interactive)
compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
import oci.pagination
import oci.exceptions
try:
# Initialize the oci client
compute_client = core.get_oci_compute_client(config=config)
# Get list of images
images = oci.pagination.list_call_get_all_results(
compute_client.list_images,
compartment_id=compartment_id,
lifecycle_state="AVAILABLE",
shape=shape,
operating_system=operating_system,
operating_system_version=operating_system_version,
sort_by="DISPLAYNAME",
sort_order="ASC").data
# If no image_caption was given, let the user select an
# operating system first, then the actual image
if not image_caption and not operating_system and interactive:
os_list = sorted({img.operating_system for img in images})
operating_system = core.prompt_for_list_item(
item_list=os_list,
prompt_caption=(
"Please enter the name or index of the operating "
"system: "),
print_list=True)
if operating_system is None:
raise ValueError("No operation system given. "
"Operation cancelled.")
# Filter by given operating_system and sort by operating_system,
# operating_system_version DESC, time_created
images = sorted(sorted(
[i for i in images if i.operating_system == operating_system],
key=lambda img: (img.operating_system,
img.operating_system_version,
img.time_created),
reverse=True), key=lambda img: img.operating_system)
return core.return_oci_object(
oci_object=images,
return_formatted=return_formatted,
return_python_object=return_python_object,
format_function=format_compute_images)
except oci.exceptions.ServiceError as e:
if raise_exceptions:
raise
print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
except Exception as e:
if raise_exceptions:
raise
print(f"ERROR: {str(e)}")
@plugin_function('mds.get.computeImage')
def get_image(**kwargs):
"""Gets a compute image
Args:
**kwargs: Additional options
Keyword Args:
operating_system (str): The name of the operating system
operating_system_version (str): The version of the operating system
image_caption (str): The caption of the compute image to use
shape (str): The name of the shape to use.
use_latest_image (bool): Whether to use the latest compute image
compartment_id (str): OCID of the parent compartment.
config (dict): An OCI config object or None
config_profile (str): The name of an OCI config profile
interactive (bool): Indicates whether to execute in interactive mode
raise_exceptions (bool): If true exceptions are raised
return_formatted (bool): If true a human readable string is returned
return_python_object (bool): Used for internal plugin calls
Returns:
a compute image object
"""
operating_system = kwargs.get("operating_system")
operating_system_version = kwargs.get("operating_system_version")
image_caption = kwargs.get("image_caption")
shape = kwargs.get("shape")
use_latest_image = kwargs.get("use_latest_image", False)
compartment_id = kwargs.get("compartment_id")
config = kwargs.get("config")
config_profile = kwargs.get("config_profile")
interactive = kwargs.get("interactive", core.get_interactive_default())
raise_exceptions = kwargs.get("raise_exceptions", not interactive)
return_formatted = kwargs.get("return_formatted", interactive)
return_python_object = kwargs.get("return_python_object", False)
# Get the active config and compartment
try:
# Get the active config and compartment
config = configuration.get_current_config(
config=config, config_profile=config_profile,
interactive=interactive)
compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
images = list_images(
operating_system=operating_system,
operating_system_version=operating_system_version,
image_caption=image_caption,
shape=shape,
compartment_id=compartment_id, config=config,
interactive=interactive, raise_exceptions=raise_exceptions,
return_python_object=True)
if len(images) == 0:
raise ValueError(
"No compute image found using the given parameters."
"Operation cancelled.")
# If there is only one image, return it
image = None
if len(images) == 1 or use_latest_image or not interactive:
image = images[0]
else:
# Let the user choose from the image list
print(f"\nPlease choose a compute image from this list.\n")
image = core.prompt_for_list_item(
item_list=images, prompt_caption=(
"Please enter the name or index of the "
"compute image: "),
item_name_property="display_name",
given_value=image_caption,
print_list=True)
return core.return_oci_object(
oci_object=image,
return_formatted=return_formatted,
return_python_object=return_python_object,
format_function=format_compute_images)
except Exception as e:
if raise_exceptions:
raise
print(f"ERROR: {str(e)}")
@plugin_function('mds.get.computeImageId')
def get_image_id(operating_system=None, operating_system_version=None,
image_caption=None, shape=None, compartment_id=None,
config=None, interactive=True):
"""Gets a compute image id
Args:
operating_system (str): The name of the operating system
operating_system_version (str): The version of the operating system
image_caption (str): The caption of the compute image to use
shape (str): The name of the shape to use.
compartment_id (str): OCID of the parent compartment.
config (object): An OCI config object or None.
interactive (bool): Whether expections are raised
Returns:
an OCID
"""
image = get_image(
operating_system=operating_system,
operating_system_version=operating_system_version,
image_caption=operating_system_version, shape=shape,
compartment_id=compartment_id, config=config,
interactive=interactive)
return None if image is None else image.id
def format_vnic_listing(vnics):
"""Returns a formatted list of vnics.
Args:
vnics (list): A list of vnics objects.
Returns:
The formatted list as string
"""
import re
out = ""
i = 1
for v in vnics:
# Shorten to 24 chars max, remove linebreaks
name = re.sub(r'[\n\r]', ' ',
v.display_name[:22] + '..'
if len(v.display_name) > 24
else v.display_name)
private_ip = v.private_ip if v.private_ip else ""
public_ip = v.public_ip if v.public_ip else ""
out += (f"{i:>4} {name:24} {private_ip:15} {public_ip:15} "
f"{v.lifecycle_state[:8]:8} {v.time_created:%Y-%m-%d %H:%M}\n")
i += 1
return out
@plugin_function('mds.list.computeInstanceVnics')
def list_vnics(**kwargs):
"""Lists all available vnics for the given compartment and
availability_domain
Args:
**kwargs: Optional parameters
Keyword Args:
instance_id (str): OCID of the compute instance
availability_domain (str): The name of the availability_domain to use.
ignore_current (bool): Whether the current instance should be ignored
compartment_id (str): OCID of the parent compartment
config (dict): An OCI config object or None
config_profile (str): The name of an OCI config profile
interactive (bool): Indicates whether to execute in interactive mode
raise_exceptions (bool): If true exceptions are raised
return_formatted (bool): If true a human readable string is returned
return_python_object (bool): Used for internal plugin calls
Returns:
The list of shapes in either JSON or human readable format
"""
instance_id = kwargs.get("instance_id")
availability_domain = kwargs.get("availability_domain")
ignore_current = kwargs.get("ignore_current", False)
compartment_id = kwargs.get("compartment_id")
config = kwargs.get("config")
config_profile = kwargs.get("config_profile")
interactive = kwargs.get("interactive", core.get_interactive_default())
raise_exceptions = kwargs.get("raise_exceptions", not interactive)
return_formatted = kwargs.get("return_formatted", interactive)
return_python_object = kwargs.get("return_python_object", False)
try:
config = configuration.get_current_config(
config=config, config_profile=config_profile,
interactive=interactive)
compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
current_instance_id = configuration.get_current_instance_id(
config=config)
if not ignore_current and not instance_id:
instance_id = current_instance_id
if not instance_id and interactive:
instance_id = get_instance_id(
compartment_id=compartment_id,
config=config, interactive=interactive)
if not instance_id:
raise ValueError("No instance_id given."
"Cancelling operation")
import oci.exceptions
try:
# Initialize the oci client
compute = core.get_oci_compute_client(config=config)
network = core.get_oci_virtual_network_client(config=config)
vnic_attachments = compute.list_vnic_attachments(
compartment_id=compartment_id,
availability_domain=availability_domain,
instance_id=instance_id).data
vnics = []
for vnic_att in vnic_attachments:
vnics.append(network.get_vnic(vnic_att.vnic_id).data)
return core.return_oci_object(
oci_object=vnics,
return_formatted=return_formatted,
return_python_object=return_python_object,
format_function=format_vnic_listing)
except oci.exceptions.ServiceError as e:
if raise_exceptions:
raise
print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
except Exception as e:
if raise_exceptions:
raise
print(f'ERROR: {e}')
@plugin_function('mds.create.computeInstance')
def create_instance(**kwargs):
"""Creates a new compute instance
This function will create a new compartment.
Args:
**kwargs: Additional options
Keyword Args:
instance_name (str): The name used for the new compartment.
availability_domain (str): The name of the availability_domain to use
shape (str): The compute shape used for the instance
cpu_count (int): The number of OCPUs
memory_size (int): The amount of memory
subnet_id (str): The OCID of the subnet to use
public_subnet (bool): Whether the subnet should be public or private
operating_system (str): The name of the operating system,
e.g. "Oracle Linux"
operating_system_version (str): The version of the operating system,
e.g. 8
use_latest_image (bool): Whether to use the latest compute image
ssh_public_key_path (str): The path to the public ssh key,
default is ~/.ssh/id_rsa.pub
init_script (str): A string holding the commands to execute at first
instance startup, starting with #!/bin/bash and separated by
linebreaks
init_script_file_path (str): The path to an init script to be
executed at first instance startup. If specified, this file
will be used instead of the script passed in the init_script
parameter
defined_tags (dict): The defined_tags of the dynamic group.
freeform_tags (dict): The freeform_tags of the dynamic group
compartment_id (str): OCID of the parent compartment.
config (object): An OCI config object or None.
config_profile (str): The name of an OCI config profile
interactive (bool): Indicates whether to execute in interactive mode
raise_exceptions (bool): If true exceptions are raised
return_formatted (bool): If true a human readable string is returned
return_python_object (bool): Used for internal plugin calls
Returns:
The new instance if interactive is set to false
"""
instance_name = kwargs.get("instance_name")
availability_domain = kwargs.get("availability_domain")
shape = kwargs.get("shape")
cpu_count = kwargs.get("cpu_count", 1)
memory_size = kwargs.get("memory_size", 16)
subnet_id = kwargs.get("subnet_id")
public_subnet = kwargs.get("public_subnet")
operating_system = kwargs.get("operating_system")
operating_system_version = kwargs.get("operating_system_version")
use_latest_image = kwargs.get("use_latest_image", False)
ssh_public_key_path = kwargs.get(
"ssh_public_key_path", "~/.ssh/id_rsa.pub")
init_script = kwargs.get("init_script")
init_script_file_path = kwargs.get("init_script_file_path")
defined_tags = kwargs.get("defined_tags")
# Manual conversion from Shell Dict type until this is automatically done
if defined_tags:
defined_tags = dict(defined_tags)
freeform_tags = kwargs.get("freeform_tags")
# Manual conversion from Shell Dict type until this is automatically done
if freeform_tags:
freeform_tags = dict(freeform_tags)
compartment_id = kwargs.get("compartment_id")
config = kwargs.get("config")
config_profile = kwargs.get("config_profile")
interactive = kwargs.get("interactive", core.get_interactive_default())
raise_exceptions = kwargs.get("raise_exceptions", not interactive)
return_formatted = kwargs.get("return_formatted", interactive)
return_python_object = kwargs.get("return_python_object", False)
# Get the active config and compartment
try:
# Get the active config and compartment
config = configuration.get_current_config(
config=config, config_profile=config_profile,
interactive=interactive)
compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
import oci.core.models
import oci.exceptions
import oci.pagination
import os.path
import pathlib
import base64
from pathlib import Path
import mysqlsh
from mds_plugin import compartment
from mds_plugin import network
try:
if interactive:
print("Creating a new compute instance ...\n")
# Get a name
if instance_name is None and interactive:
instance_name = mysqlsh.globals.shell.prompt(
"Please enter the name for the new instance: ",
{'defaultValue': ''}).strip()
if not instance_name:
raise ValueError(
"No instance name given. Operation cancelled.")
# Get the availability_domain name
availability_domain_obj = compartment.get_availability_domain(
compartment_id=compartment_id,
availability_domain=availability_domain,
random_selection=True,
config=config,
interactive=interactive,
return_python_object=True)
if availability_domain_obj is None:
raise ValueError("No availability domain given. "
"Operation cancelled.")
else:
availability_domain = availability_domain_obj.name
if interactive:
print(f"Using availability domain {availability_domain}.")
# Get list of available shapes
shape_name = get_shape_name(
shape_name=shape, compartment_id=compartment_id,
availability_domain=availability_domain, config=config,
interactive=interactive)
if not shape_name:
print("Operation cancelled.")
return
if interactive:
print(f"Using shape {shape_name}.")
# Get id of compute image
image = get_image(
operating_system=operating_system,
operating_system_version=operating_system_version,
use_latest_image=use_latest_image,
shape=shape_name, compartment_id=compartment_id, config=config,
interactive=interactive,
return_python_object=True)
if image is None:
print("Operation cancelled.")
return
image_id = image.id
if interactive:
print(f"Using image {image.display_name}.")
# Convert Unix path to Windows
ssh_public_key_path = os.path.abspath(
os.path.expanduser(ssh_public_key_path))
# Check if there is a key available
if os.path.exists(ssh_public_key_path):
with open(ssh_public_key_path, mode='r') as file:
public_key = file.read()
else:
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
import stat
key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
# cSpell:ignore PKCS
private_key = key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.PKCS8,
serialization.NoEncryption())
public_key = key.public_key().public_bytes(
serialization.Encoding.OpenSSH,
serialization.PublicFormat.OpenSSH
)
# Build ssh_private_key_path from ssh_public_key_path by
# removing extension
ssh_private_key_path = os.path.splitext(ssh_public_key_path)[0]
if ssh_private_key_path == ssh_public_key_path:
ssh_private_key_path = ssh_public_key_path + ".private"
# Create path
key_path = os.path.dirname(
os.path.abspath(ssh_private_key_path))
Path(key_path).mkdir(parents=True, exist_ok=True)
# Write out keys
with open(ssh_private_key_path, mode='wb') as file:
file.write(private_key)
with open(ssh_public_key_path, mode='wb') as file:
file.write(public_key)
# Fix permissions
# cSpell:ignore IRUSR IWUSR
os.chmod(ssh_private_key_path, stat.S_IRUSR | stat.S_IWUSR)
os.chmod(ssh_public_key_path, stat.S_IRUSR | stat.S_IWUSR)
# Encode public_key to string
public_key = public_key.decode("utf-8")
# Set SSH key and creator metadata
instance_metadata = {
'ssh_authorized_keys': public_key,
'creator': 'MySQL Shell MDS Plugin'
}
# Load init_script_file_path if given
if init_script_file_path:
init_script_file_path = os.path.abspath(
os.path.expanduser(init_script_file_path))
if not os.path.exists(init_script_file_path):
print(f"Error: Init script file path '{init_script_file_path}' "
"not found.")
return
instance_metadata['user_data'] = \
oci.util.file_content_as_launch_instance_user_data(
init_script_file_path)
# Set the init_script if given
elif init_script:
instance_metadata['user_data'] = base64.b64encode(
init_script.encode('utf-8')).decode('utf-8')
# Get a public subnet for the instance
if not subnet_id and interactive:
print("Selecting a subnet for the compute instance ...")
subnet = network.get_subnet(
subnet_id=subnet_id, public_subnet=public_subnet,
availability_domain=availability_domain,
compartment_id=compartment_id, config=config)
if subnet is None and public_subnet == True and interactive:
print("\nDo you want to select "
"a network with a private subnet instead?\n\n"
"Please note that access from the internet will "
"not be possible and a \njump host needs to be "
"used to access the resource\n")
prompt = mysqlsh.globals.shell.prompt(
"Select a network with a private subnet [YES/no]: ",
{'defaultValue': 'yes'}).strip().lower()
if prompt == "yes":
subnet = network.get_subnet(
subnet_id=subnet_id, public_subnet=False,
compartment_id=compartment_id, config=config)
if subnet is None:
print("Operation cancelled.")
return
if interactive:
print(f"Using subnet {subnet.display_name}.")
# Setup the instance details
launch_instance_details = oci.core.models.LaunchInstanceDetails(
display_name=instance_name,
compartment_id=compartment_id,
availability_domain=availability_domain,
shape=shape_name,
shape_config=oci.core.models.LaunchInstanceShapeConfigDetails(
ocpus=float(cpu_count),
memory_in_gbs=float(memory_size)
),
metadata=instance_metadata,
source_details=oci.core.models.InstanceSourceViaImageDetails(
image_id=image_id),
create_vnic_details=oci.core.models.CreateVnicDetails(
subnet_id=subnet.id
),
defined_tags=defined_tags,
freeform_tags=freeform_tags,
agent_config=oci.core.models.LaunchInstanceAgentConfigDetails(
plugins_config=[
oci.core.models.InstanceAgentPluginConfigDetails(
desired_state="ENABLED",
name="Bastion"
)
]
)
)
# Initialize the identity client
compute = core.get_oci_compute_client(config=config)
# Create the instance
instance = compute.launch_instance(launch_instance_details).data
if interactive:
print(f"Compute instance {instance_name} is being created.\n"
f"Use mds.ls() to check it's provisioning state.\n")
return core.return_oci_object(
oci_object=instance,
return_formatted=return_formatted,
return_python_object=return_python_object,
format_function=format_instance_listing)
except oci.exceptions.ServiceError as e:
print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
return
except Exception as e:
if raise_exceptions:
raise
print(f"ERROR: {str(e)}")
@plugin_function('mds.delete.computeInstance', shell=True, cli=True, web=True)
def delete_instance(**kwargs):
"""Deletes the compute instance with the given name
If no name is given, it will prompt the user for the name.
Args:
**kwargs: Optional parameters
Keyword Args:
instance_name (str): The name of the instance.
instance_id (str): The OCID of the instance
await_deletion (bool): Whether to wait till the bastion reaches
lifecycle state DELETED
compartment_id (str): OCID of the compartment.
config (dict): An OCI config object or None.
config_profile (str): The name of an OCI config profile
ignore_current (bool): Whether the current instance should be ignored
interactive (bool): Indicates whether to execute in interactive mode
raise_exceptions (bool): If set to true exceptions are raised
Returns:
None
"""
instance_name = kwargs.get("instance_name")
instance_id = kwargs.get("instance_id")
await_deletion = kwargs.get("await_deletion")
compartment_id = kwargs.get("compartment_id")
ignore_current = kwargs.get("ignore_current", False)
config = kwargs.get("config")
config_profile = kwargs.get("config_profile")
interactive = kwargs.get("interactive", core.get_interactive_default())
raise_exceptions = kwargs.get("raise_exceptions", not interactive)
# Get the active config and compartment
try:
config = configuration.get_current_config(
config=config, config_profile=config_profile,
interactive=interactive)
compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
if not ignore_current and instance_name is None:
instance_id = configuration.get_current_instance_id(
instance_id=instance_id, config=config)
import oci.exceptions
try:
# Initialize the compute client
compute_client = core.get_oci_compute_client(config=config)
if not instance_id:
instance = get_instance(
instance_name=instance_name, compartment_id=compartment_id,
config=config, interactive=interactive,
raise_exceptions=raise_exceptions,
return_python_object=True)
else:
instance = get_instance_by_id(
instance_id=instance_id, config=config,
interactive=interactive,
raise_exceptions=raise_exceptions,
return_python_object=True)
if interactive:
# Prompt the user for confirmation
prompt = core.prompt(
"Are you sure you want to delete the instance "
f"{instance.display_name} [yes/NO]: ",
{'defaultValue': 'no'}).strip().lower()
if prompt != "yes":
raise Exception("Deletion aborted.")
compute_client.terminate_instance(instance.id)
# If the function should wait till the bastion reaches the DELETED
# lifecycle state
if await_deletion:
import time
if interactive:
print('Waiting for instance to be deleted...', end="")
# Wait for the instance to be TERMINATED
cycles = 0
while cycles < 48:
instance = compute_client.get_instance(
instance_id=instance_id).data
if instance.lifecycle_state == "TERMINATED":
break
else:
time.sleep(5)
# s = "." * (cycles + 1)
if interactive:
print('.', end="")
cycles += 1
if interactive:
print("")
if instance.lifecycle_state != "TERMINATED":
raise Exception("Instance did not reach the TERMINATED "
"state within 4 minutes.")
if interactive:
print(f"Instance '{instance.display_name}' "
"was deleted successfully.")
elif interactive:
print(f"Instance '{instance.display_name}' is being deleted.")
except oci.exceptions.ServiceError as e:
if interactive:
raise
print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
except (Exception) as e:
if raise_exceptions:
raise
print(f'ERROR: {e}')
@plugin_function('mds.update.computeInstance')
def update_instance(instance_name=None, **kwargs):
"""Updates a compute instance with the new values
If no name is given, it will prompt the user for the name.
Args:
instance_name (str): The name of the instance.
**kwargs: Additional options.
Keyword Args:
instance_id (str): The OCID of the instance.
name (str): The new name to use
compartment_id (str): OCID of the compartment.
config (dict): An OCI config object or None.
interactive (bool): Whether user input is required.
Returns:
None
"""
instance_id = kwargs.get("instance_id")
compartment_id = kwargs.get("compartment_id")
config = kwargs.get("config")
new_name = kwargs.get("name")
interactive = kwargs.get("interactive", True)
# Get the active config and compartment
try:
config = configuration.get_current_config(config=config)
compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
import oci.core.models
import oci.exceptions
import mysqlsh
# Initialize the compute client
compute = core.get_oci_compute_client(config=config)
if instance_id is None or instance_id == "":
instance = get_instance(
instance_name=instance_name, compartment_id=compartment_id,
config=config, interactive=interactive,
return_python_object=True)
else:
instance = get_instance_by_id(
instance_id=instance_id, config=config,
interactive=interactive,
return_python_object=True)
if new_name is None and interactive:
new_name = mysqlsh.globals.shell.prompt(
"Please enter a new name for the instance "
f"[{instance.display_name}]: ",
{'defaultValue': instance.display_name}).strip()
if new_name == instance.display_name:
print("Operation cancelled.")
return
update_details = oci.core.models.UpdateInstanceDetails(
display_name=new_name
)
compute.update_instance(instance_id=instance.id,
update_instance_details=update_details)
print(f"Compute instance {instance.display_name} was updated.\n")
except oci.exceptions.ServiceError as e:
if e.code == "NotAuthorizedOrNotFound":
print(f'You do not have privileges to delete this instance.\n')
else:
print(
f'Could not delete the instance {instance.display_name}\n')
print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
except Exception as e:
print(f"ERROR: {str(e)}")
@plugin_function('mds.execute.ssh')
def execute_ssh_command(command=None, **kwargs):
"""Execute the given command on the instance
If no name is given, it will prompt the user for the name.
Args:
command (str): The command to execute.
**kwargs: Additional options
Keyword Args:
instance_name (str): The name of the instance.
instance_id (str): The OCID of the instance
private_key_file_path (str): The path to the private key
private_key_passphrase (str): The passphrase of the private key
compartment_id (str): OCID of the compartment.
config (object): An OCI config object or None.
interactive (bool): Whether to prompt the user for input and
throw exceptions
Returns:
None
"""
instance_name = kwargs.get("instance_name")
instance_id = kwargs.get("instance_id")
private_key_file_path = kwargs.get(
"private_key_file_path", "~/.ssh/id_rsa")
private_key_passphrase = kwargs.get("private_key_passphrase")
instance_id = kwargs.get("instance_id")
compartment_id = kwargs.get("compartment_id")
config = kwargs.get("config")
interactive = kwargs.get("interactive", True)
# Get the active config, compartment and instance
try:
config = configuration.get_current_config(config=config)
compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
instance_id = configuration.get_current_instance_id(
instance_id=instance_id, config=config)
import oci.exceptions
import mysqlsh
if not interactive and command is None:
raise ValueError("No command given.")
instance = get_instance(
instance_name=instance_name, instance_id=instance_id,
compartment_id=compartment_id, config=config)
if instance is None:
print("Operation cancelled.")
return
public_ip = get_instance_public_ip(
instance_name=instance_name, instance_id=instance.id,
compartment_id=compartment_id, config=config,
private_ip_fallback=True)
if public_ip == "":
print("ERROR: No public IP address found.\n")
return
# If no command was given and interactive is enabled, loop until
# an empty command or exit is provided by the user input
multi_command = command is None and interactive
output = None
with SshConnection(
username="opc", host=public_ip,
private_key_file_path=private_key_file_path,
private_key_passphrase=private_key_passphrase) as conn:
if multi_command:
print(f"\nConnected to 'opc@{public_ip}' via SSH. "
"Type 'exit' to close the connection.")
# Repeat command execution till the give command is empty or exit
while command is None or \
(command != "" and command.lower() != "exit"):
if multi_command:
command = mysqlsh.globals.shell.prompt(
f"{public_ip} opc$ ").strip()
if command == "" or command.lower() == "exit":
return
# Execute the command
output = conn.execute(command)
# If there was an error, print it
last_error = conn.get_last_error()
if last_error != "":
output += f"ERROR: {last_error}"
# If multi_command is not enabled, break after first execution
if not multi_command:
return output
else:
print(output)
except oci.exceptions.ServiceError as e:
if not interactive:
raise
print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
return
except (ValueError, oci.exceptions.ClientError) as e:
if not interactive:
raise
print(f'ERROR: {e}')
return