in mds_plugin/util.py [0:0]
def import_from_bucket(**kwargs):
"""Imports a dump from a bucket on a DB System
Args:
**kwargs: Additional options
Keyword Args:
bucket_name (str): The name of the bucket
object_name_prefix (str): The prefix used for the object names
db_system_name (str): The name of the MySQL DB System
db_system_id (str): The OCID of the db_system
db_system_ip (str): The IP to use for the import, overwriting the
one from the db_system if given
db_system_port (str): The Port to use for the import, overwriting
the one from the db_system if given
admin_username (str): The name of the administrator user account
admin_password (str): The password of the administrator account
private_key_file_path (str): The file path to an SSH private key
perform_cleanup (bool): Whether the PARs and bucket should be deleted
compartment_id (str): The OCID of the compartment
config (object): An OCI config object or None.
interactive (bool): Whether user input is considered
Returns:
None
"""
bucket_name = kwargs.get('bucket_name')
db_system_name = kwargs.get('db_system_name')
private_key_file_path = kwargs.get(
"private_key_file_path", "~/.ssh/id_rsa")
object_name_prefix = kwargs.get("object_name_prefix", "")
admin_username = kwargs.get('admin_username')
admin_password = kwargs.get('admin_password')
db_system_id = kwargs.get("db_system_id")
db_system_ip = kwargs.get("db_system_ip")
db_system_port = kwargs.get("db_system_port")
perform_cleanup = kwargs.get("perform_cleanup")
compartment_id = kwargs.get("compartment_id")
config = kwargs.get("config")
interactive = kwargs.get("interactive", True)
# Get the active config, compartment and db_system
try:
config = configuration.get_current_config(config=config)
compartment_id = configuration.get_current_compartment_id(
compartment_id=compartment_id, config=config)
db_system_id = configuration.get_current_db_system_id(
db_system_id=db_system_id, config=config)
bucket_name = configuration.get_current_bucket_name(
bucket_name=bucket_name, config=config)
except ValueError as e:
if not interactive:
raise
print(f"ERROR: {str(e)}")
return
from mds_plugin import compute, user, object_store
import datetime
import time
import base64
import json
import mysqlsh
# If no explicit IP is given, get DB System
if not db_system_ip:
db_system = mysql_database_service.get_db_system(
db_system_name=db_system_name, db_system_id=db_system_id,
compartment_id=compartment_id, config=config,
interactive=interactive)
if db_system is None:
print("No MySQL DB System given. Operation cancelled.\n")
return
# Get the first active endpoint
endpoints = [e for e in db_system.endpoints if e.status == 'ACTIVE']
if len(endpoints) < 1:
print("ERROR: This MySQL DB System has no active endpoints "
"assigned.")
return
endpoint = endpoints[0]
db_system_name = db_system.display_name
db_system_ip = endpoint.ip_address
db_system_port = endpoint.port
if not db_system_port:
db_system_port = '3306'
if not db_system_name:
db_system_name = 'DbSystem'
# Get ObjectStorage namespace
os_namespace = object_store.get_object_store_namespace(config=config)
# Get bucket
bucket = object_store.get_bucket(
bucket_name=bucket_name,
compartment_id=compartment_id,
config=config,
ignore_current=False,
interactive=interactive)
if bucket is None:
print("No bucket given. Operation cancelled.\n")
return
# Check if manifest is available on this bucket
obj = object_store.get_bucket_object(
name=object_name_prefix + '@.manifest.json',
bucket_name=bucket.name,
compartment_id=None,
config=None,
no_error_on_not_found=False)
if obj is None:
print(f"Manifest file '{object_name_prefix}@.manifest.json' not "
f"found on bucket '{bucket.name}'. Operation cancelled.\n")
return
# Get an admin_username
if interactive and not admin_username:
admin_username = mysqlsh.globals.shell.prompt(
"MySQL Administrator account name [admin]: ",
{'defaultValue': 'admin'}).strip()
if not admin_username:
print("Operation cancelled.")
return
# Get an admin_password
if interactive and admin_password is None:
admin_password = mysqlsh.globals.shell.prompt(
"MySQL Administrator account password: ",
{'defaultValue': '', 'type': 'password'}).strip()
if not admin_password:
print("Operation cancelled.")
return
# Create PAR for manifest and progress files
par, progress_par = create_bucket_import_pars(
object_name_prefix=object_name_prefix,
bucket_name=bucket.name,
db_system_name=db_system_name,
compartment_id=compartment_id,
config=config)
if par is None or progress_par is None:
print("Could not create pre-authenticated requests. "
"Operation cancelled.")
return
# Build URLs
par_url_prefix = object_store.get_par_url_prefix(config=config)
par_url = par_url_prefix + par.access_uri
progress_par_url = par_url_prefix + progress_par.access_uri
try:
# Get the 'MySQLDBBastionHost' compute instance (let the user create it
# if it does not exist yet)
mds_proxy = create_compute_instance_for_endpoint(
private_key_file_path=private_key_file_path,
compartment_id=compartment_id, config=config,
interactive=False)
if mds_proxy is None:
print("Could not get a compute instance. Operation cancelled.")
return
# Get the public IP of the instance
public_ip = compute.get_instance_public_ip(
instance_id=mds_proxy.id, compartment_id=compartment_id,
config=config, private_ip_fallback=True)
if public_ip is None or public_ip == "":
print("The public IP of the MySQLDBBastionHost instance could not "
"be fetched.")
return
# Open an SSH connection to the instance
print(f"Connecting to MySQLDBBastionHost instance at {public_ip}...")
try:
success = None
with compute.SshConnection(
username="opc", host=public_ip,
private_key_file_path=private_key_file_path) as conn:
print(f"Connected to MySQLDBBastionHost instance at "
f"{public_ip}.\n"
f"Starting import...")
# Build command string to execute
cmd = ['mysqlsh',
(f'{admin_username}@'
f'{endpoint.ip_address}:{endpoint.port}'),
'--passwords-from-stdin',
'--save-passwords=never',
'--',
'util',
'load-dump',
f'{par_url}',
f'--osBucketName={bucket.name}',
f'--osNamespace={os_namespace}',
f'--progress-file={progress_par_url}',
'--loadUsers=true',
'--showProgress=true',
'--ignoreVersion=true']
cmd = " ".join(cmd)
# Open channel
chan = conn.client.get_transport().open_session()
try:
chan.settimeout(timeout=None)
chan.set_combine_stderr(combine=True)
# Execute shell and call import function
chan.exec_command(cmd)
# Send password to stdin
chan.sendall(f"{admin_password}\n".encode('utf-8'))
chan.shutdown_write()
def print_lines(buffer, no_line_break=False):
for line in buffer:
if "Please provide the password for" \
in line:
line = line.replace(
"Please provide the password for",
"Importing dump on")
if 'Worker' in line:
line += ''
if line.strip() != "":
if no_line_break:
print(line, end='\r')
else:
print(f'>> {line}')
# While the command didn't return an exit code yet
read_buffer_size = 1024
buffer = ""
while not chan.exit_status_ready():
# Update every 0.1 seconds
time.sleep(0.1)
if chan.recv_ready():
buffer += chan.recv(
read_buffer_size).decode('utf-8')
# Check if a line break was in the buffer
if "\n" in buffer:
end = buffer.rfind("\n")+1
ready = buffer[:end]
buffer = buffer[end:]
# Print the lines that are ready
print_lines(ready.split("\n"))
elif "\r" in buffer:
end = buffer.rfind("\r")+1
ready = buffer[:end]
buffer = buffer[end:]
# Print the lines that are ready
print_lines(
ready.split("\r"), no_line_break=True)
# Ensure we gobble up all remaining data
while True:
try:
output = chan.recv(
read_buffer_size).decode('utf-8')
if not output and not chan.recv_ready():
break
else:
buffer += output
# Check if a line break was in the buffer
if "\n" in buffer:
end = buffer.rfind("\n")+1
ready = buffer[:end]
buffer = buffer[end:]
# Print the lines that are ready
print_lines(ready.split("\n"))
except Exception:
continue
# If there is still something in the buffer, print it
if buffer:
print(buffer)
success = chan.recv_exit_status() == 0
finally:
chan.close()
if success is not None:
print(f"Connection to MySQLDBBastionHost instance closed.\n"
f"Import completed "
f"{'successfully' if success else 'with errors'}.")
except Exception as e:
print("Could not execute the import on compute instance "
f"'MySQLDBBastionHost' at {public_ip}.\nERROR: {str(e)}")
finally:
# Delete the PARs created especially for this import
object_store.delete_par(
bucket_name=bucket.name,
par_id=par.id,
compartment_id=compartment_id,
config=config,
interactive=False)
object_store.delete_par(
bucket_name=bucket.name,
par_id=progress_par.id,
compartment_id=compartment_id,
config=config,
interactive=False)
if perform_cleanup:
print("Performing cleanup...")
if object_name_prefix:
# Delete all PARs created by dump
object_store.delete_par(
bucket_name=bucket.name,
name=f'shell-dump-{object_name_prefix}*',
compartment_id=compartment_id,
config=config,
interactive=False)
# Delete all bucket objects
object_store.delete_bucket_object(
name=f"{object_name_prefix}*",
bucket_name=bucket.name,
compartment_id=compartment_id,
config=config,
interactive=False)
else:
# If no object_name_prefix was used, also delete the bucket
object_store.delete_bucket(
bucket_name=bucket.name,
compartment_id=compartment_id,
config=config,
interactive=False)
print("Operation completed.")