def _open_connection()

in mysql-connector-python/lib/mysql/connector/connection.py [0:0]


    def _open_connection(self) -> None:
        """Open the connection to the MySQL server.

        Creates a new protocol object and socket, opens the socket, performs
        the initial handshake and the authentication negotiation, and switches
        the socket to compressed mode when the ``COMPRESS`` client flag is set.

        The connection's read and write timeouts are set to ``None`` while the
        connection is being set up and are always restored to their previous
        values afterwards, whether or not the attempt succeeds.

        Once connected, if SSL is not disabled and the underlying socket
        exposes a callable ``cipher()``, the TLS version and cipher in use are
        passed to the deprecation-warning helpers.

        Raises:
            ConnectionTimeoutError: If a ``ReadTimeoutError`` or
                ``WriteTimeoutError`` occurs while opening, handshaking or
                authenticating; the original error is chained as the cause.
            Exception: Any other error raised during those steps is re-raised
                unchanged after the socket has been closed.
        """
        # Temporarily set the connection's read and write timeouts to None
        # until the connection attempt has finished.
        stored_read_timeout = self.read_timeout
        stored_write_timeout = self.write_timeout
        self.read_timeout = self.write_timeout = None

        try:
            # Kerberos authentication without an explicit user: obtain the
            # user name from the authentication plugin class.
            if self._auth_plugin == "authentication_kerberos_client" and not self._user:
                cls = get_auth_plugin(self._auth_plugin, self._auth_plugin_class)
                self._user = cls.get_user_from_credentials()

            self._protocol = MySQLProtocol()
            self._socket = self._get_connection()
            try:
                self._socket.open_connection()

                # do initial handshake
                self._do_handshake()

                # start authentication negotiation
                self._do_auth(
                    self._user,
                    self._password,
                    self._database,
                    self._client_flags,
                    self._ssl,
                    self._conn_attrs,
                )
                self.converter_class = self._converter_class

                if self._client_flags & ClientFlag.COMPRESS:
                    # update the network layer accordingly
                    self._socket.switch_to_compressed_mode()

                self._socket.set_connection_timeout(None)
            except Exception as err:
                # Any failure after the socket object exists: close it before
                # propagating the error.
                self._socket.close_connection()
                if isinstance(err, (ReadTimeoutError, WriteTimeoutError)):
                    # Report read/write timeouts during connection setup as a
                    # connection timeout, keeping the original as the cause.
                    raise ConnectionTimeoutError(
                        errno=err.errno,
                        msg=err.msg,
                    ) from err
                # Bare `raise` re-raises the active exception as-is.
                raise
        finally:
            # Restore the original read and write timeouts on every exit path
            # (success or failure), including failures in the setup steps
            # that run before the socket is opened.
            self.read_timeout = stored_read_timeout
            self.write_timeout = stored_write_timeout

        if (
            not self._ssl_disabled
            and hasattr(self._socket.sock, "cipher")
            and callable(self._socket.sock.cipher)
        ):
            # Raise a deprecation warning if deprecated TLS version
            # or cipher is being used.

            # `cipher()` returns a three-value tuple containing the name
            # of the cipher being used, the version of the SSL protocol
            # that defines its use, and the number of secret bits being used.
            cipher, tls_version, _ = self._socket.sock.cipher()
            warn_tls_version_deprecated(tls_version)
            warn_ciphersuites_deprecated(cipher, tls_version)