def create_db_system()

in mds_plugin/mysql_database_service.py [0:0]


def create_db_system(**kwargs):
    """Creates a new MySQL DB System

    Settings that are not passed in are prompted for when interactive is
    true. Optionally, data can be imported into the new DB System from an
    existing MySQL Server instance, a local dump directory or a dump stored
    in an Object Storage bucket. In that case the function waits for the
    DB System to become ACTIVE and then starts the import.

    A missing name, admin username, admin password, data storage size or
    availability domain raises an Exception. OCI service/client errors and
    ValueErrors are printed when interactive is true and re-raised
    otherwise.

    Args:
        **kwargs: Optional parameters

    Keyword Args:
        db_system_name (str): The name of the new DB System.
        description (str): The description of the new DB System.
        availability_domain (str): The name of the availability_domain
        shape (str): The compute shape name to use for the instance
        subnet_id (str): The OCID of the subnet to use
        configuration_id (str): The OCID of the MySQL configuration
        data_storage_size_in_gbs (int): The data storage size in gigabytes
        mysql_version (str): The MySQL version
        admin_username (str): The name of the administrator user account
        admin_password (str): The password of the administrator account
        private_key_file_path (str): The file path to an SSH private key,
            defaults to ~/.ssh/id_rsa
        par_url (str): The PAR url used for initial data import
        perform_cleanup_after_import (bool): Whether the bucket and PARs
            should be cleaned up after an import took place. Defaults to
            true if the bucket was created by this function and to false
            if an existing source_bucket was given.
        source_mysql_uri (str): The MySQL Connection URI if data should
            be imported from an existing MySQL Server instance
        source_mysql_password (str): The password to use when data
            should be imported from an existing MySQL Server instance
        source_local_dump_dir (str): The path to a local directory that
            contains a dump
        source_bucket (str): The name of the source bucket that contains
            a dump
        host_image_id (str): OCID of the host image to use for this Instance.
            Private API only. Currently not used.
        defined_tags (dict): The defined_tags of the DB System.
        freeform_tags (dict): The freeform_tags of the DB System.
        compartment_id (str): The OCID of the compartment
        config (object): An OCI config object or None.
        interactive (bool): Ask the user for input if needed
        return_object (bool): Whether to return the object when created

    Returns:
       None or the new DB System object if return_object is set to true
    """
    db_system_name = kwargs.get("db_system_name")
    description = kwargs.get("description")
    availability_domain = kwargs.get("availability_domain")
    shape = kwargs.get("shape")
    subnet_id = kwargs.get("subnet_id")
    configuration_id = kwargs.get("configuration_id")
    data_storage_size_in_gbs = kwargs.get("data_storage_size_in_gbs")
    # NOTE(review): this value is currently overwritten below by the result
    # of get_mysql_version(); confirm whether a given version should win.
    mysql_version = kwargs.get("mysql_version")
    admin_username = kwargs.get("admin_username")
    admin_password = kwargs.get("admin_password")
    private_key_file_path = kwargs.get(
        "private_key_file_path", "~/.ssh/id_rsa")
    par_url = kwargs.get("par_url")
    perform_cleanup_after_import = kwargs.get(
        "perform_cleanup_after_import")
    source_mysql_uri = kwargs.get("source_mysql_uri")
    source_mysql_password = kwargs.get("source_mysql_password")
    source_local_dump_dir = kwargs.get("source_local_dump_dir")
    source_bucket = kwargs.get("source_bucket")
    # host_image_id is documented but not passed on yet (private API only)
    # host_image_id = kwargs.get("host_image_id")
    defined_tags = kwargs.get("defined_tags")
    # Conversion from Shell Dict type
    if defined_tags:
        defined_tags = dict(defined_tags)
    freeform_tags = kwargs.get("freeform_tags")
    # Conversion from Shell Dict type
    if freeform_tags:
        freeform_tags = dict(freeform_tags)
    compartment_id = kwargs.get("compartment_id")
    config = kwargs.get("config")
    interactive = kwargs.get("interactive", True)
    return_object = kwargs.get("return_object", False)

    # These imports must happen before the try block. `import oci.mysql`
    # binds `oci` as a local name of this function and the except clauses
    # below reference oci.exceptions; if anything raised before the import
    # ran, evaluating the except clause itself would fail with an
    # UnboundLocalError and hide the real error.
    import datetime
    import time

    import mysqlsh
    import oci.mysql
    from mds_plugin import compartment, compute, network, object_store

    # Only set when an import source is used. Stays None for a clean DB
    # System or when only a par_url was passed in by the caller.
    bucket_name = None

    try:
        # Get the active config and compartment
        config = configuration.get_current_config(config=config)
        compartment_id = configuration.get_current_compartment_id(
            compartment_id=compartment_id, config=config)

        # Set the import_source_type to 0 to default to a clean new DB System
        import_source_type = 0

        # Check if source_* parameters are given and if so, set the correct
        # import_source_type
        if source_mysql_uri is not None:
            # Import from an existing MySQL Server instance
            import_source_type = 1
        elif source_local_dump_dir is not None:
            # Import from a local data dir
            import_source_type = 2
        elif source_bucket is not None:
            # Import from an existing bucket
            import_source_type = 3

        # If the user did not specify a par_url or another source parameter,
        # let them choose whether to import data from a given source
        if interactive and import_source_type == 0 and par_url is None:
            print("Choose one of the following options of how to create the "
                  "MySQL DB System:\n")

            # The list index of each entry is its import_source_type
            import_sources = [
                "Create a clean MySQL DB System",
                ("Create a MySQL DB System from an existing MySQL Server "
                 "instance"),
                "Create a MySQL DB System from a local dump",
                ("Create a MySQL DB System from a dump stored on OCI "
                 "Object Storage")
            ]
            import_source = core.prompt_for_list_item(
                item_list=import_sources,
                prompt_caption=("Please enter the index of an option listed "
                                "above: "),
                prompt_default_value='', print_list=True)
            if import_source == "":
                print("Operation cancelled.")
                return
            import_source_type = import_sources.index(import_source)

        # Get a name
        if not db_system_name and interactive:
            db_system_name = core.prompt(
                "Please enter the name for the new DB System: ").strip()
        if not db_system_name:
            raise Exception("No name given. "
                            "Operation cancelled.")

        # Get a description
        if not description and interactive:
            description = core.prompt(
                "Please enter a description for the new DB System: ").strip()

        # Get an admin_username
        if not admin_username and interactive:
            admin_username = core.prompt(
                "MySQL Administrator account name [admin]: ",
                {'defaultValue': 'admin'}).strip()
        if not admin_username:
            raise Exception("No admin username given. "
                            "Operation cancelled.")

        # Get an admin_password
        if not admin_password and interactive:
            admin_password = get_validated_mysql_password(
                password_caption="MySQL Administrator account")
        if not admin_password:
            raise Exception("No admin password given. "
                            "Operation cancelled.")

        # Get data_storage_size_in_gbs
        if not data_storage_size_in_gbs and interactive:
            data_storage_size_in_gbs = core.prompt(
                "Please enter the amount of data storage size in gigabytes "
                "with a minimum of 50 GB [50]: ",
                {'defaultValue': '50'}).strip()
            try:
                data_storage_size_in_gbs = int(data_storage_size_in_gbs)
            except ValueError:
                # Actually raise the error so a non-numeric input is not
                # silently passed on as a string; it is reported by the
                # ValueError handler at the end of this function.
                raise ValueError(
                    "Please enter a number for data storage size.\n"
                ) from None

        if not data_storage_size_in_gbs:
            raise Exception("No data storage size given. "
                            "Operation cancelled.")

        # Get the availability_domain name
        availability_domain_obj = compartment.get_availability_domain(
            random_selection=not interactive,
            compartment_id=compartment_id,
            availability_domain=availability_domain,
            config=config, interactive=interactive,
            return_python_object=True)
        if not availability_domain_obj:
            raise Exception("No availability domain selected. "
                            "Operation cancelled.")
        availability_domain = availability_domain_obj.name
        if interactive:
            print(f"Using availability domain {availability_domain}.")

        # Get the shape, limited to the listed VM.Standard.E2 shapes
        shape_id = compute.get_shape_name(
            shape_name=shape, limit_shapes_to=[
                "VM.Standard.E2.1", "VM.Standard.E2.2",
                "VM.Standard.E2.4", "VM.Standard.E2.8"],
            compartment_id=compartment_id,
            availability_domain=availability_domain, config=config,
            interactive=interactive)
        if shape_id is None or shape_id == "":
            print("Compute Shape not set or found. Operation cancelled.")
            return
        if interactive:
            print(f"Using shape {shape_id}.")

        # Get private subnet
        subnet = network.get_subnet(
            subnet_id=subnet_id, public_subnet=False,
            compartment_id=compartment_id, config=config,
            interactive=interactive, availability_domain=availability_domain)
        if subnet is None:
            print("Operation cancelled.")
            return
        if interactive:
            print(f"Using subnet {subnet.display_name}.")

        # Get mysql_version
        mysql_version = get_mysql_version(compartment_id=compartment_id,
                                          config=config)
        if mysql_version is None:
            print("Operation cancelled.")
            return
        print(f"Using MySQL version {mysql_version}.")

        # Get mysql_configuration
        mysql_configuration = get_db_system_configuration(
            configuration_id=configuration_id, shape=shape_id,
            availability_domain=availability_domain,
            compartment_id=compartment_id, config=config,
            return_python_object=True)
        if mysql_configuration is None:
            print("Operation cancelled.")
            return
        print(f"Using MySQL configuration {mysql_configuration.display_name}.")

        # TODO Check Limits
        # limits.list_limit_values(config["tenancy"], "mysql").data
        # limits.get_resource_availability(
        #     service_name="mysql", limit_name="vm-standard-e2-4-count",
        #     compartment_id=config["tenancy"],
        #     availability_domain="fblN:US-ASHBURN-AD-1").data
        # limits.get_resource_availability(
        #     service_name="compute", limit_name="standard-e2-core-ad-count",
        #     compartment_id=config["tenancy"],
        #     availability_domain="fblN:US-ASHBURN-AD-1").data

        # If requested, prepare import
        if import_source_type > 0:
            # If a bucket needs to be created, define a name for it
            if import_source_type == 1 or import_source_type == 2:
                # Take all alphanumeric chars from the DB System name
                # to create the bucket_name
                bucket_name = (
                    f"{''.join(e for e in db_system_name if e.isalnum())}"
                    f"_import_{datetime.datetime.now():%Y%m%d%H%M%S}")

                print(f"\nCreating bucket {bucket_name}...")

                bucket = object_store.create_bucket(
                    bucket_name=bucket_name, compartment_id=compartment_id,
                    config=config, return_object=True)
                if bucket is None:
                    print("Cancelling operation")
                    return

                # The bucket was created just for this import, so clean it
                # up afterwards unless the caller decided otherwise
                if perform_cleanup_after_import is None:
                    perform_cleanup_after_import = True

            # Create a MySQL DB System from an existing MySQL Server instance
            if import_source_type == 1:
                # Start the dump process
                if not util.dump_to_bucket(
                        bucket_name=bucket.name,
                        connection_uri=source_mysql_uri,
                        connection_password=source_mysql_password,
                        create_bucket_if_not_exists=True,
                        object_name_prefix="",
                        interactive=interactive,
                        return_true_on_success=True):
                    print(f"Could not dump the given instance to the object "
                          f"store bucket {bucket.name}")
                    return
            # Create a MySQL DB System from local dir
            elif import_source_type == 2:
                if interactive and source_local_dump_dir is None:
                    source_local_dump_dir = mysqlsh.globals.shell.prompt(
                        "Please specify the directory path that contains the "
                        "dump: ",
                        {'defaultValue': ''}).strip()
                    if source_local_dump_dir == "":
                        print("Operation cancelled.")
                        return
                elif source_local_dump_dir is None:
                    print("No directory path given. Operation cancelled.")
                    return

                # Upload the files from the given directory to the bucket
                file_count = object_store.create_bucket_objects_from_local_dir(
                    local_dir_path=source_local_dump_dir,
                    bucket_name=bucket.name,
                    object_name_prefix="",
                    compartment_id=compartment_id, config=config,
                    interactive=False)
                if file_count is None:
                    print("Cancelling operation")
                    return
            elif import_source_type == 3:
                # Create a MySQL DB System from a bucket
                bucket = object_store.get_bucket(
                    bucket_name=source_bucket,
                    compartment_id=compartment_id,
                    config=config)
                if bucket is None:
                    print("Cancelling operation")
                    return
                bucket_name = bucket.name

                # The bucket already existed, so keep it by default
                if perform_cleanup_after_import is None:
                    perform_cleanup_after_import = False

            # Create PAR for import manifest and progress files
            par, progress_par = util.create_bucket_import_pars(
                object_name_prefix="",
                bucket_name=bucket.name,
                db_system_name=db_system_name,
                compartment_id=compartment_id,
                config=config)
            if par is None or progress_par is None:
                return

            # Build URLs
            par_url_prefix = object_store.get_par_url_prefix(config=config)
            par_url = par_url_prefix + par.access_uri
            # progress_par_url = par_url_prefix + progress_par.access_uri

        # Once the API supports the new PAR based import, build the
        # import_details using the given par_url
        # if par_url:
        #     import urllib.parse
        #     import_details = oci.mysql.models.\
        #         CreateDbSystemSourceImportFromUrlDetails(
        #             source_type=oci.mysql.models.
        #             CreateDbSystemSourceImportFromUrlDetails.
        #             SOURCE_TYPE_IMPORTURL,
        #             source_url=(f'{par_url}?progressPar='
        #                         f'{urllib.parse.quote(progress_par_url)}'))

        db_system_details = oci.mysql.models.CreateDbSystemDetails(
            description=description,
            admin_username=admin_username,
            admin_password=admin_password,
            compartment_id=compartment_id,
            configuration_id=mysql_configuration.id,
            data_storage_size_in_gbs=data_storage_size_in_gbs,
            display_name=db_system_name,
            mysql_version=mysql_version,
            shape_name=shape_id,
            availability_domain=availability_domain,
            subnet_id=subnet.id,
            defined_tags=defined_tags,
            freeform_tags=freeform_tags
            # host_image_id=host_image_id
            # source=import_details
        )

        # Get DbSystem Client
        db_sys = core.get_oci_db_system_client(config=config)

        # Create DB System
        new_db_system = db_sys.create_db_system(db_system_details).data

        # If there was a PAR URL given, wait till the system becomes
        # ACTIVE and then perform the clean up work
        if par_url is not None:
            print("Waiting for MySQL DB System to become active.\n"
                  "This can take up to 20 minutes or more...", end="")
            # Poll every 10 seconds until the lifecycle_state is ACTIVE or
            # FAILED, for at most 240 cycles
            cycles = 0
            while cycles < 240:
                db_system = db_sys.get_db_system(new_db_system.id).data
                if db_system.lifecycle_state == "ACTIVE" or \
                        db_system.lifecycle_state == "FAILED":
                    break
                else:
                    time.sleep(10)
                    print(".", end="")
                cycles += 1
            print("")

            # Until the API is ready to directly import at deployment time,
            # also start the import from here. This needs a bucket; if only
            # a par_url was passed in by the caller there is no bucket to
            # import from, so the import step is skipped instead of failing
            # on an undefined bucket name.
            if (db_system.lifecycle_state == "ACTIVE"
                    and bucket_name is not None):
                util.import_from_bucket(
                    bucket_name=bucket_name,
                    db_system_id=new_db_system.id,
                    db_system_name=db_system_name,
                    object_name_prefix="",
                    admin_username=admin_username,
                    admin_password=admin_password,
                    private_key_file_path=private_key_file_path,
                    perform_cleanup=perform_cleanup_after_import,
                    compartment_id=compartment_id,
                    config=config,
                    interactive=False
                )

            # Honor return_object on the import path as well, handing back
            # the most recently fetched state of the DB System
            if return_object:
                return db_system
        else:
            if return_object:
                return new_db_system
            else:
                if new_db_system.lifecycle_state == "CREATING":
                    print(f"\nMySQL DB System {db_system_name} is being created.\n"
                          f"Use mds.ls() to check it's provisioning state.\n")
                else:
                    print(f"\nThe creation of the MySQL DB System {db_system_name} "
                          "failed.\n")

    except oci.exceptions.ServiceError as e:
        if not interactive:
            raise
        print(f'ERROR: {e.message}. (Code: {e.code}; Status: {e.status})')
        return
    except (ValueError, oci.exceptions.ClientError) as e:
        if not interactive:
            raise
        print(f'ERROR: {e}')
        return