in mds_plugin/compute.py [0:0]
def _get_or_create_ssh_public_key(ssh_public_key_path):
    """Returns the public SSH key to register with a new compute instance

    If the given public key file exists its content is returned as is.
    Otherwise a new 2048 bit RSA key pair is generated and written to disk.
    The private key path is the public key path without its extension, or
    the public key path with ".private" appended if it has no extension.

    Args:
        ssh_public_key_path (str): The path to the public ssh key, a
            leading ~ is expanded to the user's home directory

    Returns:
        The public key as string

    Raises:
        ValueError: If the public key file is missing but the private key
            file derived from its path already exists
    """
    import os.path
    import stat
    from pathlib import Path

    # Expand a leading ~ and turn the path into an absolute one
    ssh_public_key_path = os.path.abspath(
        os.path.expanduser(ssh_public_key_path))

    # Use the existing public key if there is one
    if os.path.exists(ssh_public_key_path):
        with open(ssh_public_key_path, mode='r') as file:
            return file.read()

    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives.asymmetric import rsa

    key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend()
    )
    # cSpell:ignore PKCS
    private_key = key.private_bytes(
        serialization.Encoding.PEM,
        serialization.PrivateFormat.PKCS8,
        serialization.NoEncryption())
    public_key = key.public_key().public_bytes(
        serialization.Encoding.OpenSSH,
        serialization.PublicFormat.OpenSSH
    )

    # Build ssh_private_key_path from ssh_public_key_path by removing the
    # extension
    ssh_private_key_path = os.path.splitext(ssh_public_key_path)[0]
    if ssh_private_key_path == ssh_public_key_path:
        ssh_private_key_path = ssh_public_key_path + ".private"

    # Create the target directory if needed
    Path(os.path.dirname(ssh_private_key_path)).mkdir(
        parents=True, exist_ok=True)

    def owner_only_opener(path, flags):
        # Create the file readable/writable by the owner only, so the
        # private key is never exposed with the default umask permissions
        return os.open(path, flags, 0o600)

    # Write the private key. Mode 'x' makes the creation exclusive, so an
    # already existing private key (e.g. ~/.ssh/id_rsa whose .pub file got
    # lost) is never silently replaced.
    try:
        with open(ssh_private_key_path, mode='xb',
                  opener=owner_only_opener) as file:
            file.write(private_key)
    except FileExistsError:
        raise ValueError(
            f"The private key file '{ssh_private_key_path}' already exists "
            f"but the public key file '{ssh_public_key_path}' was not "
            "found. Refusing to overwrite the existing private key.") \
            from None

    with open(ssh_public_key_path, mode='wb') as file:
        file.write(public_key)

    # Fix permissions
    # cSpell:ignore IRUSR IWUSR
    os.chmod(ssh_private_key_path, stat.S_IRUSR | stat.S_IWUSR)
    os.chmod(ssh_public_key_path, stat.S_IRUSR | stat.S_IWUSR)

    # Encode public_key to string
    return public_key.decode("utf-8")


