in mysql-connector-python/lib/mysql/connector/connection_cext.py [0:0]
def _open_connection(self) -> None:
charset_name = self._character_set.get_info(self._charset_id)[0]
# pylint: disable=c-extension-no-member
self._cmysql = _mysql_connector.MySQL(
buffered=self._buffered,
raw=self._raw,
charset_name=charset_name,
connection_timeout=(self._connection_timeout or 0),
use_unicode=self.use_unicode,
auth_plugin=self._auth_plugin,
plugin_dir=self._plugin_dir,
)
# pylint: enable=c-extension-no-member
if not self.isset_client_flag(ClientFlag.CONNECT_ARGS):
self._conn_attrs = {}
cnx_kwargs = {
"host": self._host,
"user": self._user,
"password": self._password,
"password1": self._password1,
"password2": self._password2,
"password3": self._password3,
"database": self._database,
"port": self._port,
"client_flags": self.client_flags,
"unix_socket": self._unix_socket,
"compress": self._compress,
"ssl_disabled": True,
"conn_attrs": self._conn_attrs,
"local_infile": self._allow_local_infile,
"load_data_local_dir": self._allow_local_infile_in_path,
"oci_config_file": self._oci_config_file,
"oci_config_profile": self._oci_config_profile,
"webauthn_callback": (
import_object(self._webauthn_callback)
if isinstance(self._webauthn_callback, str)
else self._webauthn_callback
),
"openid_token_file": self._openid_token_file,
"read_timeout": self._read_timeout if self._read_timeout else 0,
"write_timeout": self._write_timeout if self._write_timeout else 0,
}
tls_versions = self._ssl.get("tls_versions")
if tls_versions is not None:
tls_versions.sort(reverse=True) # type: ignore[union-attr]
tls_versions = ",".join(tls_versions)
if self._ssl.get("tls_ciphersuites") is not None:
ssl_ciphersuites = (
self._ssl.get("tls_ciphersuites")[0] or None # type: ignore[index]
) # if it's the empty string, then use `None` instead
tls_ciphersuites = self._ssl.get("tls_ciphersuites")[ # type: ignore[index]
1
]
else:
ssl_ciphersuites = None
tls_ciphersuites = None
if (
tls_versions is not None
and "TLSv1.3" in tls_versions
and not tls_ciphersuites
):
tls_ciphersuites = "TLS_AES_256_GCM_SHA384"
if not self._ssl_disabled:
cnx_kwargs.update(
{
"ssl_ca": self._ssl.get("ca"),
"ssl_cert": self._ssl.get("cert"),
"ssl_key": self._ssl.get("key"),
"ssl_cipher_suites": ssl_ciphersuites,
"tls_versions": tls_versions,
"tls_cipher_suites": tls_ciphersuites,
"ssl_verify_cert": self._ssl.get("verify_cert") or False,
"ssl_verify_identity": self._ssl.get("verify_identity") or False,
"ssl_disabled": self._ssl_disabled,
}
)
if os.name == "nt" and self._auth_plugin_class == "MySQLKerberosAuthPlugin":
cnx_kwargs["use_kerberos_gssapi"] = True
try:
self._cmysql.connect(**cnx_kwargs)
self._cmysql.converter_str_fallback = self._converter_str_fallback
if self.converter:
self.converter.str_fallback = self._converter_str_fallback
except MySQLInterfaceError as err:
if hasattr(err, "errno"):
raise get_mysql_exception(
err.errno, msg=err.msg, sqlstate=err.sqlstate
) from err
raise InterfaceError(str(err)) from err
self._do_handshake()
if (
not self._ssl_disabled
and hasattr(self._cmysql, "get_ssl_cipher")
and callable(self._cmysql.get_ssl_cipher)
):
# Raise a deprecation warning if deprecated TLS version
# or cipher is being used.
# `get_ssl_cipher()` returns the name of the cipher being used.
cipher = self._cmysql.get_ssl_cipher()
for tls_version in set(self._ssl.get("tls_versions", [])):
warn_tls_version_deprecated(tls_version)
warn_ciphersuites_deprecated(cipher, tls_version)