in mds_plugin/util.py [0:0]
def _build_dump_instance_command(connection_uri, object_name_prefix,
                                 bucket_name, os_namespace, config_file_path,
                                 profile_name):
    """Builds the mysqlsh argument list that runs util.dumpInstance()

    Args:
        connection_uri (str): The URI of the MySQL Server to dump
        object_name_prefix (str): The prefix used for the object names
        bucket_name (str): The name of the target bucket
        os_namespace (str): The Object Store namespace of the bucket
        config_file_path (str): The path to the OCI config file
        profile_name (str): The name of the OCI profile to use

    Returns:
        The command as a list of arguments, to be run without a shell
    """
    import json

    # json.dumps() produces quoted and escaped string literals that are also
    # valid JavaScript, so quotes or backslashes in the values (e.g. in
    # Windows paths) cannot break or alter the generated script
    options = ", ".join([
        f"osBucketName: {json.dumps(bucket_name)}",
        f"osNamespace: {json.dumps(os_namespace)}",
        f"ociConfigFile: {json.dumps(config_file_path)}",
        f"ociProfile: {json.dumps(profile_name)}",
        "showProgress: true",
        "consistent: true",
        "ociParManifest: true",
        "ocimds: true",
        "compatibility: ['force_innodb', 'strip_definers', "
        "'strip_restricted_grants', 'strip_tablespaces']",
    ])
    script = (
        f"util.dumpInstance({json.dumps(object_name_prefix)}, "
        f"{{{options}}})")

    # The script is passed via -e as a workaround till the shell supports
    # array arguments (needed for 'compatibility') in command line API mode
    return [
        "mysqlsh",
        connection_uri,
        "--passwords-from-stdin",
        "--save-passwords=never",
        "-e",
        script,
    ]


