def _open_connection()

in mysql-connector-python/lib/mysql/connector/connection_cext.py [0:0]


    def _open_connection(self) -> None:
        """Create the C-extension session, connect to the server and handshake.

        Instantiates ``_mysql_connector.MySQL`` from this connection's
        settings, assembles the keyword arguments for its ``connect()`` call
        (the TLS options are only added when SSL is not disabled), connects,
        and then calls ``self._do_handshake()``. When SSL is enabled and the
        session object exposes ``get_ssl_cipher()``, every configured TLS
        version is passed, together with the cipher in use, to the
        deprecation-warning helpers.

        Raises:
            InterfaceError: If ``connect()`` raises a ``MySQLInterfaceError``
                that has no ``errno`` attribute.
            Exception: Otherwise, whatever ``get_mysql_exception()`` builds
                from the error's ``errno``, ``msg`` and ``sqlstate``.
        """
        charset_name = self._character_set.get_info(self._charset_id)[0]
        # pylint: disable=c-extension-no-member
        self._cmysql = _mysql_connector.MySQL(
            buffered=self._buffered,
            raw=self._raw,
            charset_name=charset_name,
            connection_timeout=(self._connection_timeout or 0),
            use_unicode=self.use_unicode,
            auth_plugin=self._auth_plugin,
            plugin_dir=self._plugin_dir,
        )
        # pylint: enable=c-extension-no-member

        # Drop the connection attributes when the CONNECT_ARGS client flag
        # is not set.
        if not self.isset_client_flag(ClientFlag.CONNECT_ARGS):
            self._conn_attrs = {}

        # A callback given as a string is resolved through `import_object()`;
        # any other value is forwarded untouched.
        webauthn_callback = self._webauthn_callback
        if isinstance(webauthn_callback, str):
            webauthn_callback = import_object(webauthn_callback)

        cnx_kwargs = {
            "host": self._host,
            "user": self._user,
            "password": self._password,
            "password1": self._password1,
            "password2": self._password2,
            "password3": self._password3,
            "database": self._database,
            "port": self._port,
            "client_flags": self.client_flags,
            "unix_socket": self._unix_socket,
            "compress": self._compress,
            # Overridden below together with the TLS options when SSL is
            # not disabled.
            "ssl_disabled": True,
            "conn_attrs": self._conn_attrs,
            "local_infile": self._allow_local_infile,
            "load_data_local_dir": self._allow_local_infile_in_path,
            "oci_config_file": self._oci_config_file,
            "oci_config_profile": self._oci_config_profile,
            "webauthn_callback": webauthn_callback,
            "openid_token_file": self._openid_token_file,
            # Falsy timeouts (e.g. `None`) are passed as 0.
            "read_timeout": self._read_timeout or 0,
            "write_timeout": self._write_timeout or 0,
        }

        tls_versions = self._ssl.get("tls_versions")
        if tls_versions is not None:
            # NOTE: sorts in place, so the list stored in `self._ssl` is
            # reordered as well. The versions are then passed on as a single
            # comma-separated string.
            tls_versions.sort(reverse=True)  # type: ignore[union-attr]
            tls_versions = ",".join(tls_versions)

        ciphersuites = self._ssl.get("tls_ciphersuites")
        if ciphersuites is not None:
            # Item 0 feeds `ssl_cipher_suites` (if it's the empty string,
            # then use `None` instead); item 1 feeds `tls_cipher_suites`.
            ssl_ciphersuites = ciphersuites[0] or None  # type: ignore[index]
            tls_ciphersuites = ciphersuites[1]  # type: ignore[index]
        else:
            ssl_ciphersuites = None
            tls_ciphersuites = None

        # TLSv1.3 was requested without any TLS ciphersuite: use a default.
        if (
            tls_versions is not None
            and "TLSv1.3" in tls_versions
            and not tls_ciphersuites
        ):
            tls_ciphersuites = "TLS_AES_256_GCM_SHA384"

        if not self._ssl_disabled:
            cnx_kwargs.update(
                {
                    "ssl_ca": self._ssl.get("ca"),
                    "ssl_cert": self._ssl.get("cert"),
                    "ssl_key": self._ssl.get("key"),
                    "ssl_cipher_suites": ssl_ciphersuites,
                    "tls_versions": tls_versions,
                    "tls_cipher_suites": tls_ciphersuites,
                    "ssl_verify_cert": self._ssl.get("verify_cert") or False,
                    "ssl_verify_identity": self._ssl.get("verify_identity") or False,
                    "ssl_disabled": self._ssl_disabled,
                }
            )

        if os.name == "nt" and self._auth_plugin_class == "MySQLKerberosAuthPlugin":
            cnx_kwargs["use_kerberos_gssapi"] = True

        try:
            self._cmysql.connect(**cnx_kwargs)
            self._cmysql.converter_str_fallback = self._converter_str_fallback
            if self.converter:
                self.converter.str_fallback = self._converter_str_fallback
        except MySQLInterfaceError as err:
            # Translate the C-extension error: use the errno-specific
            # exception when an errno is available, a generic InterfaceError
            # otherwise.
            if hasattr(err, "errno"):
                raise get_mysql_exception(
                    err.errno, msg=err.msg, sqlstate=err.sqlstate
                ) from err
            raise InterfaceError(str(err)) from err

        self._do_handshake()

        if (
            not self._ssl_disabled
            and hasattr(self._cmysql, "get_ssl_cipher")
            and callable(self._cmysql.get_ssl_cipher)
        ):
            # Raise a deprecation warning if deprecated TLS version
            # or cipher is being used.

            # `get_ssl_cipher()` returns the name of the cipher being used.
            cipher = self._cmysql.get_ssl_cipher()
            # `tls_versions` may be stored as `None` (it is handled that way
            # above); `dict.get(key, [])` would hand that `None` straight to
            # `set()` and raise TypeError on an already established
            # connection, so treat it as "no versions configured".
            for tls_version in set(self._ssl.get("tls_versions") or []):
                warn_tls_version_deprecated(tls_version)
                warn_ciphersuites_deprecated(cipher, tls_version)