mds_plugin/util.py (1,169 lines of code) (raw):
# Copyright (c) 2021, 2024, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0,
# as published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms, as
# designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an additional
# permission to link the program and your derivative works with the
# separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
"""Sub-Module with utilities for the MySQL Database Service"""
from mysqlsh.plugin_manager import plugin_function
from mds_plugin import core, configuration, mysql_database_service
from os import getenv
# cSpell:ignore SQLDB, Popen, bufsize, dryrun
@plugin_function('mds.util.heatWaveLoadData', shell=True, cli=True, web=True)
def mds_heat_wave_load_data(**kwargs):
    """Loads data to a HeatWave Cluster

    Args:
        **kwargs: Optional parameters

    Keyword Args:
        schemas (list): The list of schemas
        mode (str): The mode to use, "normal"|"dryrun"
        output (str): The output mode to use, "normal"|"compact"|"silent"|"help"
        disable_unsupported_columns (bool): Whether to disable unsupported columns
        optimize_load_parallelism (bool): Whether to optimize parallelism
        enable_memory_check (bool): Whether to enable the memory check
        sql_mode (str): The sql_mode to use
        exclude_list (str): The database object list to exclude
        session (object): The database session to use.
        interactive (bool): Indicates whether to execute in interactive mode
        raise_exceptions (bool): If set to true exceptions are raised

    Returns:
        None in interactive mode, the result sets as string otherwise
    """
    schemas = kwargs.get("schemas")
    mode = kwargs.get("mode", "normal")
    output = kwargs.get("output", "normal")
    disable_unsupported_columns = kwargs.get(
        "disable_unsupported_columns", True)
    optimize_load_parallelism = kwargs.get("optimize_load_parallelism", True)
    enable_memory_check = kwargs.get("enable_memory_check", True)
    sql_mode = kwargs.get("sql_mode", "")
    exclude_list = kwargs.get("exclude_list", "")
    session = kwargs.get("session")
    interactive = kwargs.get("interactive", core.get_interactive_default())
    raise_exceptions = kwargs.get("raise_exceptions", not interactive)

    def quote(value):
        # Render value as a double-quoted SQL string literal. Backslashes are
        # doubled and embedded double quotes are doubled so the value cannot
        # end the literal early; plain identifiers come out unchanged.
        text = str(value).replace("\\", "\\\\").replace('"', '""')
        return f'"{text}"'

    try:
        if not schemas:
            raise ValueError("At least one schema needs to be specified.")
        # A single schema given as plain string would otherwise be iterated
        # character by character
        if isinstance(schemas, str):
            schemas = [schemas]

        session = core.get_current_session(session)

        policy = ("disable_unsupported_columns" if disable_unsupported_columns
                  else "not_disable_unsupported_columns")
        set_load_parallelism = "TRUE" if optimize_load_parallelism else "FALSE"
        auto_enc_mode = "check" if enable_memory_check else "off"

        schemas_json = f"JSON_ARRAY({', '.join(quote(s) for s in schemas)})"

        option_items = [
            f'"mode", {quote(mode)}',
            f'"output", {quote(output)}',
        ]
        # An empty sql_mode is left out instead of emitting an empty SQL
        # expression, which would make the whole CALL statement invalid
        if sql_mode:
            option_items.append(f'"sql_mode", {quote(sql_mode)}')
        option_items += [
            f'"policy", {quote(policy)}',
            f'"set_load_parallelism", {set_load_parallelism}',
            f'"auto_enc", JSON_OBJECT("mode", {quote(auto_enc_mode)})',
        ]
        # exclude_list is inserted verbatim as the argument list of
        # JSON_ARRAY(), so the caller has to pass it as SQL text
        if exclude_list:
            option_items.append(f'"exclude_list", JSON_ARRAY({exclude_list})')
        options_json = f"JSON_OBJECT({', '.join(option_items)})"

        if interactive:
            print("Loading Data to HeatWave Cluster Using Auto Parallel Load.\n")

        sql = f"CALL sys.heatwave_load({schemas_json}, {options_json})"
        if interactive:
            print(f"MySQL > {sql}\n")

        res = session.run_sql(sql)

        # The procedure produces several result sets; empty ones are skipped
        result_texts = []
        has_result = True
        while has_result:
            rows = res.fetch_all()
            if rows:
                formatted = core.format_result_set(res, rows, addFooter=False)
                if interactive:
                    print(formatted)
                else:
                    result_texts.append(f"{formatted}\n")
            has_result = res.next_result()

        if not interactive:
            return "".join(result_texts)
    except Exception as e:
        if raise_exceptions:
            raise
        print(f"Error: {str(e)}")
