def import_from_bucket()

in mds_plugin/util.py [0:0]


def import_from_bucket(**kwargs):
    """Imports a dump from a bucket on a DB System

    The import is executed remotely: an SSH connection is opened to a compute
    instance and the MySQL Shell 'util load-dump' command is run there
    against the DB System, reading the dump through a pre-authenticated
    request (PAR) created on the bucket. The PARs created for the import
    are always deleted again when the import attempt ends.

    Args:
        **kwargs: Additional options

    Keyword Args:
        bucket_name (str): The name of the bucket
        object_name_prefix (str): The prefix used for the object names
        db_system_name (str): The name of the MySQL DB System
        db_system_id (str): The OCID of the db_system
        db_system_ip (str): The IP to use for the import, overwriting the
                one from the db_system if given
        db_system_port (str): The Port to use for the import, overwriting
                the one from the db_system if given. Only honoured together
                with db_system_ip, defaults to 3306.
        admin_username (str): The name of the administrator user account
        admin_password (str): The password of the administrator account
        private_key_file_path (str): The file path to an SSH private key
        perform_cleanup (bool): Whether the PARs and bucket should be deleted
        compartment_id (str): The OCID of the compartment
        config (object): An OCI config object or None.
        interactive (bool): Whether user input is considered

    Returns:
       None

    Raises:
        ValueError: If the current config, compartment, DB System or bucket
            cannot be resolved and interactive is False.
    """

    bucket_name = kwargs.get('bucket_name')
    db_system_name = kwargs.get('db_system_name')
    private_key_file_path = kwargs.get(
        "private_key_file_path", "~/.ssh/id_rsa")
    object_name_prefix = kwargs.get("object_name_prefix", "")
    admin_username = kwargs.get('admin_username')
    admin_password = kwargs.get('admin_password')
    db_system_id = kwargs.get("db_system_id")
    db_system_ip = kwargs.get("db_system_ip")
    db_system_port = kwargs.get("db_system_port")
    perform_cleanup = kwargs.get("perform_cleanup")
    compartment_id = kwargs.get("compartment_id")
    config = kwargs.get("config")
    interactive = kwargs.get("interactive", True)

    # Get the active config, compartment and db_system
    try:
        config = configuration.get_current_config(config=config)
        compartment_id = configuration.get_current_compartment_id(
            compartment_id=compartment_id, config=config)
        db_system_id = configuration.get_current_db_system_id(
            db_system_id=db_system_id, config=config)
        bucket_name = configuration.get_current_bucket_name(
            bucket_name=bucket_name, config=config)
    except ValueError as e:
        if not interactive:
            raise
        print(f"ERROR: {str(e)}")
        return

    from mds_plugin import compute, object_store
    import codecs
    import shlex
    import time
    import mysqlsh

    # If no explicit IP is given, get DB System
    if not db_system_ip:
        db_system = mysql_database_service.get_db_system(
            db_system_name=db_system_name, db_system_id=db_system_id,
            compartment_id=compartment_id, config=config,
            interactive=interactive)
        if db_system is None:
            print("No MySQL DB System given. Operation cancelled.\n")
            return

        # Get the first active endpoint
        endpoints = [e for e in db_system.endpoints if e.status == 'ACTIVE']
        if len(endpoints) < 1:
            print("ERROR: This MySQL DB System has no active endpoints "
                  "assigned.")
            return
        endpoint = endpoints[0]

        db_system_name = db_system.display_name
        db_system_ip = endpoint.ip_address
        db_system_port = endpoint.port

    if not db_system_port:
        db_system_port = '3306'

    if not db_system_name:
        db_system_name = 'DbSystem'

    # Get ObjectStorage namespace
    os_namespace = object_store.get_object_store_namespace(config=config)

    # Get bucket
    bucket = object_store.get_bucket(
        bucket_name=bucket_name,
        compartment_id=compartment_id,
        config=config,
        ignore_current=False,
        interactive=interactive)
    if bucket is None:
        print("No bucket given. Operation cancelled.\n")
        return

    # Check if manifest is available on this bucket. The resolved
    # compartment and config are passed on so that a caller-supplied config
    # is used for this lookup as well, like for all other calls.
    obj = object_store.get_bucket_object(
        name=object_name_prefix + '@.manifest.json',
        bucket_name=bucket.name,
        compartment_id=compartment_id,
        config=config,
        no_error_on_not_found=False)
    if obj is None:
        print(f"Manifest file '{object_name_prefix}@.manifest.json' not "
              f"found on bucket '{bucket.name}'. Operation cancelled.\n")
        return

    # Get an admin_username
    if interactive and not admin_username:
        admin_username = mysqlsh.globals.shell.prompt(
            "MySQL Administrator account name [admin]: ",
            {'defaultValue': 'admin'}).strip()
    if not admin_username:
        print("Operation cancelled.")
        return

    # Get an admin_password
    if interactive and admin_password is None:
        admin_password = mysqlsh.globals.shell.prompt(
            "MySQL Administrator account password: ",
            {'defaultValue': '', 'type': 'password'}).strip()
    if not admin_password:
        print("Operation cancelled.")
        return

    # Create PAR for manifest and progress files
    par, progress_par = create_bucket_import_pars(
        object_name_prefix=object_name_prefix,
        bucket_name=bucket.name,
        db_system_name=db_system_name,
        compartment_id=compartment_id,
        config=config)
    if par is None or progress_par is None:
        print("Could not create pre-authenticated requests. "
              "Operation cancelled.")
        return

    # Build URLs
    par_url_prefix = object_store.get_par_url_prefix(config=config)
    par_url = par_url_prefix + par.access_uri
    progress_par_url = par_url_prefix + progress_par.access_uri

    def print_lines(lines, no_line_break=False):
        """Prints the non-empty lines received from the remote shell"""
        for line in lines:
            # The password is sent via stdin, so replace the prompt text
            # with a more meaningful status message
            if "Please provide the password for" in line:
                line = line.replace(
                    "Please provide the password for",
                    "Importing dump on")
            if line.strip() != "":
                if no_line_break:
                    # Progress updates overwrite the current line
                    print(line, end='\r')
                else:
                    print(f'>> {line}')

    def flush_ready_lines(pending, handle_carriage_return):
        """Prints all complete lines and returns the unfinished rest"""
        if "\n" in pending:
            end = pending.rfind("\n") + 1
            print_lines(pending[:end].split("\n"))
            return pending[end:]
        if handle_carriage_return and "\r" in pending:
            end = pending.rfind("\r") + 1
            print_lines(pending[:end].split("\r"), no_line_break=True)
            return pending[end:]
        return pending

    try:
        # Get the 'MySQLDBBastionHost' compute instance. interactive=False
        # is passed to the helper here.
        mds_proxy = create_compute_instance_for_endpoint(
            private_key_file_path=private_key_file_path,
            compartment_id=compartment_id, config=config,
            interactive=False)
        if mds_proxy is None:
            print("Could not get a compute instance. Operation cancelled.")
            return

        # Get the public IP of the instance
        public_ip = compute.get_instance_public_ip(
            instance_id=mds_proxy.id, compartment_id=compartment_id,
            config=config, private_ip_fallback=True)
        if public_ip is None or public_ip == "":
            print("The public IP of the MySQLDBBastionHost instance could not "
                  "be fetched.")
            return

        # Open an SSH connection to the instance
        print(f"Connecting to MySQLDBBastionHost instance at {public_ip}...")
        try:
            success = None
            with compute.SshConnection(
                    username="opc", host=public_ip,
                    private_key_file_path=private_key_file_path) as conn:

                print(f"Connected to MySQLDBBastionHost instance at "
                      f"{public_ip}.\n"
                      f"Starting import...")

                # Build command string to execute. The resolved
                # db_system_ip/db_system_port are used so that explicitly
                # given values are honoured (no endpoint object exists in
                # that case).
                cmd_args = ['mysqlsh',
                            (f'{admin_username}@'
                             f'{db_system_ip}:{db_system_port}'),
                            '--passwords-from-stdin',
                            '--save-passwords=never',
                            '--',
                            'util',
                            'load-dump',
                            f'{par_url}',
                            f'--osBucketName={bucket.name}',
                            f'--osNamespace={os_namespace}',
                            f'--progress-file={progress_par_url}',
                            '--loadUsers=true',
                            '--showProgress=true',
                            '--ignoreVersion=true']
                # The command is interpreted by the remote shell, so every
                # argument is quoted to keep user-supplied values (account
                # name, bucket name) from being split or interpreted.
                cmd = " ".join(shlex.quote(arg) for arg in cmd_args)

                # Open channel
                chan = conn.client.get_transport().open_session()
                try:
                    chan.settimeout(timeout=None)
                    chan.set_combine_stderr(combine=True)

                    # Execute shell and call import function
                    chan.exec_command(cmd)

                    # Send password to stdin
                    chan.sendall(f"{admin_password}\n".encode('utf-8'))
                    chan.shutdown_write()

                    # Decode incrementally, a multi-byte UTF-8 character may
                    # be split across two received chunks
                    decoder = codecs.getincrementaldecoder('utf-8')(
                        errors='replace')
                    read_buffer_size = 1024
                    buffer = ""

                    # While the command didn't return an exit code yet
                    while not chan.exit_status_ready():
                        # Update every 0.1 seconds
                        time.sleep(0.1)

                        if chan.recv_ready():
                            buffer += decoder.decode(
                                chan.recv(read_buffer_size))
                            # Print the lines that are ready
                            buffer = flush_ready_lines(
                                buffer, handle_carriage_return=True)

                    # Ensure we gobble up all remaining data
                    while True:
                        try:
                            data = chan.recv(read_buffer_size)
                        except Exception:
                            # Stop reading instead of retrying forever if
                            # the channel cannot be read anymore
                            break
                        if not data and not chan.recv_ready():
                            break
                        buffer += decoder.decode(data)
                        # Print the lines that are ready
                        buffer = flush_ready_lines(
                            buffer, handle_carriage_return=False)

                    # Flush bytes of an incomplete trailing character
                    buffer += decoder.decode(b"", final=True)

                    # If there is still something in the buffer, print it
                    if buffer:
                        print(buffer)

                    success = chan.recv_exit_status() == 0
                finally:
                    chan.close()

            if success is not None:
                print(f"Connection to MySQLDBBastionHost instance closed.\n"
                      f"Import completed "
                      f"{'successfully' if success else 'with errors'}.")

        except Exception as e:
            print("Could not execute the import on compute instance "
                  f"'MySQLDBBastionHost' at {public_ip}.\nERROR: {str(e)}")
    finally:
        # Delete the PARs created especially for this import
        object_store.delete_par(
            bucket_name=bucket.name,
            par_id=par.id,
            compartment_id=compartment_id,
            config=config,
            interactive=False)
        object_store.delete_par(
            bucket_name=bucket.name,
            par_id=progress_par.id,
            compartment_id=compartment_id,
            config=config,
            interactive=False)

        if perform_cleanup:
            print("Performing cleanup...")

            if object_name_prefix:
                # Delete all PARs created by dump
                object_store.delete_par(
                    bucket_name=bucket.name,
                    name=f'shell-dump-{object_name_prefix}*',
                    compartment_id=compartment_id,
                    config=config,
                    interactive=False)

                # Delete all bucket objects
                object_store.delete_bucket_object(
                    name=f"{object_name_prefix}*",
                    bucket_name=bucket.name,
                    compartment_id=compartment_id,
                    config=config,
                    interactive=False)
            else:
                # If no object_name_prefix was used, also delete the bucket
                object_store.delete_bucket(
                    bucket_name=bucket.name,
                    compartment_id=compartment_id,
                    config=config,
                    interactive=False)

    print("Operation completed.")