def _wrap_socket_with_ssl()

in redis/connection.py [0:0]


    def _wrap_socket_with_ssl(self, sock):
        """
        Wraps the socket with SSL support.

        Builds an SSL context from this connection's SSL settings, wraps
        ``sock`` with it and, when requested, runs OCSP validation (stapled
        or pure - never both). If anything fails after the socket has been
        wrapped, the wrapped socket is closed before the error propagates.

        Args:
            sock: The plain socket to wrap with SSL.

        Returns:
            An SSL wrapped socket.

        Raises:
            RedisError: If pure OCSP validation is requested but cryptography
                is not installed, or if both stapled and pure OCSP validation
                are requested.
            ConnectionError: If pure OCSP validation reports the certificate
                as not valid.
        """
        context = ssl.create_default_context()
        # Order matters: the ssl module refuses verify_mode=CERT_NONE while
        # check_hostname is still enabled, so check_hostname is set first.
        context.check_hostname = self.check_hostname
        context.verify_mode = self.cert_reqs
        if self.certfile or self.keyfile:
            context.load_cert_chain(
                certfile=self.certfile,
                keyfile=self.keyfile,
                password=self.certificate_password,
            )
        if (
            self.ca_certs is not None
            or self.ca_path is not None
            or self.ca_data is not None
        ):
            context.load_verify_locations(
                cafile=self.ca_certs, capath=self.ca_path, cadata=self.ca_data
            )
        if self.ssl_min_version is not None:
            context.minimum_version = self.ssl_min_version
        if self.ssl_ciphers:
            context.set_ciphers(self.ssl_ciphers)
        if self.ssl_validate_ocsp is True and CRYPTOGRAPHY_AVAILABLE is False:
            raise RedisError("cryptography is not installed.")

        if self.ssl_validate_ocsp_stapled and self.ssl_validate_ocsp:
            raise RedisError(
                "Either an OCSP staple or pure OCSP connection must be validated "
                "- not both."
            )

        sslsock = context.wrap_socket(sock, server_hostname=self.host)

        # Once wrapped, close the SSL socket ourselves if validation fails so
        # the connection is not left open until garbage collection.
        try:
            # validation for the stapled case
            if self.ssl_validate_ocsp_stapled:
                import OpenSSL

                from .ocsp import ocsp_staple_verifier

                # if a context is provided use it - otherwise, a basic context
                # NOTE(review): the basic context reads self.certfile and
                # self.keyfile unconditionally - presumably both must be set
                # when no ssl_ocsp_context is supplied; confirm with callers.
                if self.ssl_ocsp_context is None:
                    staple_ctx = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD)
                    staple_ctx.use_certificate_file(self.certfile)
                    staple_ctx.use_privatekey_file(self.keyfile)
                else:
                    staple_ctx = self.ssl_ocsp_context

                staple_ctx.set_ocsp_client_callback(
                    ocsp_staple_verifier, self.ssl_ocsp_expected_cert
                )

                # The staple check needs its own, separate connection. Keep a
                # handle on the raw socket so it is always closed, whether the
                # handshake succeeds or fails.
                staple_sock = socket.socket()
                try:
                    con = OpenSSL.SSL.Connection(staple_ctx, staple_sock)
                    con.request_ocsp()
                    con.connect((self.host, self.port))
                    con.do_handshake()
                    con.shutdown()
                finally:
                    staple_sock.close()
                return sslsock

            # pure ocsp validation
            if self.ssl_validate_ocsp is True and CRYPTOGRAPHY_AVAILABLE:
                from .ocsp import OCSPVerifier

                verifier = OCSPVerifier(
                    sslsock, self.host, self.port, self.ca_certs
                )
                if not verifier.is_valid():
                    raise ConnectionError("ocsp validation error")
        except BaseException:
            # Cleanup only: the original exception is always re-raised.
            sslsock.close()
            raise
        return sslsock