def dump_to_bucket(**kwargs):
    """Dumps a MySQL Server instance into an Object Store bucket

    The dump is performed by a separate mysqlsh process that runs
    util.dumpInstance(). Its output is mirrored to stdout.

    Args:
        **kwargs: Additional options

    Keyword Args:
        bucket_name (str): The name of the bucket
        create_bucket_if_not_exists (bool): Will create the bucket if set
        object_name_prefix (str): The prefix used for the object names
        connection_uri (str): The URI to the MySQL Server. If not given,
            the URI of the current shell session is used when it contains
            'mysql://'
        connection_password (str): The password of the MySQL
            user account
        compartment_id (str): The OCID of the compartment
        config (object): An OCI config object or None.
        config_file_path (str): The path to the config file. If not given,
            the MYSQLSH_OCI_CONFIG_FILE environment variable is used and
            then ~/.oci/config
        interactive (bool): Enables interactive input.
        return_true_on_success (bool): Whether to return true on success

    Returns:
        None, or True on success and False on a failed dump if
        return_true_on_success is set to true

    Raises:
        When interactive is False, errors are re-raised instead of being
        printed (oci ServiceError/ClientError, ValueError, OSError)
    """
    # These imports are done before the try block below, so the names used
    # by its except clauses (oci.exceptions) are bound even if the very
    # first statement of the try block raises
    import os.path
    import subprocess
    import time
    import mysqlsh
    import oci.exceptions
    from mds_plugin import object_store

    # Handle kwargs
    bucket_name = kwargs.get("bucket_name")
    create_bucket_if_not_exists = kwargs.get(
        "create_bucket_if_not_exists", False)
    object_name_prefix = kwargs.get("object_name_prefix")
    connection_uri = kwargs.get("connection_uri")
    connection_password = kwargs.get("connection_password")

    # If no config_file_path is given (missing or explicitly None), first
    # check the MYSQLSH_OCI_CONFIG_FILE env_var and only then fall back to
    # default
    config_file_path = kwargs.get("config_file_path")
    if config_file_path is None:
        config_file_path = getenv("MYSQLSH_OCI_CONFIG_FILE")
    if config_file_path is None:
        config_file_path = "~/.oci/config"

    compartment_id = kwargs.get("compartment_id")
    config = kwargs.get("config")
    interactive = kwargs.get("interactive", True)
    return_true_on_success = kwargs.get("return_true_on_success")

    # Get the active config, compartment and bucket name
    try:
        config = configuration.get_current_config(config=config)
        compartment_id = configuration.get_current_compartment_id(
            compartment_id=compartment_id, config=config)
        bucket_name = configuration.get_current_bucket_name(
            bucket_name=bucket_name, config=config)

        # If no connection_uri was given, try to use the one from the current
        # connection if this one uses the classic protocol
        if connection_uri is None:
            session = mysqlsh.globals.shell.get_session()
            if session is not None:
                if "mysql://" in session.get_uri():
                    connection_uri = session.get_uri()

        # If no connection_uri was given but interactive is True, ask the user
        if connection_uri is None and interactive:
            print("Please enter the MySQL Server connection URI for the MySQL "
                  "Server serving as data source.\nTo get more information about "
                  "the URI format type '\\? connection' in the MySQL Shell.\n"
                  "Example: admin@localhost:3306\n")
            connection_uri = mysqlsh.globals.shell.prompt(
                "MySQL Server connection URI: ")
            if connection_uri == "":
                print("Operation Cancelled.\n")
                return

        # Without a URI there is nothing to dump from; fail early instead of
        # starting mysqlsh with an invalid argument
        if connection_uri is None:
            raise ValueError("No connection_uri given.")

        # Get an admin_password
        if connection_password is None:
            connection_password = mysqlsh.globals.shell.prompt(
                f"Password for {connection_uri}: ",
                {'defaultValue': '', 'type': 'password'}).strip()
            if connection_password == "":
                print("Operation cancelled.")
                return

        # Get object_name_prefix
        if interactive and object_name_prefix is None:
            print("\nIf needed, a prefix for the dumped object names can be set.\n"
                  "Default is to use no prefix.")
            object_name_prefix = mysqlsh.globals.shell.prompt(
                "\nPlease enter the object name prefix []: ",
                {'defaultValue': ''}).strip()
        elif object_name_prefix is None:
            object_name_prefix = ""

        # Get bucket or create one
        bucket_created = False
        try:
            bucket = object_store.get_bucket(
                bucket_name=bucket_name, compartment_id=compartment_id,
                config=config, interactive=False)
        except oci.exceptions.ServiceError:
            if create_bucket_if_not_exists:
                bucket = object_store.create_bucket(
                    bucket_name=bucket_name, compartment_id=compartment_id,
                    config=config, interactive=False, return_object=True)
                # Remember that the bucket is new so it can be removed
                # again if the dump fails
                bucket_created = True
            else:
                if not interactive:
                    raise
                print(f"The bucket {bucket_name} does not exist.")
                return

        os_namespace = object_store.get_object_store_namespace(config)

        # Expand ~ and make the path absolute (also converts Unix style
        # paths on Windows)
        config_file_path = os.path.abspath(
            os.path.expanduser(config_file_path))

        # Get the current profile
        profile_name = configuration.get_current_profile()
    except oci.exceptions.ServiceError as e:
        if not interactive:
            raise
        print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
        return
    except (ValueError, oci.exceptions.ClientError) as e:
        if not interactive:
            raise
        print(f'ERROR: {e}')
        return

    # cSpell:ignore ocimds, innodb
    # Setup command as an argument list so no shell is involved and none of
    # the values are subject to shell expansion
    cmd = _build_dump_instance_command(
        connection_uri=connection_uri,
        object_name_prefix=object_name_prefix,
        bucket_name=bucket.name,
        os_namespace=os_namespace,
        config_file_path=config_file_path,
        profile_name=profile_name)

    # Run command
    print(f'Starting dump from {connection_uri} to bucket {bucket.name} ...')
    try:
        with subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stdin=subprocess.PIPE, stderr=subprocess.STDOUT,
                universal_newlines=True, bufsize=1) as proc:
            try:
                # Give the process some startup time
                time.sleep(0.2)

                # Provide the password to stdin
                proc.stdin.write(connection_password + "\n")
                proc.stdin.flush()

                # Mirror the output TODO: fix hang on Windows
                for line in iter(proc.stdout.readline, ''):
                    # Replace the password prompt of the child process with
                    # a status message
                    if "Please provide the password for" in line:
                        line = line.replace(
                            "Please provide the password for ",
                            "Performing dump from ")
                    if line.strip() != "":
                        print(f'<< {line}', end='')

                proc.stdout.close()
                proc.stdin.close()
                return_code = proc.wait()

                if return_code:
                    # If a bucket has been created, delete it
                    if bucket_created:
                        try:
                            object_store.delete_bucket(
                                bucket_name=bucket_name,
                                compartment_id=compartment_id, config=config,
                                interactive=False)
                        except oci.exceptions.ServiceError as e:
                            # Cleanup is best effort, the failed dump is
                            # the error that gets reported
                            print(f"Could not delete the bucket "
                                  f"{bucket_name}. {e.message}")
                    print("Dump has failed.")
                    if return_true_on_success:
                        return False
                else:
                    print("Dump has finished.")
                    if return_true_on_success:
                        return True
            except subprocess.TimeoutExpired:
                proc.kill()
    except OSError as e:
        # Raised e.g. when the mysqlsh executable cannot be started
        if not interactive:
            raise e
        print(f"Error when starting shell dump. {str(e)}")
    except ValueError as e:
        if not interactive:
            raise
        print(f"Invalid arguments passed to the shell dump. {str(e)}")