def create_instance(**kwargs):
    """Creates a new compute instance

    This function will create a new compute instance.

    Args:
        **kwargs: Additional options

    Keyword Args:
        instance_name (str): The name used for the new compute instance.
        availability_domain (str): The name of the availability_domain to use
        shape (str): The compute shape used for the instance
        cpu_count (int): The number of OCPUs, default is 1
        memory_size (int): The amount of memory in GB, default is 16
        subnet_id (str): The OCID of the subnet to use
        public_subnet (bool): Whether the subnet should be public or private
        operating_system (str): The name of the operating system,
            e.g. "Oracle Linux"
        operating_system_version (str): The version of the operating system,
            e.g. 8
        use_latest_image (bool): Whether to use the latest compute image
        ssh_public_key_path (str): The path to the public ssh key,
            default is ~/.ssh/id_rsa.pub. If the file does not exist a new
            key pair is created at that location
        init_script (str): A string holding the commands to execute at first
            instance startup, starting with #!/bin/bash and separated by
            linebreaks
        init_script_file_path (str): The path to an init script to be
            executed at first instance startup. If specified, this file
            will be used instead of the script passed in the init_script
            parameter
        defined_tags (dict): The defined_tags of the compute instance.
        freeform_tags (dict): The freeform_tags of the compute instance
        compartment_id (str): OCID of the parent compartment.
        config (object): An OCI config object or None.
        config_profile (str): The name of an OCI config profile
        interactive (bool): Indicates whether to execute in interactive mode
        raise_exceptions (bool): If true exceptions are raised
        return_formatted (bool): If true a human readable string is returned
        return_python_object (bool): Used for internal plugin calls

    Returns:
        The new instance if interactive is set to false
    """

    instance_name = kwargs.get("instance_name")
    availability_domain = kwargs.get("availability_domain")
    shape = kwargs.get("shape")
    cpu_count = kwargs.get("cpu_count", 1)
    memory_size = kwargs.get("memory_size", 16)
    subnet_id = kwargs.get("subnet_id")
    public_subnet = kwargs.get("public_subnet")
    operating_system = kwargs.get("operating_system")
    operating_system_version = kwargs.get("operating_system_version")
    use_latest_image = kwargs.get("use_latest_image", False)
    ssh_public_key_path = kwargs.get(
        "ssh_public_key_path", "~/.ssh/id_rsa.pub")
    init_script = kwargs.get("init_script")
    init_script_file_path = kwargs.get("init_script_file_path")

    defined_tags = kwargs.get("defined_tags")
    # Manual conversion from Shell Dict type until this is automatically done
    if defined_tags:
        defined_tags = dict(defined_tags)
    freeform_tags = kwargs.get("freeform_tags")
    # Manual conversion from Shell Dict type until this is automatically done
    if freeform_tags:
        freeform_tags = dict(freeform_tags)

    compartment_id = kwargs.get("compartment_id")
    config = kwargs.get("config")
    config_profile = kwargs.get("config_profile")

    interactive = kwargs.get("interactive", core.get_interactive_default())
    raise_exceptions = kwargs.get("raise_exceptions", not interactive)
    return_formatted = kwargs.get("return_formatted", interactive)
    return_python_object = kwargs.get("return_python_object", False)

    try:
        # Get the active config and compartment
        config = configuration.get_current_config(
            config=config, config_profile=config_profile,
            interactive=interactive)
        compartment_id = configuration.get_current_compartment_id(
            compartment_id=compartment_id, config=config)

        # Imported here so the modules are only loaded when the function
        # is actually called
        import oci.core.models
        import oci.exceptions
        import oci.util
        import os.path
        import base64
        import mysqlsh
        from mds_plugin import compartment
        from mds_plugin import network

        try:
            if interactive:
                print("Creating a new compute instance ...\n")

            # Get a name
            if instance_name is None and interactive:
                instance_name = mysqlsh.globals.shell.prompt(
                    "Please enter the name for the new instance: ",
                    {'defaultValue': ''}).strip()
            if not instance_name:
                raise ValueError(
                    "No instance name given. Operation cancelled.")

            # Get the availability_domain name
            availability_domain_obj = compartment.get_availability_domain(
                compartment_id=compartment_id,
                availability_domain=availability_domain,
                random_selection=True,
                config=config,
                interactive=interactive,
                return_python_object=True)
            if availability_domain_obj is None:
                raise ValueError("No availability domain given. "
                                 "Operation cancelled.")
            availability_domain = availability_domain_obj.name
            if interactive:
                print(f"Using availability domain {availability_domain}.")

            # Get the name of the shape to use
            shape_name = get_shape_name(
                shape_name=shape, compartment_id=compartment_id,
                availability_domain=availability_domain, config=config,
                interactive=interactive)
            if not shape_name:
                print("Operation cancelled.")
                return
            if interactive:
                print(f"Using shape {shape_name}.")

            # Get id of compute image
            image = get_image(
                operating_system=operating_system,
                operating_system_version=operating_system_version,
                use_latest_image=use_latest_image,
                shape=shape_name, compartment_id=compartment_id,
                config=config, interactive=interactive,
                return_python_object=True)
            if image is None:
                print("Operation cancelled.")
                return
            image_id = image.id
            if interactive:
                print(f"Using image {image.display_name}.")

            # Load the public SSH key, creating a new key pair if there is
            # no key file at the given path yet
            public_key = _get_or_create_ssh_public_key(ssh_public_key_path)

            # Set SSH key and creator metadata
            instance_metadata = {
                'ssh_authorized_keys': public_key,
                'creator': 'MySQL Shell MDS Plugin'
            }

            # Load init_script_file_path if given, it takes precedence over
            # init_script
            if init_script_file_path:
                init_script_file_path = os.path.abspath(
                    os.path.expanduser(init_script_file_path))
                if not os.path.exists(init_script_file_path):
                    print("Error: Init script file path "
                          f"'{init_script_file_path}' not found.")
                    return
                instance_metadata['user_data'] = \
                    oci.util.file_content_as_launch_instance_user_data(
                        init_script_file_path)
            # Set the init_script if given, user_data is passed base64
            # encoded
            elif init_script:
                instance_metadata['user_data'] = base64.b64encode(
                    init_script.encode('utf-8')).decode('utf-8')

            # Get a subnet for the instance
            if not subnet_id and interactive:
                print("Selecting a subnet for the compute instance ...")
            subnet = network.get_subnet(
                subnet_id=subnet_id, public_subnet=public_subnet,
                availability_domain=availability_domain,
                compartment_id=compartment_id, config=config)

            # If no public subnet was found, offer to fall back to a
            # private one
            if subnet is None and public_subnet == True and interactive:
                print("\nDo you want to select "
                      "a network with a private subnet instead?\n\n"
                      "Please note that access from the internet will "
                      "not be possible and a \njump host needs to be "
                      "used to access the resource\n")
                prompt = mysqlsh.globals.shell.prompt(
                    "Select a network with a private subnet [YES/no]: ",
                    {'defaultValue': 'yes'}).strip().lower()
                if prompt == "yes":
                    # NOTE(review): unlike the lookup above, this call does
                    # not pass availability_domain - confirm this is
                    # intended
                    subnet = network.get_subnet(
                        subnet_id=subnet_id, public_subnet=False,
                        compartment_id=compartment_id, config=config)

            if subnet is None:
                print("Operation cancelled.")
                return
            if interactive:
                print(f"Using subnet {subnet.display_name}.")

            # Setup the instance details
            launch_instance_details = oci.core.models.LaunchInstanceDetails(
                display_name=instance_name,
                compartment_id=compartment_id,
                availability_domain=availability_domain,
                shape=shape_name,
                shape_config=oci.core.models.LaunchInstanceShapeConfigDetails(
                    ocpus=float(cpu_count),
                    memory_in_gbs=float(memory_size)
                ),
                metadata=instance_metadata,
                source_details=oci.core.models.InstanceSourceViaImageDetails(
                    image_id=image_id),
                create_vnic_details=oci.core.models.CreateVnicDetails(
                    subnet_id=subnet.id
                ),
                defined_tags=defined_tags,
                freeform_tags=freeform_tags,
                # Request the Bastion agent plugin to be enabled on the
                # instance
                agent_config=oci.core.models.LaunchInstanceAgentConfigDetails(
                    plugins_config=[
                        oci.core.models.InstanceAgentPluginConfigDetails(
                            desired_state="ENABLED",
                            name="Bastion"
                        )
                    ]
                )
            )

            # Initialize the compute client
            compute = core.get_oci_compute_client(config=config)

            # Create the instance
            instance = compute.launch_instance(launch_instance_details).data

            if interactive:
                print(f"Compute instance {instance_name} is being created.\n"
                      f"Use mds.ls() to check it's provisioning state.\n")

            return core.return_oci_object(
                oci_object=instance,
                return_formatted=return_formatted,
                return_python_object=return_python_object,
                format_function=format_instance_listing)
        except oci.exceptions.ServiceError as e:
            # Honor raise_exceptions for service errors as well, so callers
            # that asked for exceptions do not get a silent None result
            if raise_exceptions:
                raise
            print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
            return
    except Exception as e:
        if raise_exceptions:
            raise
        print(f"ERROR: {str(e)}")