def config()

in mysql-connector-python/lib/mysql/connector/abstracts.py [0:0]


    def config(self, **kwargs: Any) -> None:
        """Configures the MySQL Connection.

        This method allows you to configure the `MySQLConnection`
        instance after it has been instantiated.

        The keyword arguments are merged with the result of
        `read_option_files()`. Options that need special treatment (warnings,
        client flags, login, port, ...) are handled explicitly; every remaining
        option must be a key of `DEFAULT_CONFIGURATION` and is stored either in
        `self._ssl` (`ssl_*` / `tls_*` options) or as the private attribute
        `_<option>` on the connection.

        Args:
            **kwargs: For a complete list of possible arguments, see [1].

        Raises:
            AttributeError: When provided unsupported connection arguments.
            InterfaceError: When the provided connection argument is invalid.
            NotSupportedError: When the `dsn` argument is provided.

        References:
            [1]: https://dev.mysql.com/doc/connector-python/en/connector-python-connectargs.html
        """
        # opentelemetry related
        self._span = kwargs.pop(OPTION_CNX_SPAN, None)
        self._tracer = kwargs.pop(OPTION_CNX_TRACER, None)

        config = kwargs.copy()
        if "dsn" in config:
            raise NotSupportedError("Data source name is not supported")

        # Read option files
        config = read_option_files(**config)

        # Configure how we handle MySQL warnings; when an option is missing,
        # leave what was set previously (or the default).
        if "get_warnings" in config:
            self.get_warnings = config.pop("get_warnings")
        if "raise_on_warnings" in config:
            self.raise_on_warnings = config.pop("raise_on_warnings")

        # Configure client flags; a missing client_flags argument is OK and a
        # falsy one falls back to the default flags.
        if "client_flags" in config:
            self.client_flags = config.pop("client_flags") or ClientFlag.get_default()

        # "compress" is intentionally left in config: the generic loop below
        # stores it as self._compress as well.
        if config.get("compress"):
            self._compress = True
            self.client_flags = [ClientFlag.COMPRESS]

        self._allow_local_infile = config.get(
            "allow_local_infile", DEFAULT_CONFIGURATION["allow_local_infile"]
        )
        self._allow_local_infile_in_path = config.get(
            "allow_local_infile_in_path",
            DEFAULT_CONFIGURATION["allow_local_infile_in_path"],
        )
        if self._allow_local_infile_in_path:
            infile_in_path = os.path.abspath(self._allow_local_infile_in_path)
            # Reject an existing path that is not a directory, and reject any
            # symbolic link. The parentheses only make the original operator
            # precedence ("and" binds tighter than "or") explicit.
            if (
                infile_in_path
                and os.path.exists(infile_in_path)
                and not os.path.isdir(infile_in_path)
            ) or os.path.islink(infile_in_path):
                raise AttributeError("allow_local_infile_in_path must be a directory")
        if self._allow_local_infile or self._allow_local_infile_in_path:
            self.client_flags = [ClientFlag.LOCAL_FILES]
        else:
            # A negated flag asks the client_flags setter to clear it
            self.client_flags = [-ClientFlag.LOCAL_FILES]

        # Normalize consume_results to a bool; defaults to False when missing
        self._consume_results = bool(config.get("consume_results", False))

        # Configure auth_plugin
        self._auth_plugin = config.pop("auth_plugin", "")

        # Disallow the usage of some default authentication plugins
        if self._auth_plugin == "authentication_webauthn_client":
            raise InterfaceError(
                f"'{self._auth_plugin}' cannot be used as the default authentication "
                "plugin"
            )

        # Set converter class; when missing, the default converter class is used
        if "converter_class" in config:
            try:
                self.converter_class = config["converter_class"]
            except TypeError as err:
                raise AttributeError(
                    "Converter class should be a subclass of "
                    "conversion.MySQLConverterBase"
                ) from err

        # Compatible configuration with other drivers
        compat_map = [
            # (<other driver argument>,<translates to>)
            ("db", "database"),
            ("username", "user"),
            ("passwd", "password"),
            ("connect_timeout", "connection_timeout"),
            ("read_default_file", "option_files"),
        ]
        for compat, translate in compat_map:
            if compat not in config:
                continue  # Missing compat argument is OK
            compat_value = config.pop(compat)
            # The native argument wins when both spellings are provided
            if translate not in config:
                config[translate] = compat_value

        # Configure login information; whichever of user/password is missing
        # keeps its current value.
        if "user" in config or "password" in config:
            user = config.pop("user") if "user" in config else self._user
            password = (
                config.pop("password") if "password" in config else self._password
            )
            self.set_login(user, password)

        # Configure host information; "host" stays in config and is therefore
        # also processed by the generic loop below.
        if config.get("host"):
            self._host = config["host"]

        # Check network locations
        try:
            self._port = int(config["port"])
            del config["port"]
        except KeyError:
            pass  # Missing port argument is OK
        except ValueError as err:
            raise InterfaceError("TCP/IP port number should be an integer") from err

        if "ssl_disabled" in config:
            self._ssl_disabled = config.pop("ssl_disabled")

        # If an init_command is set, keep it, so we can execute it in _post_connection
        if "init_command" in config:
            self._init_command = config.pop("init_command")

        # Other configuration
        set_ssl_flag = False
        for key, value in config.items():
            if key not in DEFAULT_CONFIGURATION:
                raise AttributeError(f"Unsupported argument '{key}'") from None
            # SSL Configuration
            if key.startswith("ssl_"):
                set_ssl_flag = True
                self._ssl.update({key.replace("ssl_", ""): value})
            elif key.startswith("tls_"):
                set_ssl_flag = True
                self._ssl.update({key: value})
            else:
                attribute = "_" + key
                # Strip string-like values; anything without .strip() is
                # stored unchanged.
                try:
                    setattr(self, attribute, value.strip())
                except AttributeError:
                    setattr(self, attribute, value)

        # Disable SSL for unix socket connections
        if self._unix_socket and os.name == "posix":
            self._ssl_disabled = True

        if self._ssl_disabled:
            if self._auth_plugin == "mysql_clear_password":
                raise InterfaceError(
                    "Clear password authentication is not supported over insecure channels"
                )
            if self._auth_plugin == "authentication_openid_connect_client":
                raise InterfaceError(
                    "OpenID Connect authentication is not supported over insecure channels"
                )

        if set_ssl_flag:
            if "verify_cert" not in self._ssl:
                self._ssl["verify_cert"] = DEFAULT_CONFIGURATION["ssl_verify_cert"]
            if "verify_identity" not in self._ssl:
                self._ssl["verify_identity"] = DEFAULT_CONFIGURATION[
                    "ssl_verify_identity"
                ]
            # A missing or None CA is normalized to an empty string
            if self._ssl.get("ca") is None:
                self._ssl["ca"] = ""
            # Make sure both ssl_key/ssl_cert are set, or neither (XOR)
            if ("key" in self._ssl) != ("cert" in self._ssl):
                raise AttributeError(
                    "ssl_key and ssl_cert need to be both specified, or neither"
                )
            # Make sure key/cert are set to None
            if not {"key", "cert"} <= set(self._ssl):
                self._ssl["key"] = None
                self._ssl["cert"] = None
            elif (self._ssl["key"] is None) != (self._ssl["cert"] is None):
                raise AttributeError(
                    "ssl_key and ssl_cert need to be both set, or neither"
                )
            if self._ssl.get("tls_versions") is not None:
                self._validate_tls_versions()

            if self._ssl.get("tls_ciphersuites") is not None:
                self._validate_tls_ciphersuites()

        if self._conn_attrs is None:
            self._conn_attrs = {}
        elif not isinstance(self._conn_attrs, dict):
            raise InterfaceError("conn_attrs must be of type dict")
        else:
            for attr_name, attr_value in self._conn_attrs.items():
                # Names listed in CONN_ATTRS_DN are exempt from validation
                if attr_name in CONN_ATTRS_DN:
                    continue
                # Validate name type
                if not isinstance(attr_name, str):
                    raise InterfaceError(
                        "Attribute name should be a string, found: "
                        f"'{attr_name}' in '{self._conn_attrs}'"
                    )
                # Validate attribute name limit 32 characters
                if len(attr_name) > 32:
                    raise InterfaceError(
                        f"Attribute name '{attr_name}' exceeds 32 characters limit size"
                    )
                # Validate names in connection attributes cannot start with "_"
                if attr_name.startswith("_"):
                    raise InterfaceError(
                        "Key names in connection attributes cannot start with "
                        f"'_', found: '{attr_name}'"
                    )
                # Validate value type
                if not isinstance(attr_value, str):
                    raise InterfaceError(
                        f"Attribute '{attr_name}' value: '{attr_value}' must "
                        "be a string type"
                    )
                # Validate attribute value limit 1024 characters
                if len(attr_value) > 1024:
                    raise InterfaceError(
                        f"Attribute '{attr_name}' value: '{attr_value}' "
                        "exceeds 1024 characters limit size"
                    )

        if self._client_flags & ClientFlag.CONNECT_ARGS:
            self._add_default_conn_attrs()

        if config.get("kerberos_auth_mode") is not None:
            if not isinstance(config["kerberos_auth_mode"], str):
                raise InterfaceError("'kerberos_auth_mode' must be of type str")
            kerberos_auth_mode = config["kerberos_auth_mode"].lower()
            if kerberos_auth_mode == "sspi":
                if os.name != "nt":
                    raise InterfaceError(
                        "'kerberos_auth_mode=SSPI' is only available on Windows"
                    )
                self._auth_plugin_class = "MySQLSSPIKerberosAuthPlugin"
            elif kerberos_auth_mode == "gssapi":
                self._auth_plugin_class = "MySQLKerberosAuthPlugin"
            else:
                raise InterfaceError(
                    "Invalid 'kerberos_auth_mode' mode. Please use 'SSPI' or 'GSSAPI'"
                )

        if config.get("krb_service_principal") is not None:
            self._krb_service_principal = config["krb_service_principal"]
            if not isinstance(self._krb_service_principal, str):
                raise InterfaceError(
                    KRB_SERVICE_PRINCIPAL_ERROR.format(error="is not a string")
                )
            if self._krb_service_principal == "":
                raise InterfaceError(
                    KRB_SERVICE_PRINCIPAL_ERROR.format(
                        error="can not be an empty string"
                    )
                )
            if "/" not in self._krb_service_principal:
                raise InterfaceError(
                    KRB_SERVICE_PRINCIPAL_ERROR.format(error="is incorrectly formatted")
                )

        if self._webauthn_callback:
            # The first argument is the option name; it must match the real
            # option ("webauthn_callback").
            self._validate_callable("webauthn_callback", self._webauthn_callback, 1)

        if config.get("openid_token_file") is not None:
            self._openid_token_file = config["openid_token_file"]
            if not isinstance(self._openid_token_file, str):
                raise InterfaceError(
                    OPENID_TOKEN_FILE_ERROR.format(error="is not a string")
                )
            if self._openid_token_file == "":
                raise InterfaceError(
                    OPENID_TOKEN_FILE_ERROR.format(error="cannot be an empty string")
                )
            if not os.path.exists(self._openid_token_file):
                raise InterfaceError(
                    f"The path '{self._openid_token_file}' provided via 'openid_token_file' "
                    "does not exist"
                )

        # NOTE: only negative values are rejected below, so 0 is accepted even
        # though the messages say "positive".
        if config.get("read_timeout") is not None:
            self._read_timeout = config["read_timeout"]
            if not isinstance(self._read_timeout, int) or self._read_timeout < 0:
                raise InterfaceError("Option read_timeout must be a positive integer")
        if config.get("write_timeout") is not None:
            self._write_timeout = config["write_timeout"]
            if not isinstance(self._write_timeout, int) or self._write_timeout < 0:
                raise InterfaceError("Option write_timeout must be a positive integer")