@plugin_function('mds.util.createComputeInstanceForEndpoint')
def create_compute_instance_for_endpoint(**kwargs):
    """Returns a public compute instance

    If the instance does not yet exist in the compartment, create it

    Args:
        **kwargs: Optional parameters

    Keyword Args:
        instance_name (str): The name of the compute instance
        db_system_name (str): The new name of the DB System.
        db_system_id (str): The OCID of the db_system
        private_key_file_path (str): The file path to an SSH private key
        subnet_id (str): The OCID of the subnet to use
        public_ip (bool): If set to false, no public IP will be assigned
        shape (str): The name of the shape to use
        cpu_count (int): The number of OCPUs
        memory_size (int): The amount of memory
        dns_a_record_notification (bool): Whether to print a message to setup the DNS A record for this instance
        domain_name (str): The domain name of the compute instance
        compartment_id (str): The OCID of the compartment
        config (object): An OCI config object or None.
        config_profile (str): The name of an OCI config profile
        interactive (bool): Indicates whether to execute in interactive mode
        raise_exceptions (bool): If set to true exceptions are raised
        return_formatted (bool): If set to true, a list object is returned
        return_python_object (bool): Used for internal plugin calls

    Returns:
        The compute instance or None if it could not be made available
    """
    instance_name = kwargs.get("instance_name", "MDSJumpHost")
    db_system_name = kwargs.get("db_system_name")
    db_system_id = kwargs.get("db_system_id")
    private_key_file_path = kwargs.get(
        "private_key_file_path", "~/.ssh/id_rsa")
    public_ip = kwargs.get("public_ip", True)
    subnet_id = kwargs.get("subnet_id")
    shape = kwargs.get("shape", "VM.Standard.E4.Flex")
    cpu_count = kwargs.get("cpu_count", 1)
    memory_size = kwargs.get("memory_size", 16)
    dns_a_record_notification = kwargs.get("dns_a_record_notification", False)
    domain_name = kwargs.get("domain_name")
    compartment_id = kwargs.get("compartment_id")
    config = kwargs.get("config")
    config_profile = kwargs.get("config_profile")
    interactive = kwargs.get("interactive", core.get_interactive_default())
    raise_exceptions = kwargs.get("raise_exceptions", not interactive)
    return_formatted = kwargs.get("return_formatted", interactive)
    return_python_object = kwargs.get("return_python_object", False)

    # Get the active config and compartment
    try:
        config = configuration.get_current_config(
            config=config, config_profile=config_profile,
            interactive=interactive)
        compartment_id = configuration.get_current_compartment_id(
            compartment_id=compartment_id, config=config)

        import oci.mysql
        from pathlib import Path
        import os.path
        from mds_plugin import compartment, compute, network
        import time

        db_system = mysql_database_service.get_db_system(
            db_system_name=db_system_name, db_system_id=db_system_id,
            compartment_id=compartment_id, config=config,
            interactive=interactive, raise_exceptions=True,
            return_python_object=True)
        if db_system is None:
            raise ValueError("DB System not specified or found.")

        # Get compartment and public subnet_id from MDS
        if not compartment_id:
            compartment_id = db_system.compartment_id
        if not subnet_id:
            # Look up the DB System's own subnet to learn its VCN, then ask
            # for a subnet of that VCN matching the public_ip setting
            mds_subnet = network.get_subnet(
                subnet_id=db_system.subnet_id,
                config=config, interactive=False)
            subnet = network.get_subnet(
                network_id=mds_subnet.vcn_id,
                public_subnet=public_ip,
                config=config,
                interactive=False)
            if subnet:
                subnet_id = subnet.id

        if not subnet_id:
            if public_ip:
                raise ValueError(
                    'The network used by the MDS instance does not have a '
                    'public subnet. Please add a public subnet first.')
            raise ValueError('No subnet specified.')

        # Try to get the Compute Instance with the given name
        mds_jump_host = compute.get_instance(
            instance_name=instance_name,
            compartment_id=compartment_id,
            config=config, interactive=False,
            raise_exceptions=True,
            return_python_object=True)

        # If it already exists, return it
        if mds_jump_host:
            return mds_jump_host

        if interactive:
            print(f"Creating Compute Instance '{instance_name}'...")

        # The init script shipped in the plugin's "internal" folder is handed
        # over to the new instance
        init_script_file_path = os.path.join(
            os.path.join(Path(__file__).parent.absolute(), "internal"),
            "init_router_script.sh")

        # NOTE(review): compartment_id and config are not passed on here, so
        # create_instance presumably falls back to the current defaults -
        # confirm this is intended.
        new_jump_host = compute.create_instance(
            instance_name=instance_name,
            shape=shape,
            cpu_count=cpu_count, memory_size=memory_size,
            operating_system="Oracle Linux",
            operating_system_version="9",
            use_latest_image=True,
            subnet_id=subnet_id,
            public_subnet=public_ip,
            init_script_file_path=init_script_file_path,
            interactive=False,
            return_python_object=True)
        if new_jump_host is None:
            print("Compute instance could not be created.")
            return

        # Initialize the compute client
        compute_client = core.get_oci_compute_client(config=config)

        print(f"Waiting for Compute Instance '{instance_name}' to become "
              "available.\nThis can take up to 5 minutes or more.", end="")

        # Wait until the lifecycle_state == RUNNING, 5 minutes max
        # (60 polls with a 5 second pause in between)
        try:
            cycles = 0
            while cycles < 60:
                mds_jump_host = compute_client.get_instance(
                    new_jump_host.id).data
                if mds_jump_host.lifecycle_state == "RUNNING":
                    break
                time.sleep(5)
                print(".", end="")
                cycles += 1
            print("")
        except oci.exceptions.ServiceError as e:
            print('Could not fetch the compute instances state.\n'
                  f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
            return

        if mds_jump_host.lifecycle_state != "RUNNING":
            print(f"Compute Instance '{instance_name}' did not become available "
                  "within 5 minutes. Please check the state manually.")
            return None if interactive else mds_jump_host

        if interactive:
            print(f"Compute Instance '{instance_name}' became available.")

        # Get the public IP of the instance. From here on public_ip holds the
        # address instead of the boolean option.
        public_ip = compute.get_instance_public_ip(
            instance_id=mds_jump_host.id, compartment_id=compartment_id,
            config=config, private_ip_fallback=True)
        if public_ip is None or public_ip == "":
            raise Exception(
                f"The public IP of the {instance_name} instance could not be "
                "fetched.")

        if dns_a_record_notification and interactive:
            print("\nATTENTION: Please create a DNS A record using the following values.\n"
                  f"Domain: {domain_name}\n"
                  f"Destination TCP/IP address: {public_ip}")

            # Ask again until the prompt returns a value. Only Exception is
            # caught, so Ctrl-C / interpreter shutdown can still end the loop.
            result = None
            answered = False
            while not answered:
                try:
                    result = core.prompt(
                        "Please click OK once the DNS A record has been created. [OK/Cancel]: ",
                        {"defaultValue": "OK"})
                    answered = True
                except Exception:
                    print(
                        "Please select OK or Cancel on the confirmation notification.")

            if result != "OK":
                raise Exception(
                    "Endpoint creation cancelled. Please delete the compute instance that has been created.")

        if interactive:
            print("\nPerforming base configuration.\n"
                  f"Connecting to {instance_name} instance at {public_ip}...",
                  end="")

        # The instance counts as set up once this command reports the plugin
        probe_cmd = 'mysqlsh --js -e "mds.info()"'
        plugin_marker = "MySQL Shell MDS Plugin"

        setup_complete = False
        connected = False
        cycles = 0
        while not setup_complete and cycles < 10:
            cycles += 1
            try:
                with compute.SshConnection(
                        username="opc", host=public_ip,
                        private_key_file_path=private_key_file_path) as conn:
                    connected = True
                    if interactive:
                        print(f"\nConnected to {instance_name} instance at "
                              f"{public_ip}.")

                    output = conn.execute(probe_cmd).strip()
                    if plugin_marker not in output:
                        # If the plugin is not available yet, give the
                        # instance time to complete setup
                        if interactive:
                            print(f"Waiting for {instance_name} setup to be "
                                  "completed.\nThis can take up to 2 minutes.",
                                  end="")
                        try:
                            i = 0
                            while plugin_marker not in output and i < 25:
                                output = conn.execute(probe_cmd).strip()
                                if plugin_marker not in output:
                                    time.sleep(5)
                                    if interactive:
                                        print(".", end="")
                                i += 1
                        except Exception:
                            # Best effort polling; a failure here shows up in
                            # the marker check right below
                            pass
                        if interactive:
                            print("")

                    if plugin_marker not in output:
                        raise Exception(
                            f"\nCould not finish the '{instance_name}' setup "
                            f"at {public_ip}.\n")
                    setup_complete = True
            except Exception as e:
                # Retry only while no SSH connection could be established at
                # all; any error after a successful connect is final
                if cycles < 10 and not connected:
                    time.sleep(5)
                    if interactive:
                        print(".", end="")
                else:
                    raise Exception(
                        "Could not connect to compute instance "
                        f"'{instance_name}' at {public_ip}.\n"
                        f"ERROR: {str(e)}") from e

        if interactive:
            print(f"Compute Instance '{instance_name}' successfully created.")

        return core.return_oci_object(
            oci_object=new_jump_host,
            return_formatted=return_formatted,
            return_python_object=return_python_object,
            format_function=compute.format_instance_listing)
    except Exception as e:
        if raise_exceptions:
            raise
        print(f"ERROR: {str(e)}")
@plugin_function('mds.util.createEndpoint', shell=True, cli=True, web=True)
def add_public_endpoint(**kwargs):
    """Creates a public endpoint using MySQL Router on a compute instance

    If no id is given, it will prompt the user for the id.

    Args:
        **kwargs: Optional parameters

    Keyword Args:
        instance_name (str): Name of the compute instance
        db_system_name (str): The new name of the DB System.
        db_system_id (str): The OCID of the db_system
        private_key_file_path (str): The file path to an SSH private key
        shape (str): The name of the shape to use
        cpu_count (int): The number of OCPUs
        memory_size (int): The amount of memory
        mysql_user_name (str): The MySQL user name to use for bootstrapping
        public_ip (bool): If set to true, a public IP will be assigned to the compute instance
        domain_name (str): The domain name of the compute instance
        port_forwarding (bool): Whether port forwarding of MySQL ports should be enabled
        mrs (bool): Whether the MySQL REST Service (MRS) should be enabled
        ssl_cert (bool): Whether SSL Certificates should be managed
        jwt_secret (str): The JWT secret for MRS
        compartment_id (str): The OCID of the compartment
        config (object): An OCI config object or None.
        config_profile (str): The name of an OCI config profile
        interactive (bool): Indicates whether to execute in interactive mode
        raise_exceptions (bool): If set to true exceptions are raised
        return_formatted (bool): If set to true, a list object is returned

    Returns:
        A dict describing the endpoint if return_formatted is false, else None
    """
    # cSpell:ignore OCPU
    instance_name = kwargs.get("instance_name")
    db_system_name = kwargs.get("db_system_name")
    db_system_id = kwargs.get("db_system_id")
    private_key_file_path = kwargs.get(
        "private_key_file_path", "~/.ssh/id_rsa")
    shape = kwargs.get("shape", "VM.Standard.E4.Flex")
    cpu_count = kwargs.get("cpu_count", 1)
    memory_size = kwargs.get("memory_size", 16)
    mysql_user_name = kwargs.get("mysql_user_name", "dba")
    mysql_user_password = core.prompt(
        f"Please enter the password for {mysql_user_name}",
        {"type": "password"})
    public_ip = kwargs.get("public_ip", True)
    # All later checks compare against "", so a missing domain name is
    # normalized to the empty string instead of None
    domain_name = kwargs.get("domain_name") or ""
    port_forwarding = kwargs.get("port_forwarding", True)
    mrs = kwargs.get("mrs", True)
    ssl_cert = kwargs.get("ssl_cert", False)
    jwt_secret = kwargs.get("jwt_secret")
    compartment_id = kwargs.get("compartment_id")
    config = kwargs.get("config")
    config_profile = kwargs.get("config_profile")
    interactive = kwargs.get("interactive", core.get_interactive_default())
    raise_exceptions = kwargs.get("raise_exceptions", not interactive)
    return_formatted = kwargs.get("return_formatted", interactive)

    # Get the active config and compartment
    try:
        config = configuration.get_current_config(
            config=config, config_profile=config_profile,
            interactive=interactive)
        compartment_id = configuration.get_current_compartment_id(
            compartment_id=compartment_id, config=config)
        db_system_id = configuration.get_current_db_system_id(
            db_system_id=db_system_id, config=config)

        from mds_plugin import compute
        import configparser
        import io
        import time
        import hashlib
        import shlex

        db_system = mysql_database_service.get_db_system(
            db_system_name=db_system_name, db_system_id=db_system_id,
            compartment_id=compartment_id, config=config,
            interactive=interactive,
            return_python_object=True)
        if db_system is None:
            raise ValueError("No DB System selected. "
                             "Cancelling operation")

        if not jwt_secret:
            # NOTE(review): the fallback secret is the MD5 hex digest of the
            # DB System OCID, so it is predictable for anyone who knows the
            # OCID. Pass jwt_secret explicitly if that is not acceptable.
            jwt_secret = hashlib.md5(db_system.id.encode()).hexdigest()

        # Get the first active endpoint
        endpoint = next(
            (e for e in db_system.endpoints if e.status == 'ACTIVE'), None)
        if endpoint is None:
            raise Exception(
                "This MySQL DB System has no active endpoints assigned.")

        # Get the compute instance (let the user create it
        # if it does not exist yet)
        jump_host = create_compute_instance_for_endpoint(
            instance_name=instance_name,
            private_key_file_path=private_key_file_path,
            db_system_id=db_system_id,
            shape=shape, memory_size=memory_size,
            cpu_count=cpu_count,
            dns_a_record_notification=ssl_cert,
            domain_name=domain_name,
            compartment_id=db_system.compartment_id, config=config,
            interactive=interactive,
            return_python_object=True)
        if not jump_host:
            raise Exception(f"Compute instance {instance_name} not available. "
                            "Operation cancelled.")

        # Get the public IP of the instance.
        # NOTE(review): this overwrites the boolean public_ip option with the
        # address string. As a missing address raises right below, every
        # later "not public_ip" branch can never run - confirm whether the
        # option was meant to be kept separately.
        public_ip = compute.get_instance_public_ip(
            instance_id=jump_host.id, compartment_id=db_system.compartment_id,
            config=config)
        if not public_ip:
            raise Exception(f"The public IP of the instance {instance_name} "
                            "could not be fetched.")

        sec_lists = compute.get_instance_vcn_security_lists(
            instance_id=jump_host.id,
            compartment_id=db_system.compartment_id,
            config=config, interactive=interactive,
            raise_exceptions=raise_exceptions,
            return_python_object=True)
        if sec_lists is None:
            raise Exception(
                "The network security lists could not be fetched.")

        # Allow traffic on port 80 in order to fetch the certs
        if mrs and public_ip:
            compute.add_ingress_port_to_security_lists(
                security_lists=sec_lists, port=80,
                description="MRS HTTP via Router",
                compartment_id=compartment_id, config=config,
                interactive=interactive,
                raise_exceptions=raise_exceptions)

        # Open an SSH connection to the instance
        if interactive:
            print("\nBootstrapping the MySQL Router.\n"
                  f"Connecting to {instance_name} instance at {public_ip}...")
        try:
            with compute.SshConnection(
                    username="opc", host=public_ip,
                    private_key_file_path=private_key_file_path) as conn:
                if interactive:
                    print(f"Connected to {instance_name} instance at "
                          f"{public_ip}.")

                # Check whether the MySQL Router configuration file exists
                # on the remote instance
                config_check_cmd = (
                    'test -f /etc/mysqlrouter/'
                    'mysqlrouter.conf && echo "Available"')
                output = conn.execute(config_check_cmd).strip()
                if output != "Available":
                    # If the config is not available yet, give the instance
                    # time to complete setup
                    if interactive:
                        print(
                            "Waiting for MySQL Router Configuration to become "
                            "available.\nThis can take up to 2 minutes.",
                            end="")
                    try:
                        i = 0
                        while output != "Available" and i < 25:
                            output = conn.execute(config_check_cmd).strip()
                            if output != "Available":
                                time.sleep(5)
                                if interactive:
                                    print(".", end="")
                            i += 1
                    except Exception:
                        # Best effort polling; the result is checked below
                        pass

                if output == "":
                    raise Exception(
                        "Could not fetch MySQL Router configuration from remote instance.")

                if interactive:
                    print("Bootstrapping MySQL Router against "
                          f"{mysql_user_name}@{endpoint.ip_address}:{endpoint.port} "
                          f"using JWT secret {jwt_secret} ...")

                # The command is assembled step by step. Written as a single
                # expression, adjacent string literals are joined before the
                # "if mrs else" is applied, which either dropped the HTTP
                # options (mrs) or the whole bootstrap command (no mrs).
                # Caller supplied values are shell-quoted.
                bootstrap_target = shlex.quote(
                    f"{mysql_user_name}@{endpoint.ip_address}:{endpoint.port}")
                bootstrap_cmd = (
                    f"sudo mysqlrouter_bootstrap {bootstrap_target} "
                    "-u mysqlrouter ")
                if mrs:
                    bootstrap_cmd += (
                        f"--mrs --mrs-global-secret {shlex.quote(jwt_secret)} ")
                bootstrap_cmd += (
                    "--https-port 8446 "
                    "--conf-set-option=http_server.ssl=0 "
                    "--conf-set-option=http_server.port=8446 ")

                (success, output) = conn.executeAndSendOnStdin(
                    bootstrap_cmd, mysql_user_password)
                if not success:
                    if output:
                        print(output)
                    raise Exception("Bootstrap operation failed.")

                # Manually fix the MySQL Router config till bootstrap allows to disable SSL
                # Copy the config to a place the opc user can read it from
                conn.execute(
                    "sudo cp /etc/mysqlrouter/mysqlrouter.conf /home/opc/mysqlrouter.conf")
                conn.execute("sudo chown opc:opc /home/opc/mysqlrouter.conf")

                # Load config to in-memory stream
                with io.BytesIO() as router_config_stream:
                    # Get the remote config file
                    try:
                        conn.get_remote_file_as_file_object(
                            "/home/opc/mysqlrouter.conf",
                            router_config_stream)
                    except Exception as e:
                        raise Exception("Could not get router config file. "
                                        f"{str(e)}") from e

                    # If there was an error, report it
                    last_error = conn.get_last_error()
                    if last_error != "":
                        raise Exception("Could not read router config file. "
                                        f"{last_error}")

                    # Parse the config file
                    router_config = configparser.ConfigParser()
                    router_config.read_string(
                        router_config_stream.getvalue().decode("utf-8"))

                if not router_config.has_section("http_server"):
                    raise Exception(
                        "The MySQL Router configuration has no [http_server] "
                        "section.")
                cnf = router_config["http_server"]
                cnf["port"] = "8446"
                cnf["ssl"] = "0"

                if interactive:
                    print("Writing updated MySQL Router configuration file...")

                # Serialize the config and upload it from an in-memory stream
                with io.StringIO() as router_config_text_stream:
                    router_config.write(router_config_text_stream)
                    router_config_text = router_config_text_stream.getvalue()
                with io.BytesIO(
                        router_config_text.encode("utf-8")) as router_config_bytes_stream:
                    try:
                        conn.put_local_file_object(
                            router_config_bytes_stream,
                            "/home/opc/mysqlrouter.conf")
                    except Exception as e:
                        raise Exception(
                            "Could not upload router config file. "
                            f"{str(e)}") from e

                # Move config to final place and fix privileges
                conn.execute(
                    "sudo cp /home/opc/mysqlrouter.conf /etc/mysqlrouter/mysqlrouter.conf")
                conn.execute(
                    "sudo chown mysqlrouter:mysqlrouter /etc/mysqlrouter/mysqlrouter.conf")
                conn.execute("sudo rm /home/opc/mysqlrouter.conf")

                # Install the SSL certificate
                if ssl_cert:
                    # Restart NGINX
                    conn.execute("sudo systemctl restart nginx.service")

                    ssl_cert_created = False
                    while not ssl_cert_created and domain_name:
                        # Try up to 5 times, pausing 10 seconds after each
                        # failed attempt, to issue the certificate
                        if interactive:
                            print(
                                f"\nCreating the SSL certificate for {domain_name} ...")
                        try:
                            i = 0
                            while not ssl_cert_created and i < 5:
                                # cSpell:ignore certbot certonly webroot
                                output = conn.execute(
                                    "sudo /home/opc/.acme.sh/acme.sh --issue "
                                    f"-d {shlex.quote(domain_name)} "
                                    "--webroot /usr/share/nginx/html "
                                    "--force --server letsencrypt --home /home/opc/.acme.sh")
                                last_error = conn.get_last_error()
                                if last_error != "":
                                    if i == 0 and interactive:
                                        print("\nATTENTION: Please create a DNS A record using the following values.\n"
                                              f"Domain: {domain_name}\n"
                                              f"Destination TCP/IP address: {public_ip}")
                                        print(
                                            "\nWaiting for DNS A record creation ...", end="")
                                    time.sleep(10)
                                    if interactive:
                                        print(".", end="")
                                else:
                                    ssl_cert_created = True
                                i += 1
                        except Exception as e:
                            # Keep the reason so it is shown in the failure
                            # message below
                            output, last_error = "", str(e)

                        if not ssl_cert_created:
                            if interactive:
                                print(f"Failed to create the SSL certificate for {domain_name}. {output} {last_error}\n"
                                      "Please correct the domain name or leave empty to cancel.")
                            # An empty answer or a failed prompt ends the
                            # loop without a certificate
                            try:
                                domain_name = core.prompt(
                                    f"Domain Name for {public_ip}: ") or ""
                                if not domain_name and interactive:
                                    print(
                                        "Skipping SSL certificate generation.")
                            except Exception:
                                domain_name = ""

                    if ssl_cert_created:
                        # Install Certificate
                        conn.execute("sudo mkdir -p /etc/pki/nginx/private")
                        # cSpell:ignore reloadcmd
                        output = conn.execute(
                            "sudo /home/opc/.acme.sh/acme.sh --install-cert "
                            f"-d {shlex.quote(domain_name)} "
                            "--key-file /etc/pki/nginx/private/key.pem "
                            "--fullchain-file /etc/pki/nginx/cert.pem "
                            '--reloadcmd "sudo systemctl restart nginx.service" '
                            "--force --home /home/opc/.acme.sh")
                        last_error = conn.get_last_error()
                        if last_error != "":
                            raise Exception(
                                f"Failed to install SSL certificate. {output} {last_error}")

                if interactive:
                    print("Writing web server configuration...")

                # cSpell:ignore letsencrypt fullchain privkey
                # With a domain name the certificate files under
                # /etc/pki/nginx are referenced, otherwise the router's own
                # certificate files
                if domain_name != "":
                    ssl_cert_path = "/etc/pki/nginx/cert.pem"
                    ssl_cert_key_path = "/etc/pki/nginx/private/key.pem"
                    server_name_directive = f"server_name {domain_name};"
                else:
                    ssl_cert_path = "/var/lib/mysqlrouter/router-cert.pem"
                    ssl_cert_key_path = "/var/lib/mysqlrouter/router-key.pem"
                    server_name_directive = ""

                # NGINX terminates TLS on port 443 and forwards to the plain
                # HTTP port 8446 configured for the router above
                nginx_config = "\n".join([
                    "server {",
                    "listen 443 ssl http2;",
                    server_name_directive,
                    f"ssl_certificate {ssl_cert_path};",
                    f"ssl_certificate_key {ssl_cert_key_path};",
                    "# Allow large attachments",
                    "client_max_body_size 128M;",
                    "location / {",
                    "proxy_pass http://127.0.0.1:8446;",
                    "proxy_set_header Host $host;",
                    "proxy_set_header X-Real-IP $remote_addr;",
                    "proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;",
                    "proxy_set_header X-Forwarded-Proto $scheme;",
                    "}",
                    "}",
                ]) + "\n"

                # Upload the config from an in-memory stream
                with io.BytesIO(
                        nginx_config.encode("utf-8")) as nginx_config_bytes_stream:
                    try:
                        conn.put_local_file_object(
                            nginx_config_bytes_stream,
                            "/home/opc/mrs.nginx.conf")
                    except Exception as e:
                        raise Exception(
                            "Could not upload web server config file. "
                            f"{str(e)}") from e

                # Move config to final place and fix privileges
                conn.execute(
                    "sudo cp /home/opc/mrs.nginx.conf /etc/nginx/conf.d/mrs.nginx.conf")
                conn.execute(
                    "sudo chown root:root /etc/nginx/conf.d/mrs.nginx.conf")
                conn.execute("sudo rm /home/opc/mrs.nginx.conf")

                # If there was an error, report it
                last_error = conn.get_last_error()
                if last_error != "":
                    raise Exception(
                        "Could not upload web server config file. "
                        f"ERROR: {last_error}")

                if interactive:
                    print("Opening Firewall ports...")

                # Open MySQL Router ports on the firewall
                if port_forwarding:
                    conn.execute(
                        "sudo firewall-cmd --zone=public --permanent --add-port=6446-6449/tcp")

                # If mrs was requested but no public_ip, open the port for a Load Balancer
                if mrs and not public_ip:
                    conn.execute(
                        "sudo firewall-cmd --zone=public --permanent --add-port=8446/tcp")

                conn.execute("sudo firewall-cmd --reload")

                # If mrs was requested but no public_ip, open the port for a Load Balancer
                # cSpell:ignore semanage mysqld
                if mrs and not public_ip:
                    conn.execute(
                        "sudo semanage port -a -t mysqld_port_t -p tcp 8446")

                if interactive:
                    print("Restarting MySQL Router...")

                # Restart mysqlrouter.service
                conn.execute("sudo systemctl restart mysqlrouter.service")

                if interactive:
                    print("Restarting web server...")

                # Restart NGINX
                conn.execute("sudo systemctl restart nginx.service")

                # Add ingress rules for MySQL ports to security list
                ingress_rules = [
                    (6446, "Classic MySQL Protocol RW via Router"),
                    (6447, "Classic MySQL Protocol RO via Router"),
                    (6448, "MySQL X Protocol RW via Router"),
                    (6449, "MySQL X Protocol RO via Router"),
                ]
                if mrs and not public_ip:
                    ingress_rules.append((8446, "MRS HTTP via Router"))
                elif mrs:
                    ingress_rules.append((443, "MRS HTTPS via Router"))
                    ingress_rules.append((80, "MRS HTTP via Router"))

                for port, description in ingress_rules:
                    compute.add_ingress_port_to_security_lists(
                        security_lists=sec_lists, port=port,
                        description=description,
                        compartment_id=compartment_id, config=config,
                        interactive=interactive,
                        raise_exceptions=raise_exceptions)

                if interactive:
                    endpoint_address = domain_name if domain_name != "" else public_ip
                    print("\nNew endpoint successfully created.\n")
                    if port_forwarding:
                        print(f" Classic MySQL Protocol: {endpoint_address}:6446\n"
                              f" MySQL X Protocol: {endpoint_address}:6448\n")
                    if mrs:
                        print(
                            f" MySQL REST Service HTTPS: https://{endpoint_address}/\n")
                    if port_forwarding:
                        print(
                            f"Example:\n mysqlsh mysql://{mysql_user_name}@{endpoint_address}:6446")

                if not return_formatted:
                    # port_x is the X Protocol read/write port (6448), the
                    # same one that is opened and printed above
                    return {
                        "ip": public_ip,
                        "domainName": domain_name,
                        "port": 6446,
                        "port_x": 6448,
                        "rest_http": 443
                    }
        except Exception as e:
            raise Exception(
                f"Could not configure the compute instance '{instance_name}' "
                f"at {public_ip}.\n{str(e)}") from e
    except Exception as e:
        if raise_exceptions:
            raise
        print(f"ERROR: {str(e)}")
def create_bucket_import_pars(object_name_prefix, bucket_name, db_system_name,
                              compartment_id, config):
    """Creates the two PARs an import from a bucket works with

    One read-only PAR is requested for the dump's manifest object and one
    read/write PAR for the progress object of the given DB System. A failed
    request is reported on the console; the second request is still made.

    Args:
        object_name_prefix (str): The prefix used for the object names
        bucket_name (str): The name of the bucket
        db_system_name (str): The name of the MySQL DB System
        compartment_id (str): The OCID of the compartment
        config (object): An OCI config object or None.

    Returns:
        A tuple (manifest PAR, progress PAR); an entry is None if the
        corresponding PAR could not be created
    """
    from mds_plugin import object_store

    # Arguments both PAR requests have in common
    shared_args = {
        "bucket_name": bucket_name,
        "compartment_id": compartment_id,
        "config": config,
        "return_object": True,
    }

    manifest_par = object_store.create_bucket_object_par(
        bucket_object_name=f'{object_name_prefix}@.manifest.json',
        access_type="r",
        **shared_args)
    if manifest_par is None:
        print("Could not create PAR for manifest file. Operation cancelled.\n")

    progress_object_name = f'{object_name_prefix}@.{db_system_name}.progress'
    progress_par = object_store.create_bucket_object_par(
        bucket_object_name=progress_object_name,
        access_type="rw",
        **shared_args)
    if progress_par is None:
        print("Could not create PAR for progress file. Operation cancelled.\n")

    return manifest_par, progress_par
def _print_import_output_lines(lines, no_line_break=False):
    """Prints the non-empty output lines of a remote import

    Args:
        lines (list): The output lines to print
        no_line_break (bool): If True, each line is terminated by a carriage
            return instead of a line break

    Returns:
        None
    """
    for line in lines:
        # The password is provided via stdin, so the password prompt of the
        # remote shell is turned into a status message
        line = line.replace(
            "Please provide the password for", "Importing dump on")
        if line.strip() == "":
            continue
        if no_line_break:
            print(line, end='\r')
        else:
            print(f'>> {line}')


def _flush_import_output(buffer, handle_carriage_return=True):
    """Prints all completed lines of the buffer

    Args:
        buffer (str): The output received so far
        handle_carriage_return (bool): Whether output terminated by a
            carriage return should be printed as well

    Returns:
        The part of the buffer that does not form a completed line yet
    """
    if "\n" in buffer:
        end = buffer.rfind("\n") + 1
        _print_import_output_lines(buffer[:end].split("\n"))
        return buffer[end:]
    if handle_carriage_return and "\r" in buffer:
        end = buffer.rfind("\r") + 1
        _print_import_output_lines(
            buffer[:end].split("\r"), no_line_break=True)
        return buffer[end:]
    return buffer


def _mirror_import_channel_output(chan, read_buffer_size=1024):
    """Prints the output of an SSH channel until the remote command is done

    Args:
        chan (object): The SSH channel the command was started on
        read_buffer_size (int): The maximum number of bytes read at once

    Returns:
        None
    """
    import codecs
    import time

    # An incremental decoder is used since a multi-byte UTF-8 character can
    # be split between two reads. Decoding each chunk on its own would raise
    # a UnicodeDecodeError in that case.
    decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')
    buffer = ""

    # While the command didn't return an exit code yet
    while not chan.exit_status_ready():
        # Update every 0.1 seconds
        time.sleep(0.1)
        if chan.recv_ready():
            buffer += decoder.decode(chan.recv(read_buffer_size))
            buffer = _flush_import_output(buffer)

    # Ensure we gobble up all remaining data
    while True:
        try:
            data = chan.recv(read_buffer_size)
        except Exception:
            # Retrying a failing read could loop forever, so stop draining
            break
        if not data and not chan.recv_ready():
            break
        buffer += decoder.decode(data)
        buffer = _flush_import_output(buffer, handle_carriage_return=False)

    # Flush bytes of an incomplete character still held by the decoder
    buffer += decoder.decode(b"", final=True)

    # If there is still something in the buffer, print it
    if buffer:
        print(buffer)


@plugin_function('mds.util.importDumpFromBucket')
def import_from_bucket(**kwargs):
    """Imports a dump from a bucket on a DB System

    Args:
        **kwargs: Additional options

    Keyword Args:
        bucket_name (str): The name of the bucket
        object_name_prefix (str): The prefix used for the object names
        db_system_name (str): The name of the MySQL DB System
        db_system_id (str): The OCID of the db_system
        db_system_ip (str): The IP to use for the import, overwriting the
            one from the db_system if given
        db_system_port (str): The Port to use for the import, overwriting
            the one from the db_system if given
        admin_username (str): The name of the administrator user account
        admin_password (str): The password of the administrator account
        private_key_file_path (str): The file path to an SSH private key
        perform_cleanup (bool): Whether the PARs and bucket should be deleted
        compartment_id (str): The OCID of the compartment
        config (object): An OCI config object or None.
        interactive (bool): Whether user input is considered

    Returns:
        None
    """

    bucket_name = kwargs.get('bucket_name')
    db_system_name = kwargs.get('db_system_name')
    private_key_file_path = kwargs.get(
        "private_key_file_path", "~/.ssh/id_rsa")
    # An explicitly passed None is treated like a missing prefix, since the
    # object names below are built by string concatenation
    object_name_prefix = kwargs.get("object_name_prefix") or ""
    admin_username = kwargs.get('admin_username')
    admin_password = kwargs.get('admin_password')
    db_system_id = kwargs.get("db_system_id")
    db_system_ip = kwargs.get("db_system_ip")
    db_system_port = kwargs.get("db_system_port")
    perform_cleanup = kwargs.get("perform_cleanup")

    compartment_id = kwargs.get("compartment_id")
    config = kwargs.get("config")

    interactive = kwargs.get("interactive", True)

    # Get the active config, compartment and db_system
    try:
        config = configuration.get_current_config(config=config)
        compartment_id = configuration.get_current_compartment_id(
            compartment_id=compartment_id, config=config)
        db_system_id = configuration.get_current_db_system_id(
            db_system_id=db_system_id, config=config)
        bucket_name = configuration.get_current_bucket_name(
            bucket_name=bucket_name, config=config)
    except ValueError as e:
        if not interactive:
            raise
        print(f"ERROR: {str(e)}")
        return

    from mds_plugin import compute, object_store
    import shlex
    import mysqlsh

    # If no explicit IP is given, get DB System
    if not db_system_ip:
        db_system = mysql_database_service.get_db_system(
            db_system_name=db_system_name, db_system_id=db_system_id,
            compartment_id=compartment_id, config=config,
            interactive=interactive)
        if db_system is None:
            print("No MySQL DB System given. Operation cancelled.\n")
            return

        # Get the first active endpoint
        endpoints = [e for e in db_system.endpoints if e.status == 'ACTIVE']
        if len(endpoints) < 1:
            print("ERROR: This MySQL DB System has no active endpoints "
                  "assigned.")
            return
        endpoint = endpoints[0]

        db_system_name = db_system.display_name
        db_system_ip = endpoint.ip_address
        db_system_port = endpoint.port

    if not db_system_port:
        db_system_port = '3306'
    if not db_system_name:
        db_system_name = 'DbSystem'

    # Get ObjectStorage namespace
    os_namespace = object_store.get_object_store_namespace(config=config)

    # Get bucket
    bucket = object_store.get_bucket(
        bucket_name=bucket_name,
        compartment_id=compartment_id,
        config=config,
        ignore_current=False,
        interactive=interactive)
    if bucket is None:
        print("No bucket given. Operation cancelled.\n")
        return

    # Check if manifest is available on this bucket. The resolved
    # compartment and config are used so that the lookup targets the same
    # tenancy/profile the bucket was fetched with.
    obj = object_store.get_bucket_object(
        name=object_name_prefix + '@.manifest.json',
        bucket_name=bucket.name,
        compartment_id=compartment_id,
        config=config,
        no_error_on_not_found=False)
    if obj is None:
        print(f"Manifest file '{object_name_prefix}@.manifest.json' not "
              f"found on bucket '{bucket.name}'. Operation cancelled.\n")
        return

    # Get an admin_username
    if interactive and not admin_username:
        admin_username = mysqlsh.globals.shell.prompt(
            "MySQL Administrator account name [admin]: ",
            {'defaultValue': 'admin'}).strip()
    if not admin_username:
        print("Operation cancelled.")
        return

    # Get an admin_password
    if interactive and admin_password is None:
        admin_password = mysqlsh.globals.shell.prompt(
            "MySQL Administrator account password: ",
            {'defaultValue': '', 'type': 'password'}).strip()
    if not admin_password:
        print("Operation cancelled.")
        return

    # Create PAR for manifest and progress files
    par, progress_par = create_bucket_import_pars(
        object_name_prefix=object_name_prefix,
        bucket_name=bucket.name,
        db_system_name=db_system_name,
        compartment_id=compartment_id,
        config=config)
    if par is None or progress_par is None:
        print("Could not create pre-authenticated requests. "
              "Operation cancelled.")
        return

    # Build URLs
    par_url_prefix = object_store.get_par_url_prefix(config=config)
    par_url = par_url_prefix + par.access_uri
    progress_par_url = par_url_prefix + progress_par.access_uri

    try:
        # Get the 'MySQLDBBastionHost' compute instance
        mds_proxy = create_compute_instance_for_endpoint(
            private_key_file_path=private_key_file_path,
            compartment_id=compartment_id, config=config,
            interactive=False)
        if mds_proxy is None:
            print("Could not get a compute instance. Operation cancelled.")
            return

        # Get the public IP of the instance
        public_ip = compute.get_instance_public_ip(
            instance_id=mds_proxy.id, compartment_id=compartment_id,
            config=config, private_ip_fallback=True)
        if public_ip is None or public_ip == "":
            print("The public IP of the MySQLDBBastionHost instance could not "
                  "be fetched.")
            return

        # Open an SSH connection to the instance
        print(f"Connecting to MySQLDBBastionHost instance at {public_ip}...")
        try:
            success = None
            with compute.SshConnection(
                    username="opc", host=public_ip,
                    private_key_file_path=private_key_file_path) as conn:

                print(f"Connected to MySQLDBBastionHost instance at "
                      f"{public_ip}.\n"
                      f"Starting import...")

                # Build the command to execute. db_system_ip/db_system_port
                # are used for the target so that explicitly given values
                # take effect; they hold the endpoint values otherwise.
                cmd_args = ['mysqlsh',
                            (f'{admin_username}@'
                             f'{db_system_ip}:{db_system_port}'),
                            '--passwords-from-stdin',
                            '--save-passwords=never',
                            '--',
                            'util',
                            'load-dump',
                            f'{par_url}',
                            f'--osBucketName={bucket.name}',
                            f'--osNamespace={os_namespace}',
                            f'--progress-file={progress_par_url}',
                            '--loadUsers=true',
                            '--showProgress=true',
                            '--ignoreVersion=true']
                # The command line is interpreted by the remote shell, so
                # every argument is quoted
                cmd = " ".join(shlex.quote(arg) for arg in cmd_args)

                # Open channel
                chan = conn.client.get_transport().open_session()
                try:
                    chan.settimeout(timeout=None)
                    chan.set_combine_stderr(combine=True)

                    # Execute shell and call import function
                    chan.exec_command(cmd)

                    # Send password to stdin
                    chan.sendall(f"{admin_password}\n".encode('utf-8'))
                    chan.shutdown_write()

                    # Mirror the output until the command has finished
                    _mirror_import_channel_output(chan)

                    success = chan.recv_exit_status() == 0
                finally:
                    chan.close()

            if success is not None:
                print(f"Connection to MySQLDBBastionHost instance closed.\n"
                      f"Import completed "
                      f"{'successfully' if success else 'with errors'}.")

        except Exception as e:
            print("Could not execute the import on compute instance "
                  f"'MySQLDBBastionHost' at {public_ip}.\nERROR: {str(e)}")
    finally:
        # Delete the PARs created especially for this import
        object_store.delete_par(
            bucket_name=bucket.name,
            par_id=par.id,
            compartment_id=compartment_id,
            config=config,
            interactive=False)
        object_store.delete_par(
            bucket_name=bucket.name,
            par_id=progress_par.id,
            compartment_id=compartment_id,
            config=config,
            interactive=False)

    if perform_cleanup:
        print("Performing cleanup...")
        if object_name_prefix:
            # Delete all PARs created by dump
            object_store.delete_par(
                bucket_name=bucket.name,
                name=f'shell-dump-{object_name_prefix}*',
                compartment_id=compartment_id,
                config=config,
                interactive=False)

            # Delete all bucket objects
            object_store.delete_bucket_object(
                name=f"{object_name_prefix}*",
                bucket_name=bucket.name,
                compartment_id=compartment_id,
                config=config,
                interactive=False)
        else:
            # If no object_name_prefix was used, also delete the bucket
            object_store.delete_bucket(
                bucket_name=bucket.name,
                compartment_id=compartment_id,
                config=config,
                interactive=False)

    print("Operation completed.")
@plugin_function('mds.util.importDumpFromLocalDir')
def import_from_local_dir(local_dump_dir=None, db_system_name=None,
                          options=None):
    """Imports a local dump on a DB System

    Args:
        local_dump_dir (str): The directory that holds the local dump
        db_system_name (str): The name of the DB System.
        options (dict): A dict with various options
            object_name_prefix (str): The prefix used for the object names
            db_system_id (str): The OCID of the db_system
            admin_username (str): The name of the administrator user account
            admin_password (str): The password of the administrator user account
            private_key_file_path (str): The file path to an SSH private key
            compartment_id (str): The OCID of the compartment
            config (object): An OCI config object or None.
            interactive (bool): Whether user input is considered

    Returns:
        None
    """

    if options is None:
        options = {}

    object_name_prefix = options.get("object_name_prefix")
    db_system_id = options.get("db_system_id")
    admin_username = options.get("admin_username")
    admin_password = options.get("admin_password")
    private_key_file_path = options.get(
        "private_key_file_path", "~/.ssh/id_rsa")
    compartment_id = options.get("compartment_id")
    config = options.get("config")
    interactive = options.get("interactive", True)

    # The imports are done before entering the try block, since the except
    # clauses below reference oci.exceptions and would fail themselves if an
    # error was raised before the name oci is bound
    import datetime
    import mysqlsh
    import oci.exceptions
    from mds_plugin import object_store

    # Get the active config and compartment
    try:
        config = configuration.get_current_config(config=config)
        compartment_id = configuration.get_current_compartment_id(
            compartment_id=compartment_id, config=config)

        if interactive:
            print("Preparing for data import from a local directory...\n")

        if interactive and local_dump_dir is None:
            local_dump_dir = mysqlsh.globals.shell.prompt(
                "Please specify the directory path that contains the dump: ",
                {'defaultValue': ''}).strip()
            if local_dump_dir == "":
                print("Operation cancelled.")
                return
        elif local_dump_dir is None:
            print("No directory path given. Operation cancelled.")
            return

        # Get an admin_username
        if interactive and admin_username is None:
            admin_username = mysqlsh.globals.shell.prompt(
                "MySQL Administrator account name [admin]: ",
                {'defaultValue': 'admin'}).strip()
            if admin_username == "":
                print("Operation cancelled.")
                return
        elif admin_username is None:
            admin_username = "admin"

        # Get an admin_password
        if interactive and admin_password is None:
            admin_password = mysqlsh.globals.shell.prompt(
                "MySQL Administrator account password: ",
                {'defaultValue': '', 'type': 'password'}).strip()
            if admin_password == "":
                print("Operation cancelled.")
                return
        elif admin_password is None:
            raise ValueError("The argument admin_password must be set.")

        # Get DB System
        db_system = mysql_database_service.get_db_system(
            db_system_name=db_system_name, db_system_id=db_system_id,
            compartment_id=compartment_id, config=config)
        if db_system is None:
            print("Operation cancelled.\n")
            return

        # Store the resolved OCID in the options dict, as before
        options["db_system_id"] = db_system.id

        # Make sure there is a 'MySQLDBBastionHost' compute instance
        mds_proxy = create_compute_instance_for_endpoint(
            private_key_file_path=private_key_file_path,
            compartment_id=db_system.compartment_id, config=config,
            interactive=False)
        if mds_proxy is None:
            print("Operation cancelled.")
            return

        # Take all alphanumeric chars from the DB System display_name
        # to create the bucket_name
        bucket_name = (
            f"{''.join(e for e in db_system.display_name if e.isalnum())}"
            f"_import_{datetime.datetime.now():%Y%m%d%H%M%S}")

        print(f"\nCreating bucket {bucket_name}...")

        bucket = object_store.create_bucket(
            bucket_name=bucket_name, compartment_id=compartment_id,
            config=config, return_object=True)
        if bucket is None:
            print("Cancelling operation")
            return

        # Upload the files from the given directory to the bucket
        file_count = object_store.create_bucket_objects_from_local_dir(
            local_dir_path=local_dump_dir, bucket_name=bucket.name,
            object_name_prefix=object_name_prefix,
            compartment_id=compartment_id, config=config, interactive=False)
        if file_count is None:
            print("Cancelling operation")
            return

        # Start the import from the bucket. import_from_bucket() reads its
        # settings from individual keyword arguments, so every value that
        # was collected above is handed over explicitly.
        import_from_bucket(
            bucket_name=bucket.name,
            object_name_prefix=object_name_prefix or "",
            db_system_id=db_system.id,
            admin_username=admin_username,
            admin_password=admin_password,
            private_key_file_path=private_key_file_path,
            compartment_id=compartment_id,
            config=config,
            interactive=interactive)

        if not object_store.delete_bucket(
                bucket_name=bucket.name,
                compartment_id=compartment_id, config=config,
                interactive=False):
            print("Could not delete the bucket.")

        if interactive:
            print("Operation completed.")
    except oci.exceptions.ServiceError as e:
        if not interactive:
            raise
        print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
        return
    except (ValueError, oci.exceptions.ClientError) as e:
        if not interactive:
            raise
        print(f'ERROR: {e}')
        return
@plugin_function('mds.util.dumpToBucket')
def dump_to_bucket(**kwargs):
    """Dumps a MySQL Server instance to an Object Storage bucket

    Args:
        **kwargs: Additional options

    Keyword Args:
        bucket_name (str): The name of the bucket
        create_bucket_if_not_exists (bool): Will create the bucket if set
        object_name_prefix (str): The prefix used for the object names
        connection_uri (str): The URI to the MySQL Server
        connection_password (str): The password of the MySQL
            user account
        compartment_id (str): The OCID of the compartment
        config (object): An OCI config object or None.
        config_file_path (str): The path to the config file.
        interactive (bool): Enables interactive input.
        return_true_on_success (bool): Whether to return true on success

    Returns:
        None or true on success if return_true_on_success is set to true
    """
    # Handle kwargs
    bucket_name = kwargs.get("bucket_name")
    create_bucket_if_not_exists = kwargs.get(
        "create_bucket_if_not_exists", False)
    object_name_prefix = kwargs.get("object_name_prefix")
    connection_uri = kwargs.get("connection_uri")
    connection_password = kwargs.get("connection_password")
    # If no config_file_path is given, first check the MYSQLSH_OCI_CONFIG_FILE
    # env_var and only then fall back to default
    config_file_path = kwargs.get(
        "config_file_path", getenv("MYSQLSH_OCI_CONFIG_FILE"))
    if config_file_path is None:
        config_file_path = "~/.oci/config"

    compartment_id = kwargs.get("compartment_id")
    config = kwargs.get("config")

    interactive = kwargs.get("interactive", True)
    return_true_on_success = kwargs.get("return_true_on_success")

    # The imports are done before entering the try block, since the except
    # clauses below reference oci.exceptions and would fail themselves if an
    # error was raised before the name oci is bound
    import json
    import os.path
    import subprocess
    import time
    import mysqlsh
    import oci.exceptions
    from mds_plugin import object_store

    # Get the active config, compartment and db_system
    try:
        config = configuration.get_current_config(config=config)
        compartment_id = configuration.get_current_compartment_id(
            compartment_id=compartment_id, config=config)
        bucket_name = configuration.get_current_bucket_name(
            bucket_name=bucket_name, config=config)

        # If no connection_uri was given, try to use the one from the current
        # connection if this one uses the classic protocol
        if connection_uri is None:
            session = mysqlsh.globals.shell.get_session()
            if session is not None:
                if "mysql://" in session.get_uri():
                    connection_uri = session.get_uri()

        # If no connection_uri was given but interactive is True, ask the user
        if connection_uri is None and interactive:
            print("Please enter the MySQL Server connection URI for the MySQL "
                  "Server serving as data source.\nTo get more information about "
                  "the URI format type '\\? connection' in the MySQL Shell.\n"
                  "Example: admin@localhost:3306\n")
            connection_uri = mysqlsh.globals.shell.prompt(
                "MySQL Server connection URI: ")
        if connection_uri == "":
            print("Operation Cancelled.\n")
            return
        if connection_uri is None:
            # Without this check the string 'None' would be used as URI
            raise ValueError("No connection_uri given.")

        # Get an admin_password
        if connection_password is None:
            connection_password = mysqlsh.globals.shell.prompt(
                f"Password for {connection_uri}: ",
                {'defaultValue': '', 'type': 'password'}).strip()
            if connection_password == "":
                print("Operation cancelled.")
                return

        # Get object_name_prefix
        if interactive and object_name_prefix is None:
            print("\nIf needed, a prefix for the dumped object names can be set.\n"
                  "Default is to use no prefix.")
            object_name_prefix = mysqlsh.globals.shell.prompt(
                "\nPlease enter the object name prefix []: ",
                {'defaultValue': ''}).strip()
        elif object_name_prefix is None:
            object_name_prefix = ""

        # Get bucket or create one
        bucket_created = False
        try:
            bucket = object_store.get_bucket(
                bucket_name=bucket_name, compartment_id=compartment_id,
                config=config, interactive=False)
        except oci.exceptions.ServiceError:
            if create_bucket_if_not_exists:
                bucket = object_store.create_bucket(
                    bucket_name=bucket_name, compartment_id=compartment_id,
                    config=config, interactive=False, return_object=True)
                # Remember that the bucket was created by this call so it
                # can be removed again if the dump fails
                bucket_created = bucket is not None
            else:
                if not interactive:
                    raise
                print(f"The bucket {bucket_name} does not exist.")
                return
        if bucket is None:
            raise ValueError(f"The bucket {bucket_name} is not available.")

        os_namespace = object_store.get_object_store_namespace(config)

        # Expand a leading ~ and make the config file path absolute
        config_file_path = os.path.abspath(
            os.path.expanduser(config_file_path))

        # Get the current profile
        profile_name = configuration.get_current_profile()

    except oci.exceptions.ServiceError as e:
        if not interactive:
            raise
        print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
        return
    except (ValueError, oci.exceptions.ClientError) as e:
        if not interactive:
            raise
        print(f'ERROR: {e}')
        return

    # cSpell:ignore ocimds, innodb
    # Setup command. util.dumpInstance() is called through -e since the
    # command line API mode was not able to take the compatibility array.
    dump_options = {
        'osBucketName': bucket.name,
        'osNamespace': os_namespace,
        'ociConfigFile': config_file_path,
        'ociProfile': profile_name,
        'showProgress': True,
        'consistent': True,
        'ociParManifest': True,
        'ocimds': True,
        'compatibility': ['force_innodb', 'strip_definers',
                          'strip_restricted_grants', 'strip_tablespaces']}
    # json.dumps() is used to create the literals of the script, so quotes
    # and backslashes (e.g. in Windows paths) in the values are escaped
    dump_script = (f'util.dumpInstance({json.dumps(object_name_prefix)}, '
                   f'{json.dumps(dump_options)})')
    # The command is passed as an argument list and run without a shell, so
    # no value is interpreted by a command interpreter
    cmd = ['mysqlsh',
           connection_uri,
           '--passwords-from-stdin',
           '--save-passwords=never',
           '-e',
           dump_script]

    # Run command
    print(f'Starting dump from {connection_uri} to bucket {bucket.name} ...')
    try:
        with subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stdin=subprocess.PIPE, stderr=subprocess.STDOUT,
                universal_newlines=True, bufsize=1) as proc:
            try:
                # Give the process some startup time
                time.sleep(0.2)

                # Provide the password to stdin
                proc.stdin.write(connection_password + "\n")
                proc.stdin.flush()

                # Mirror the output TODO: fix hang on Windows
                for line in iter(proc.stdout.readline, ''):
                    if "Please provide the password for" in line:
                        line = line.replace(
                            "Please provide the password for ",
                            "Performing dump from ")
                    if line.strip() != "":
                        print(f'<< {line}', end='')

                proc.stdout.close()
                proc.stdin.close()
                return_code = proc.wait()
                if return_code:
                    # If a bucket has been created, delete it
                    if bucket_created:
                        object_store.delete_bucket(
                            bucket_name=bucket_name,
                            compartment_id=compartment_id, config=config,
                            interactive=False)
                    print("Dump has failed.")
                    if return_true_on_success:
                        return False
                else:
                    print("Dump has finished.")
                    if return_true_on_success:
                        return True
            except subprocess.TimeoutExpired:
                proc.kill()
    except OSError as e:
        # Raised e.g. if the mysqlsh executable cannot be started
        if not interactive:
            raise
        print(f"Error when starting shell import. {str(e)}")
    except ValueError as e:
        if not interactive:
            raise
        print(f"Invalid arguments passed to the shell import. {str(e)}")