in mysql-connector-python/lib/mysql/connector/abstracts.py [0:0]
def config(self, **kwargs: Any) -> None:
    """Configures the MySQL Connection.

    This method allows you to configure the `MySQLConnection`
    instance after it has been instantiated.

    The keyword arguments are copied, merged with option files through
    `read_option_files()`, and then processed in stages: warnings handling,
    client flags, local-infile settings, authentication plugin, converter
    class, compatibility aliases, login, host/port, SSL/TLS, connection
    attributes, Kerberos, WebAuthn, OpenID Connect and read/write timeouts.

    Args:
        **kwargs: For a complete list of possible arguments, see [1].

    Raises:
        NotSupportedError: When a `dsn` argument is provided.
        AttributeError: When provided unsupported connection arguments.
        InterfaceError: When the provided connection argument is invalid.

    References:
        [1]: https://dev.mysql.com/doc/connector-python/en/connector-python-connectargs.html
    """
    # opentelemetry related
    # These two are popped from `kwargs` itself, so they never reach the
    # "unsupported argument" check performed further below.
    self._span = kwargs.pop(OPTION_CNX_SPAN, None)
    self._tracer = kwargs.pop(OPTION_CNX_TRACER, None)

    # Work on a copy so the caller's keyword arguments are not mutated by
    # the `del`/`pop` calls below.
    config = kwargs.copy()
    if "dsn" in config:
        raise NotSupportedError("Data source name is not supported")

    # Read option files
    config = read_option_files(**config)

    # Configure how we handle MySQL warnings
    try:
        self.get_warnings = config["get_warnings"]
        del config["get_warnings"]
    except KeyError:
        pass  # Leave what was set or default

    try:
        self.raise_on_warnings = config["raise_on_warnings"]
        del config["raise_on_warnings"]
    except KeyError:
        pass  # Leave what was set or default

    # Configure client flags
    try:
        default = ClientFlag.get_default()
        # A falsy `client_flags` value (e.g. None or 0) selects the defaults.
        self.client_flags = config["client_flags"] or default
        del config["client_flags"]
    except KeyError:
        pass  # Missing client_flags-argument is OK

    try:
        if config["compress"]:
            self._compress = True
            self.client_flags = [ClientFlag.COMPRESS]
    except KeyError:
        pass  # Missing compress argument is OK

    self._allow_local_infile = config.get(
        "allow_local_infile", DEFAULT_CONFIGURATION["allow_local_infile"]
    )
    self._allow_local_infile_in_path = config.get(
        "allow_local_infile_in_path",
        DEFAULT_CONFIGURATION["allow_local_infile_in_path"],
    )
    infile_in_path = None
    if self._allow_local_infile_in_path:
        infile_in_path = os.path.abspath(self._allow_local_infile_in_path)
        # `and` binds tighter than `or`: the path is rejected when it exists
        # but is not a directory, or whenever it is a symbolic link.
        if (
            infile_in_path
            and os.path.exists(infile_in_path)
            and not os.path.isdir(infile_in_path)
            or os.path.islink(infile_in_path)
        ):
            raise AttributeError("allow_local_infile_in_path must be a directory")
    if self._allow_local_infile or self._allow_local_infile_in_path:
        self.client_flags = [ClientFlag.LOCAL_FILES]
    else:
        # NOTE(review): presumably the `client_flags` setter interprets a
        # negated flag as "unset this flag" - confirm against the setter.
        self.client_flags = [-ClientFlag.LOCAL_FILES]

    try:
        if not config["consume_results"]:
            self._consume_results = False
        else:
            self._consume_results = True
    except KeyError:
        self._consume_results = False

    # Configure auth_plugin
    try:
        self._auth_plugin = config["auth_plugin"]
        del config["auth_plugin"]
    except KeyError:
        self._auth_plugin = ""

    # Disallow the usage of some default authentication plugins
    if self._auth_plugin == "authentication_webauthn_client":
        raise InterfaceError(
            f"'{self._auth_plugin}' cannot be used as the default authentication "
            "plugin"
        )

    # Set converter class
    try:
        self.converter_class = config["converter_class"]
    except KeyError:
        pass  # Using default converter class
    except TypeError as err:
        # NOTE(review): the TypeError presumably originates from the
        # `converter_class` setter rejecting the value - confirm.
        raise AttributeError(
            "Converter class should be a subclass of "
            "conversion.MySQLConverterBase"
        ) from err

    # Compatible configuration with other drivers
    compat_map = [
        # (<other driver argument>,<translates to>)
        ("db", "database"),
        ("username", "user"),
        ("passwd", "password"),
        ("connect_timeout", "connection_timeout"),
        ("read_default_file", "option_files"),
    ]
    for compat, translate in compat_map:
        try:
            # The native argument name wins when both spellings are given;
            # the alias is removed in either case.
            if translate not in config:
                config[translate] = config[compat]
            del config[compat]
        except KeyError:
            pass  # Missing compat argument is OK

    # Configure login information
    if "user" in config or "password" in config:
        # Whichever of the two is missing falls back to the current value.
        try:
            user = config["user"]
            del config["user"]
        except KeyError:
            user = self._user
        try:
            password = config["password"]
            del config["password"]
        except KeyError:
            password = self._password
        self.set_login(user, password)

    # Configure host information
    if "host" in config and config["host"]:
        self._host = config["host"]

    # Check network locations
    try:
        self._port = int(config["port"])
        del config["port"]
    except KeyError:
        pass  # Missing port argument is OK
    except ValueError as err:
        raise InterfaceError("TCP/IP port number should be an integer") from err

    # Popped before the generic loop below so that it is not mistaken for an
    # SSL option because of its "ssl_" prefix.
    if "ssl_disabled" in config:
        self._ssl_disabled = config.pop("ssl_disabled")

    # If an init_command is set, keep it, so we can execute it in _post_connection
    if "init_command" in config:
        self._init_command = config["init_command"]
        del config["init_command"]

    # Other configuration
    # Every key still present in `config` must be a known option; it is then
    # stored either in `self._ssl` or as a private `_<key>` attribute.
    set_ssl_flag = False
    for key, value in config.items():
        try:
            DEFAULT_CONFIGURATION[key]
        except KeyError:
            raise AttributeError(f"Unsupported argument '{key}'") from None
        # SSL Configuration
        if key.startswith("ssl_"):
            set_ssl_flag = True
            self._ssl.update({key.replace("ssl_", ""): value})
        elif key.startswith("tls_"):
            set_ssl_flag = True
            self._ssl.update({key: value})
        else:
            attribute = "_" + key
            try:
                # Values exposing `strip()` (e.g. strings) are stored stripped.
                setattr(self, attribute, value.strip())
            except AttributeError:
                setattr(self, attribute, value)

    # Disable SSL for unix socket connections
    if self._unix_socket and os.name == "posix":
        self._ssl_disabled = True

    if self._ssl_disabled:
        if self._auth_plugin == "mysql_clear_password":
            raise InterfaceError(
                "Clear password authentication is not supported over insecure channels"
            )
        if self._auth_plugin == "authentication_openid_connect_client":
            raise InterfaceError(
                "OpenID Connect authentication is not supported over insecure channels"
            )

    if set_ssl_flag:
        if "verify_cert" not in self._ssl:
            self._ssl["verify_cert"] = DEFAULT_CONFIGURATION["ssl_verify_cert"]
        if "verify_identity" not in self._ssl:
            self._ssl["verify_identity"] = DEFAULT_CONFIGURATION[
                "ssl_verify_identity"
            ]
        if "ca" not in self._ssl or self._ssl["ca"] is None:
            self._ssl["ca"] = ""
        # Make sure both ssl_key/ssl_cert are set, or neither (XOR)
        if bool("key" in self._ssl) != bool("cert" in self._ssl):
            raise AttributeError(
                "ssl_key and ssl_cert need to be both specified, or neither"
            )
        # Make sure key/cert are set to None
        if not set(("key", "cert")) <= set(self._ssl):
            self._ssl["key"] = None
            self._ssl["cert"] = None
        elif (self._ssl["key"] is None) != (self._ssl["cert"] is None):
            raise AttributeError(
                "ssl_key and ssl_cert need to be both set, or neither"
            )
        if self._ssl.get("tls_versions") is not None:
            self._validate_tls_versions()

        if self._ssl.get("tls_ciphersuites") is not None:
            self._validate_tls_ciphersuites()

    if self._conn_attrs is None:
        self._conn_attrs = {}
    elif not isinstance(self._conn_attrs, dict):
        raise InterfaceError("conn_attrs must be of type dict")
    else:
        for attr_name, attr_value in self._conn_attrs.items():
            # Names listed in CONN_ATTRS_DN are exempt from the checks below.
            if attr_name in CONN_ATTRS_DN:
                continue
            # Validate name type
            if not isinstance(attr_name, str):
                raise InterfaceError(
                    "Attribute name should be a string, found: "
                    f"'{attr_name}' in '{self._conn_attrs}'"
                )
            # Validate attribute name limit 32 characters
            if len(attr_name) > 32:
                raise InterfaceError(
                    f"Attribute name '{attr_name}' exceeds 32 characters limit size"
                )
            # Validate names in connection attributes cannot start with "_"
            if attr_name.startswith("_"):
                # The second literal must be an f-string, otherwise the
                # placeholder is emitted verbatim instead of the actual name.
                raise InterfaceError(
                    "Key names in connection attributes cannot start with "
                    f"'_', found: '{attr_name}'"
                )
            # Validate value type
            if not isinstance(attr_value, str):
                raise InterfaceError(
                    f"Attribute '{attr_name}' value: '{attr_value}' must "
                    "be a string type"
                )
            # Validate attribute value limit 1024 characters
            if len(attr_value) > 1024:
                raise InterfaceError(
                    f"Attribute '{attr_name}' value: '{attr_value}' "
                    "exceeds 1024 characters limit size"
                )

    if self._client_flags & ClientFlag.CONNECT_ARGS:
        self._add_default_conn_attrs()

    if "kerberos_auth_mode" in config and config["kerberos_auth_mode"] is not None:
        if not isinstance(config["kerberos_auth_mode"], str):
            raise InterfaceError("'kerberos_auth_mode' must be of type str")
        # The mode is compared case-insensitively.
        kerberos_auth_mode = config["kerberos_auth_mode"].lower()
        if kerberos_auth_mode == "sspi":
            if os.name != "nt":
                raise InterfaceError(
                    "'kerberos_auth_mode=SSPI' is only available on Windows"
                )
            self._auth_plugin_class = "MySQLSSPIKerberosAuthPlugin"
        elif kerberos_auth_mode == "gssapi":
            self._auth_plugin_class = "MySQLKerberosAuthPlugin"
        else:
            raise InterfaceError(
                "Invalid 'kerberos_auth_mode' mode. Please use 'SSPI' or 'GSSAPI'"
            )

    if (
        "krb_service_principal" in config
        and config["krb_service_principal"] is not None
    ):
        self._krb_service_principal = config["krb_service_principal"]
        if not isinstance(self._krb_service_principal, str):
            raise InterfaceError(
                KRB_SERVICE_PRINCIPAL_ERROR.format(error="is not a string")
            )
        if self._krb_service_principal == "":
            raise InterfaceError(
                KRB_SERVICE_PRINCIPAL_ERROR.format(
                    error="can not be an empty string"
                )
            )
        if "/" not in self._krb_service_principal:
            raise InterfaceError(
                KRB_SERVICE_PRINCIPAL_ERROR.format(error="is incorrectly formatted")
            )

    if self._webauthn_callback:
        self._validate_callable("webauth_callback", self._webauthn_callback, 1)

    if config.get("openid_token_file") is not None:
        self._openid_token_file = config["openid_token_file"]
        if not isinstance(self._openid_token_file, str):
            raise InterfaceError(
                OPENID_TOKEN_FILE_ERROR.format(error="is not a string")
            )
        if self._openid_token_file == "":
            raise InterfaceError(
                OPENID_TOKEN_FILE_ERROR.format(error="cannot be an empty string")
            )
        if not os.path.exists(self._openid_token_file):
            raise InterfaceError(
                f"The path '{self._openid_token_file}' provided via 'openid_token_file' "
                "does not exist"
            )

    # Timeouts: only validated when explicitly provided; zero is accepted,
    # negative or non-integer values are rejected.
    if config.get("read_timeout") is not None:
        self._read_timeout = config["read_timeout"]
        if not isinstance(self._read_timeout, int) or self._read_timeout < 0:
            raise InterfaceError("Option read_timeout must be a positive integer")

    if config.get("write_timeout") is not None:
        self._write_timeout = config["write_timeout"]
        if not isinstance(self._write_timeout, int) or self._write_timeout < 0:
            raise InterfaceError("Option write_timeout must be a positive integer")