def dump_to_bucket()

in mds_plugin/util.py [0:0]


def _build_dump_command(connection_uri, object_name_prefix, bucket_name,
                        os_namespace, config_file_path, profile_name):
    """Builds the mysqlsh argument list that runs util.dumpInstance()

    The values are serialised with json.dumps() so that quotes and
    backslashes (e.g. in Windows paths) end up as valid JavaScript string
    literals, and the command is returned as a list so that it can be run
    without a shell.

    Args:
        connection_uri (str): The URI of the MySQL Server to dump
        object_name_prefix (str): The prefix used for the object names
        bucket_name (str): The name of the target bucket
        os_namespace (str): The Object Storage namespace
        config_file_path (str): The absolute path of the OCI config file
        profile_name (str): The name of the OCI profile to use

    Returns:
        The command as a list of strings
    """
    import json

    options = {
        "osBucketName": bucket_name,
        "osNamespace": os_namespace,
        "ociConfigFile": config_file_path,
        "ociProfile": profile_name,
        "showProgress": True,
        "consistent": True,
        "ociParManifest": True,
        "ocimds": True,
        "compatibility": [
            "force_innodb", "strip_definers",
            "strip_restricted_grants", "strip_tablespaces"],
    }

    # The dump is started via -e instead of the command line API mode, as
    # the compatibility option needs to be passed as an array
    script = (f"util.dumpInstance({json.dumps(object_name_prefix)}, "
              f"{json.dumps(options)})")

    return ["mysqlsh", connection_uri, "--passwords-from-stdin",
            "--save-passwords=never", "-e", script]


