in mds_plugin/util.py [0:0]
def import_from_local_dir(local_dump_dir=None, db_system_name=None,
                          options=None):
    """Imports a local dump on a DB System

    The files in local_dump_dir are uploaded to a newly created Object Store
    bucket, the import is started from that bucket via import_from_bucket()
    and the bucket is deleted again afterwards.

    In interactive mode missing values (dump directory, administrator
    account name and password) are prompted for, and OCI service/client
    errors as well as ValueErrors are printed instead of being raised. In
    non-interactive mode these exceptions are re-raised to the caller.

    Note that the given options dict is updated in place: its
    "db_system_id" entry is set to the OCID of the DB System that was
    looked up.

    Args:
        local_dump_dir (str): The directory that holds the local dump
        db_system_name (str): The name of the DB System.
        options (dict): A dict with various options
            object_name_prefix (str): The prefix used for the object names
            db_system_id (str): The OCID of the db_system
            admin_username (str): The name of the administrator user account
            admin_password (str): The password of the administrator user
                account
            private_key_file_path (str): The file path to an SSH private key
            compartment_id (str): The OCID of the compartment
            config (object): An OCI config object or None.
            interactive (bool): Whether user input is considered

    Returns:
        None
    """

    if options is None:
        options = {}

    object_name_prefix = options.get("object_name_prefix")
    db_system_id = options.get("db_system_id")
    admin_username = options.get("admin_username")
    admin_password = options.get("admin_password")
    private_key_file_path = options.get(
        "private_key_file_path", "~/.ssh/id_rsa")
    compartment_id = options.get("compartment_id")
    config = options.get("config")
    interactive = options.get("interactive", True)

    # These imports have to run before the try block. A function-scope
    # import makes `oci` a local name, and the except clauses below evaluate
    # `oci.exceptions.*`; if an exception were raised inside the try before
    # the import had executed, evaluating the except clause itself would
    # fail with an UnboundLocalError and hide the original error.
    import oci.mysql
    import datetime
    import mysqlsh
    import object_store

    try:
        # Get the active config and compartment
        config = configuration.get_current_config(config=config)
        compartment_id = configuration.get_current_compartment_id(
            compartment_id=compartment_id, config=config)

        if interactive:
            print("Preparing for data import from a local directory...\n")

        # Get the directory that holds the dump
        if interactive and local_dump_dir is None:
            local_dump_dir = mysqlsh.globals.shell.prompt(
                "Please specify the directory path that contains the dump: ",
                {'defaultValue': ''}).strip()
            if local_dump_dir == "":
                print("Operation cancelled.")
                return
        elif local_dump_dir is None:
            print("No directory path given. Operation cancelled.")
            return

        # Get an admin_username
        if interactive and admin_username is None:
            admin_username = mysqlsh.globals.shell.prompt(
                "MySQL Administrator account name [admin]: ",
                {'defaultValue': 'admin'}).strip()
            if admin_username == "":
                print("Operation cancelled.")
                return
        elif admin_username is None:
            admin_username = "admin"

        # Get an admin_password
        if interactive and admin_password is None:
            admin_password = mysqlsh.globals.shell.prompt(
                "MySQL Administrator account password: ",
                {'defaultValue': '', 'type': 'password'}).strip()
            if admin_password == "":
                print("Operation cancelled.")
                return
        elif admin_password is None:
            raise ValueError("The argument admin_password must be set.")

        # Get DB System
        db_system = mysql_database_service.get_db_system(
            db_system_name=db_system_name, db_system_id=db_system_id,
            compartment_id=compartment_id, config=config)
        if db_system is None:
            print("Operation cancelled.\n")
            return

        # Pass the resolved DB System on to import_from_bucket() through the
        # options dict (this modifies the dict given by the caller).
        # NOTE(review): admin_username/admin_password gathered above are not
        # written back into options, so they only reach import_from_bucket()
        # when the caller supplied them - confirm this is intended.
        options["db_system_id"] = db_system.id

        # Make sure there is a 'MySQLDBBastionHost' compute instance (let the
        # user create it if it does not exist yet)
        mds_proxy = create_compute_instance_for_endpoint(
            private_key_file_path=private_key_file_path,
            compartment_id=db_system.compartment_id, config=config,
            interactive=False)
        if mds_proxy is None:
            print("Operation cancelled.")
            return

        # Take all alphanumeric chars from the DB System display_name
        # to create the bucket_name, followed by a timestamp suffix
        bucket_name = (
            f"{''.join(e for e in db_system.display_name if e.isalnum())}"
            f"_import_{datetime.datetime.now():%Y%m%d%H%M%S}")

        print(f"\nCreating bucket {bucket_name}...")

        bucket = object_store.create_bucket(
            bucket_name=bucket_name, compartment_id=compartment_id,
            config=config, return_object=True)
        if bucket is None:
            print("Cancelling operation")
            return

        # Upload the files from the given directory to the bucket
        # NOTE(review): from here on the bucket is only deleted on the
        # success path; it is left in place when the upload is cancelled or
        # import_from_bucket() raises.
        file_count = object_store.create_bucket_objects_from_local_dir(
            local_dir_path=local_dump_dir, bucket_name=bucket.name,
            object_name_prefix=object_name_prefix,
            compartment_id=compartment_id, config=config, interactive=False)
        if file_count is None:
            print("Cancelling operation")
            return

        # Start the import from the bucket
        import_from_bucket(
            bucket_name=bucket.name,
            options=options)

        # Remove the bucket that was created for this import
        if not object_store.delete_bucket(
                bucket_name=bucket.name,
                compartment_id=compartment_id, config=config,
                interactive=False):
            print("Could not delete the bucket.")

        if interactive:
            print("Operation completed.")
    except oci.exceptions.ServiceError as e:
        # Only report the error in interactive mode, otherwise let the
        # caller handle it
        if not interactive:
            raise
        print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
        return
    except (ValueError, oci.exceptions.ClientError) as e:
        if not interactive:
            raise
        print(f'ERROR: {e}')
        return