in mysql-connector-python/lib/mysql/connector/aio/abstracts.py [0:0]
def _validate_connection_options(self) -> None:
"""Validate connection options."""
if self._user:
try:
self._user = self._user.strip()
except AttributeError as err:
raise AttributeError("'user' must be a string") from err
if self._compress:
self.client_flags = [ClientFlag.COMPRESS]
if self._allow_local_infile_in_path:
infile_in_path = os.path.abspath(self._allow_local_infile_in_path)
if (
infile_in_path
and os.path.exists(infile_in_path)
and not os.path.isdir(infile_in_path)
or os.path.islink(infile_in_path)
):
raise AttributeError("allow_local_infile_in_path must be a directory")
if self._allow_local_infile or self._allow_local_infile_in_path:
self.client_flags = [ClientFlag.LOCAL_FILES]
else:
self.client_flags = [-ClientFlag.LOCAL_FILES]
# Disallow the usage of some default authentication plugins
if self._auth_plugin == "authentication_webauthn_client":
raise InterfaceError(
f"'{self._auth_plugin}' cannot be used as the default authentication "
"plugin"
)
# Disable SSL for unix socket connections
if self._unix_socket and os.name == "posix":
self._ssl_disabled = True
if self._ssl_disabled:
if self._auth_plugin == "mysql_clear_password":
raise InterfaceError(
"Clear password authentication is not supported over insecure "
" channels"
)
if self._auth_plugin == "authentication_openid_connect_client":
raise InterfaceError(
"OpenID Connect authentication is not supported over insecure channels"
)
if not isinstance(self._port, int):
raise InterfaceError("TCP/IP port number should be an integer")
if any([self._ssl_ca, self._ssl_cert, self._ssl_key]):
# Make sure both ssl_key/ssl_cert are set, or neither (XOR)
if not all([self._ssl_key, self._ssl_cert]):
raise AttributeError(
"ssl_key and ssl_cert need to be both specified, or neither"
)
if (self._ssl_key is None) != (self._ssl_cert is None):
raise AttributeError(
"ssl_key and ssl_cert need to be both set, or neither"
)
if self._tls_versions is not None:
self._validate_tls_versions()
if self._tls_ciphersuites is not None:
self._validate_tls_ciphersuites()
if not isinstance(self._connection_attrs, dict):
raise InterfaceError("conn_attrs must be of type dict")
for attr_name, attr_value in self._connection_attrs.items():
if attr_name in CONN_ATTRS_DN:
continue
# Validate name type
if not isinstance(attr_name, str):
raise InterfaceError(
"Attribute name should be a string, found: "
f"'{attr_name}' in '{self._connection_attrs}'"
)
# Validate attribute name limit 32 characters
if len(attr_name) > 32:
raise InterfaceError(
f"Attribute name '{attr_name}' exceeds 32 characters limit size"
)
# Validate names in connection attributes cannot start with "_"
if attr_name.startswith("_"):
raise InterfaceError(
"Key names in connection attributes cannot start with "
"'_', found: '{attr_name}'"
)
# Validate value type
if not isinstance(attr_value, str):
raise InterfaceError(
f"Attribute '{attr_name}' value: '{attr_value}' must "
"be a string type"
)
# Validate attribute value limit 1024 characters
if len(attr_value) > 1024:
raise InterfaceError(
f"Attribute '{attr_name}' value: '{attr_value}' "
"exceeds 1024 characters limit size"
)
if self._client_flags & ClientFlag.CONNECT_ARGS:
self._add_default_conn_attrs()
if self._kerberos_auth_mode:
if not isinstance(self._kerberos_auth_mode, str):
raise InterfaceError("'kerberos_auth_mode' must be of type str")
kerberos_auth_mode = self._kerberos_auth_mode.lower()
if kerberos_auth_mode == "sspi":
if os.name != "nt":
raise InterfaceError(
"'kerberos_auth_mode=SSPI' is only available on Windows"
)
self._auth_plugin_class = "MySQLSSPIKerberosAuthPlugin"
elif kerberos_auth_mode == "gssapi":
self._auth_plugin_class = "MySQLKerberosAuthPlugin"
else:
raise InterfaceError(
"Invalid 'kerberos_auth_mode' mode. Please use 'SSPI' or 'GSSAPI'"
)
if self._krb_service_principal:
if not isinstance(self._krb_service_principal, str):
raise InterfaceError(
KRB_SERVICE_PRINCIPAL_ERROR.format(error="is not a string")
)
if self._krb_service_principal == "":
raise InterfaceError(
KRB_SERVICE_PRINCIPAL_ERROR.format(
error="can not be an empty string"
)
)
if "/" not in self._krb_service_principal:
raise InterfaceError(
KRB_SERVICE_PRINCIPAL_ERROR.format(error="is incorrectly formatted")
)
if self._webauthn_callback:
self._validate_callable("webauth_callback", self._webauthn_callback, 1)
if self._openid_token_file:
if not isinstance(self._openid_token_file, str):
raise InterfaceError(
OPENID_TOKEN_FILE_ERROR.format(error="is not a string")
)
if self._openid_token_file == "":
raise InterfaceError(
OPENID_TOKEN_FILE_ERROR.format(error="cannot be an empty string")
)
if not os.path.exists(self._openid_token_file):
raise InterfaceError(
f"The path '{self._openid_token_file}' provided via 'openid_token_file' "
"does not exist"
)
if self._read_timeout is not None:
if not isinstance(self._read_timeout, int) or self._read_timeout < 0:
raise InterfaceError("Option read_timeout must be a positive integer")
if self._write_timeout is not None:
if not isinstance(self._write_timeout, int) or self._write_timeout < 0:
raise InterfaceError("Option write_timeout must be a positive integer")