def dump_to_bucket(**kwargs):
    """Dumps a MySQL Server instance to an Object Storage bucket

    The dump is performed by a separate mysqlsh process that runs
    util.dumpInstance(). The password is passed to that process via stdin
    and its output is mirrored to the console.

    Args:
        **kwargs: Additional options

    Keyword Args:
        bucket_name (str): The name of the bucket
        create_bucket_if_not_exists (bool): Will create the bucket if set
        object_name_prefix (str): The prefix used for the object names
        connection_uri (str): The URI to the MySQL Server
        connection_password (str): The password of the MySQL
            user account
        compartment_id (str): The OCID of the compartment
        config (object): An OCI config object or None.
        config_file_path (str): The path to the config file.
        interactive (bool): Enables interactive input.
        return_true_on_success (bool): Whether to return true on success

    Returns:
       None, or if return_true_on_success is set, True when the dump
       succeeded and False when the dump process reported a failure

    Raises:
        ValueError: If no connection_uri is available and interactive is
            False
    """
    # These imports have to happen before the try block below. Importing
    # oci inside of it would leave the name unbound when an earlier
    # statement raises, which would break the except clauses.
    import os.path
    import subprocess
    import time
    import mysqlsh
    import oci.exceptions
    from mds_plugin import object_store

    # Handle kwargs
    bucket_name = kwargs.get("bucket_name")
    create_bucket_if_not_exists = kwargs.get(
        "create_bucket_if_not_exists", False)
    object_name_prefix = kwargs.get("object_name_prefix")
    connection_uri = kwargs.get("connection_uri")
    connection_password = kwargs.get("connection_password")
    # If no config_file_path is given (or it is None), first check the
    # MYSQLSH_OCI_CONFIG_FILE env_var and only then fall back to default
    config_file_path = (kwargs.get("config_file_path")
                        or getenv("MYSQLSH_OCI_CONFIG_FILE")
                        or "~/.oci/config")
    compartment_id = kwargs.get("compartment_id")
    config = kwargs.get("config")
    interactive = kwargs.get("interactive", True)
    return_true_on_success = kwargs.get("return_true_on_success")

    # Get the active config, compartment and bucket
    try:
        config = configuration.get_current_config(config=config)
        compartment_id = configuration.get_current_compartment_id(
            compartment_id=compartment_id, config=config)
        bucket_name = configuration.get_current_bucket_name(
            bucket_name=bucket_name, config=config)

        # If no connection_uri was given, try to use the one from the current
        # connection if this one uses the classic protocol
        if connection_uri is None:
            session = mysqlsh.globals.shell.get_session()
            if session is not None:
                if "mysql://" in session.get_uri():
                    connection_uri = session.get_uri()

        # If no connection_uri was given but interactive is True, ask the user
        if connection_uri is None and interactive:
            print("Please enter the MySQL Server connection URI for the MySQL "
                  "Server serving as data source.\nTo get more information about "
                  "the URI format type '\\? connection' in the MySQL Shell.\n"
                  "Example: admin@localhost:3306\n")
            connection_uri = mysqlsh.globals.shell.prompt(
                "MySQL Server connection URI: ")
            if connection_uri == "":
                print("Operation Cancelled.\n")
                return

        # Without a connection_uri there is nothing to dump from
        if not connection_uri:
            raise ValueError("No connection_uri given.")

        # Get an admin_password
        if connection_password is None:
            connection_password = mysqlsh.globals.shell.prompt(
                f"Password for {connection_uri}: ",
                {'defaultValue': '', 'type': 'password'}).strip()
            if connection_password == "":
                print("Operation cancelled.")
                return

        # Get object_name_prefix
        if interactive and object_name_prefix is None:
            print("\nIf needed, a prefix for the dumped object names can be set.\n"
                  "Default is to use no prefix.")
            object_name_prefix = mysqlsh.globals.shell.prompt(
                "\nPlease enter the object name prefix []: ",
                {'defaultValue': ''}).strip()
        elif object_name_prefix is None:
            object_name_prefix = ""

        # Get bucket or create one
        bucket_created = False
        try:
            bucket = object_store.get_bucket(
                bucket_name=bucket_name, compartment_id=compartment_id,
                config=config, interactive=False)
        except oci.exceptions.ServiceError:
            if create_bucket_if_not_exists:
                bucket = object_store.create_bucket(
                    bucket_name=bucket_name, compartment_id=compartment_id,
                    config=config, interactive=False, return_object=True)
                # Remember that the bucket was created by this call so it
                # can be removed again if the dump fails
                bucket_created = True
            else:
                if not interactive:
                    raise
                print(f"The bucket {bucket_name} does not exist.")
                return

        os_namespace = object_store.get_object_store_namespace(config)

        # Expand ~ and convert to an absolute path in the platform's format
        config_file_path = os.path.abspath(
            os.path.expanduser(config_file_path))

        # Get the current profile
        profile_name = configuration.get_current_profile()

    except oci.exceptions.ServiceError as e:
        if not interactive:
            raise
        print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
        return
    except (ValueError, oci.exceptions.ClientError) as e:
        if not interactive:
            raise
        print(f'ERROR: {e}')
        return

    # cSpell:ignore ocimds, innodb
    # Setup command
    cmd = _build_dump_command(
        connection_uri=connection_uri,
        object_name_prefix=object_name_prefix,
        bucket_name=bucket.name,
        os_namespace=os_namespace,
        config_file_path=config_file_path,
        profile_name=profile_name)

    # Run command
    print(f'Starting dump from {connection_uri} to bucket {bucket.name} ...')
    try:
        # The command is passed as a list and run without a shell, so none
        # of the user provided values are interpreted by a shell
        with subprocess.Popen(
                cmd, shell=False,
                stdout=subprocess.PIPE,
                stdin=subprocess.PIPE, stderr=subprocess.STDOUT,
                universal_newlines=True, bufsize=1) as proc:
            # Give the process some startup time
            time.sleep(0.2)

            # Provide the password to stdin
            proc.stdin.write(connection_password + "\n")
            proc.stdin.flush()

            # Mirror the output TODO: fix hang on Windows
            for line in proc.stdout:
                # Do not show the password prompt of the child process
                if "Please provide the password for" in line:
                    line = line.replace(
                        "Please provide the password for ",
                        "Performing dump from ")

                if line.strip() != "":
                    print(f'<< {line}', end='')

            proc.stdout.close()
            proc.stdin.close()
            return_code = proc.wait()

        if return_code:
            # If a bucket has been created, delete it. This is a best
            # effort cleanup that must not hide the failed dump.
            if bucket_created:
                try:
                    object_store.delete_bucket(
                        bucket_name=bucket_name,
                        compartment_id=compartment_id, config=config,
                        interactive=False)
                except oci.exceptions.ServiceError as e:
                    print(f"Could not delete the bucket {bucket_name}. "
                          f"{e.message}")
            print("Dump has failed.")
            if return_true_on_success:
                return False
        else:
            print("Dump has finished.")
            if return_true_on_success:
                return True

    except OSError as e:
        # Raised e.g. if mysqlsh cannot be started or the pipe breaks
        if not interactive:
            raise
        print(f"Error when starting shell dump. {str(e)}")
    except ValueError as e:
        if not interactive:
            raise
        print(f"Invalid arguments passed to the shell dump. {str(e)}")