def _validate_connection_options()

in mysql-connector-python/lib/mysql/connector/aio/abstracts.py [0:0]


    def _validate_connection_options(self) -> None:
        """Validate connection options."""
        if self._user:
            try:
                self._user = self._user.strip()
            except AttributeError as err:
                raise AttributeError("'user' must be a string") from err

        if self._compress:
            self.client_flags = [ClientFlag.COMPRESS]

        if self._allow_local_infile_in_path:
            infile_in_path = os.path.abspath(self._allow_local_infile_in_path)
            if (
                infile_in_path
                and os.path.exists(infile_in_path)
                and not os.path.isdir(infile_in_path)
                or os.path.islink(infile_in_path)
            ):
                raise AttributeError("allow_local_infile_in_path must be a directory")
        if self._allow_local_infile or self._allow_local_infile_in_path:
            self.client_flags = [ClientFlag.LOCAL_FILES]
        else:
            self.client_flags = [-ClientFlag.LOCAL_FILES]

        # Disallow the usage of some default authentication plugins
        if self._auth_plugin == "authentication_webauthn_client":
            raise InterfaceError(
                f"'{self._auth_plugin}' cannot be used as the default authentication "
                "plugin"
            )

        # Disable SSL for unix socket connections
        if self._unix_socket and os.name == "posix":
            self._ssl_disabled = True

        if self._ssl_disabled:
            if self._auth_plugin == "mysql_clear_password":
                raise InterfaceError(
                    "Clear password authentication is not supported over insecure "
                    " channels"
                )
            if self._auth_plugin == "authentication_openid_connect_client":
                raise InterfaceError(
                    "OpenID Connect authentication is not supported over insecure channels"
                )

        if not isinstance(self._port, int):
            raise InterfaceError("TCP/IP port number should be an integer")

        if any([self._ssl_ca, self._ssl_cert, self._ssl_key]):
            # Make sure both ssl_key/ssl_cert are set, or neither (XOR)
            if not all([self._ssl_key, self._ssl_cert]):
                raise AttributeError(
                    "ssl_key and ssl_cert need to be both specified, or neither"
                )

            if (self._ssl_key is None) != (self._ssl_cert is None):
                raise AttributeError(
                    "ssl_key and ssl_cert need to be both set, or neither"
                )
            if self._tls_versions is not None:
                self._validate_tls_versions()

            if self._tls_ciphersuites is not None:
                self._validate_tls_ciphersuites()

        if not isinstance(self._connection_attrs, dict):
            raise InterfaceError("conn_attrs must be of type dict")

        for attr_name, attr_value in self._connection_attrs.items():
            if attr_name in CONN_ATTRS_DN:
                continue
            # Validate name type
            if not isinstance(attr_name, str):
                raise InterfaceError(
                    "Attribute name should be a string, found: "
                    f"'{attr_name}' in '{self._connection_attrs}'"
                )
            # Validate attribute name limit 32 characters
            if len(attr_name) > 32:
                raise InterfaceError(
                    f"Attribute name '{attr_name}' exceeds 32 characters limit size"
                )
            # Validate names in connection attributes cannot start with "_"
            if attr_name.startswith("_"):
                raise InterfaceError(
                    "Key names in connection attributes cannot start with "
                    "'_', found: '{attr_name}'"
                )
            # Validate value type
            if not isinstance(attr_value, str):
                raise InterfaceError(
                    f"Attribute '{attr_name}' value: '{attr_value}' must "
                    "be a string type"
                )
            # Validate attribute value limit 1024 characters
            if len(attr_value) > 1024:
                raise InterfaceError(
                    f"Attribute '{attr_name}' value: '{attr_value}' "
                    "exceeds 1024 characters limit size"
                )

        if self._client_flags & ClientFlag.CONNECT_ARGS:
            self._add_default_conn_attrs()

        if self._kerberos_auth_mode:
            if not isinstance(self._kerberos_auth_mode, str):
                raise InterfaceError("'kerberos_auth_mode' must be of type str")
            kerberos_auth_mode = self._kerberos_auth_mode.lower()
            if kerberos_auth_mode == "sspi":
                if os.name != "nt":
                    raise InterfaceError(
                        "'kerberos_auth_mode=SSPI' is only available on Windows"
                    )
                self._auth_plugin_class = "MySQLSSPIKerberosAuthPlugin"
            elif kerberos_auth_mode == "gssapi":
                self._auth_plugin_class = "MySQLKerberosAuthPlugin"
            else:
                raise InterfaceError(
                    "Invalid 'kerberos_auth_mode' mode. Please use 'SSPI' or 'GSSAPI'"
                )

        if self._krb_service_principal:
            if not isinstance(self._krb_service_principal, str):
                raise InterfaceError(
                    KRB_SERVICE_PRINCIPAL_ERROR.format(error="is not a string")
                )
            if self._krb_service_principal == "":
                raise InterfaceError(
                    KRB_SERVICE_PRINCIPAL_ERROR.format(
                        error="can not be an empty string"
                    )
                )
            if "/" not in self._krb_service_principal:
                raise InterfaceError(
                    KRB_SERVICE_PRINCIPAL_ERROR.format(error="is incorrectly formatted")
                )

        if self._webauthn_callback:
            self._validate_callable("webauth_callback", self._webauthn_callback, 1)

        if self._openid_token_file:
            if not isinstance(self._openid_token_file, str):
                raise InterfaceError(
                    OPENID_TOKEN_FILE_ERROR.format(error="is not a string")
                )
            if self._openid_token_file == "":
                raise InterfaceError(
                    OPENID_TOKEN_FILE_ERROR.format(error="cannot be an empty string")
                )
            if not os.path.exists(self._openid_token_file):
                raise InterfaceError(
                    f"The path '{self._openid_token_file}' provided via 'openid_token_file' "
                    "does not exist"
                )
        if self._read_timeout is not None:
            if not isinstance(self._read_timeout, int) or self._read_timeout < 0:
                raise InterfaceError("Option read_timeout must be a positive integer")
        if self._write_timeout is not None:
            if not isinstance(self._write_timeout, int) or self._write_timeout < 0:
                raise InterfaceError("Option write_timeout must be a positive integer")