def create_config()

in mds_plugin/configuration.py [0:0]


def create_config(**kwargs):
    """Creates and stores a new OCI config

    Args:
        **kwargs: Optional parameters

    Keyword Args:
        profile_name (str): The name of the OCI profile
        config_file_path (str): The file path of the OCI config file
        tenancy (str): OCID of the tenancy
        region (str): The name of the region
        user_id (str): OCID of the user
        key_file (str): The filename of the OCI API key
        fingerprint (str): The fingerprint of the OCI API key
        interactive (bool): Whether user interaction should be performed

    Returns:
        The new config or None

    If profile_name is omitted, the user is prompted for one when
    interactive is True, otherwise the operation is cancelled. Any
    non-alphanumeric characters are removed from the profile name.

    If the config_file_path is omitted, the default config file path is
    used.

    A missing region, tenancy or user_id is always prompted for, regardless
    of the interactive setting. The config dict is only returned when
    interactive is False; in interactive mode None is returned.
    """
    import os.path
    import pathlib
    import configparser
    import oci.config
    import mysqlsh
    from mds_plugin import user

    profile_name = kwargs.get("profile_name")

    # If no config_file_path is given (or it was explicitly passed as None),
    # fall back to the default path. Per the original note, the default
    # lookup checks the MYSQLSH_OCI_CONFIG_FILE env_var first.
    config_file_path = kwargs.get("config_file_path")
    if not config_file_path:
        config_file_path = get_default_config_file_path()

    tenancy = kwargs.get("tenancy")
    region = kwargs.get("region")
    user_id = kwargs.get("user_id")
    key_file = kwargs.get("key_file")
    fingerprint = kwargs.get("fingerprint")
    interactive = kwargs.get("interactive", True)

    # Resolved lazily; stays None until a public key file has been located
    public_key_file = None

    # Get full path
    config_file_path = os.path.expanduser(config_file_path)
    if key_file:
        key_file = os.path.expanduser(key_file)

    # Load the existing profiles (if any) so they are preserved on write
    parser = configparser.ConfigParser(interpolation=None)
    if os.path.isfile(config_file_path):
        parser.read(config_file_path)

    # If no profile name was given, query the user for one
    if profile_name is None and interactive:
        profile_name = mysqlsh.globals.shell.prompt(
            "Please enter a name for the new OCI profile: ").strip()

    # Ensure profile_name only uses alphanumeric chars. This is done before
    # the emptiness check so a name made up solely of special characters
    # cannot end up as an empty section name.
    if profile_name:
        profile_name = ''.join(ch for ch in profile_name if ch.isalnum())
    if not profile_name:
        print("Operation cancelled.")
        return None

    # The DEFAULT section is always present in a ConfigParser, so it is
    # allowed to be overwritten
    if profile_name in parser and profile_name.upper() != 'DEFAULT':
        print(f"The profile '{profile_name}' already exists.")
        return None

    # cSpell:ignore OCIDs, sdkconfig
    print(f"Creating a new OCI API configuration profile '{profile_name}'..."
          "\n\nThe creation of a new OCI API configuration profile requires\n"
          "the following steps.\n"
          "1. Identify the geographical OCI region to use. Each region has\n"
          "   its own end point and therefore it is important to chose the\n"
          "   right one for the OCI API \n"
          "2. The Oracle Cloud Identifier (OCID) of the OCI tenancy and\n"
          "   OCI user account need to be looked up on the OCI web console.\n"
          "3. An API Signing Key needs to be generated or an existing key file"
          "\n   needs to be reused.\n"
          "4. The API Signing Key needs to be uploaded to the OCI web console."
          "\n\nPlease follow the instructions outlined by this wizard or read\n"
          "about the details of how to get the recquired OCIDs here:\n"
          "-  https://docs.cloud.oracle.com/en-us/iaas/Content/API/Concepts/"
          "sdkconfig.htm\n")

    # Wait for enter
    mysqlsh.globals.shell.prompt(
        "Please hit the enter key to continue...\n")

    if not region:
        print("OCI Region\n"
              "==========\n"
              "Please select a region from this list or provide the exact name "
              "of a new region.")
        region_list = [
            f"{r.get('id'):20} {r.get('location')}" for r in OCI_REGION_LIST]
        region = core.prompt_for_list_item(
            item_list=region_list,
            prompt_caption="Please enter the index or name of a region: ",
            prompt_default_value='',
            print_list=True)
        if not region:
            print("Operation cancelled.")
            return None
        # Remove the trailing caption
        region = region.split(' ')[0]

    def prompt_for_ocid(value, prefix, caption, kind):
        """Prompts for an OCID if none was given and checks its prefix

        If the value does not start with the expected prefix the user gets
        exactly one more chance to enter it. The second entry is not
        checked here.

        Args:
            value (str): The OCID given so far, may be None or empty
            prefix (str): The prefix the OCID is expected to start with
            caption (str): The prompt caption
            kind (str): The name of the resource used in the error message

        Returns:
            The OCID or None if the user entered nothing
        """
        if not value:
            value = mysqlsh.globals.shell.prompt(caption).strip()
        if value and not value.startswith(prefix):
            print(f"ERROR: The OCID of the {kind} needs to start with "
                  f"'{prefix}'.")
            value = mysqlsh.globals.shell.prompt(caption).strip()
        return value or None

    # If no tenancy or user was given, query the user for one
    if not tenancy or not user_id:
        print("\nOCID of the OCI Tenancy and User Account\n"
              "========================================\n"
              "Please open the OCI web console by following the link below.\n"
              "Enter the name of the tenancy and then log in using your\n"
              "OCI user name and web console password or identity provider.\n"
              f"-  https://console.{region}.oraclecloud.com/\n\n"
              "In the OCI web console please click on the user profile icon\n"
              "top right and choose 'Tenancy'. On the Tenancy page find the\n"
              "<Copy> link next to the Tenancy OCID value and use it to copy\n"
              "the value to the clipboard.\n\n"
              "For the User OCID value click the user profile icon top right\n"
              "and choose 'User Settings'. Again, locate the User OCID value\n"
              "click the <Copy> link to copy the value to the clipboard.\n")
        tenancy = prompt_for_ocid(
            tenancy, 'ocid1.tenancy.oc1..',
            "Please enter the OCID of the tenancy: ", "tenancy")
        if not tenancy:
            return None
        user_id = prompt_for_ocid(
            user_id, 'ocid1.user.oc1..',
            "Please enter the OCID of the user account: ", "user")
        if not user_id:
            return None

    def get_public_key_file(key_file):
        """Get the public key based on the private key file path

        Try to locate public_key_file by appending _public.pem to the
        path + basename of the private key file. If that file does not
        exist, the user is asked for the path of the public key file.

        Args:
            key_file (str): The private key file path

        Returns:
            The public key file path or None
        """
        candidate = None
        if '.' in key_file:
            candidate = os.path.splitext(key_file)[0] + "_public.pem"
        if not candidate or not os.path.isfile(candidate):
            # Ask for the *public* key file and expand ~ so the default
            # shown in the caption actually resolves to a file
            candidate = os.path.expanduser(mysqlsh.globals.shell.prompt(
                "Please enter the full path to the public key file "
                "[~/.oci/oci_api_key_public.pem]: ",
                options={
                    'defaultValue': '~/.oci/oci_api_key_public.pem'}).strip())
            if not os.path.isfile(candidate):
                print(f"Key file '{candidate}' not found.\n"
                      "Operation cancelled.")
                return None
        return candidate

    # If no key_file was given, let the user choose
    if interactive and (key_file is None or not os.path.isfile(key_file)):
        print("\nAPI Signing Key\n"
              "===============\n"
              "In order to access the OCI APIs an API Signing Key is required.\n"
              "The API Signing Key is a special RSA key in PEM format.\n\n"
              "Please select one of the following options.")
        key_option = core.prompt_for_list_item(
            item_list=[
                "Create a new API Signing Key",
                "Reuse an existing API Signing Key"],
            prompt_caption="Please enter the index of the key option to use: ",
            prompt_default_value='',
            print_list=True)
        if key_option == '':
            print("Operation cancelled.")
            return None

        if key_option.startswith('Reuse'):
            key_file = mysqlsh.globals.shell.prompt(
                "Please enter the full path to the private key file "
                "[~/.oci/oci_api_key.pem]: ",
                options={
                    'defaultValue': '~/.oci/oci_api_key.pem'}).strip()
            if not key_file:
                print("Operation cancelled.")
                return None

            key_file = os.path.expanduser(key_file)
            if not os.path.isfile(key_file):
                print(f"Key file '{key_file}' not found. Operation cancelled.")
                return None

            # Get public key based on key_file
            public_key_file = get_public_key_file(key_file)
            if not public_key_file:
                return None
        else:
            print("Generating API key files...")
            key_info = user.create_user_api_key(
                key_path="~/.oci/",
                key_file_prefix=f"oci_api_key_{profile_name}",
                write_to_disk=True,
                return_object=True,
                interactive=False)

            fingerprint = key_info["fingerprint"]
            key_file = os.path.expanduser(
                f"~/.oci/oci_api_key_{profile_name}.pem")
            public_key_file = os.path.splitext(key_file)[0] + "_public.pem"

    # From here on a usable private key file is mandatory
    if not key_file or not os.path.isfile(key_file):
        print("No API Signing Key file given or found. Operation cancelled.")
        return None

    if interactive:
        # A valid key_file passed in by the caller skips the selection above,
        # so the public key file still needs to be located for the
        # instructions below
        if not public_key_file:
            public_key_file = get_public_key_file(key_file)
            if not public_key_file:
                return None

        print("\nUploading the API Signing Key to the OCI web console\n"
              "====================================================\n"
              "In the OCI web console please click on the user profile "
              "icon\ntop right and choose 'User Settings'.\n\n"
              "On the lower left side in the Resources list select "
              "'API Keys'.\n"
              "Click on [Add Public Key] and choose the public key file \n"
              f"\n  {public_key_file}\n\n"
              "and click [Add] to complete the upload.\n")

        # Wait for enter
        mysqlsh.globals.shell.prompt(
            "Please hit the enter key once the public key file has been "
            "uploaded...\n")

    if not fingerprint:
        # Get public key based on key_file, unless it was located already
        if not public_key_file:
            public_key_file = get_public_key_file(key_file)
            if not public_key_file:
                return None

        # Read the public key
        with open(public_key_file, mode='rb') as binary_file:
            public_key = binary_file.read()

        # Generate fingerprint
        fingerprint = user.get_fingerprint(public_key)

    # Setup Config
    config = {
        "user": user_id,
        "fingerprint": fingerprint,
        "key_file": key_file,
        "tenancy": tenancy,
        "region": region
    }

    try:
        oci.config.validate_config(config)
    except oci.exceptions.InvalidConfig as e:
        print(f"ERROR: The config is invalid: {str(e)}")
        return None

    # Update config parser
    parser[profile_name] = config

    print(f"Storing profile '{profile_name}' in config file ...")

    # Ensure path exists
    pathlib.Path(os.path.dirname(config_file_path)).mkdir(
        parents=True, exist_ok=True)

    # Write the change to disk
    with open(config_file_path, 'w') as configfile:
        parser.write(configfile)

    # Add profile name and default values
    config["profile"] = profile_name
    config = {**oci.config.DEFAULT_CONFIG, **config}

    # Set it as global config object
    setattr(mysqlsh.globals, 'mds_config', config)

    # Set the new profile as the default profile
    load_profile_as_current(profile_name)

    # The config is only handed back to non-interactive callers
    if not interactive:
        return config
    return None