def _validate_conn()

in elastic_transport/_node/_urllib3_chain_certs.py [0:0]


    def _validate_conn(self, conn: HTTPSConnection) -> None:  # type: ignore[override]
        """
        Called right before a request is made, after the socket is created.
        """
        super(HTTPSConnectionPool, self)._validate_conn(conn)

        if self._elastic_assert_fingerprint:
            hash_func = _HASHES_BY_LENGTH[len(self._elastic_assert_fingerprint)]
            assert_fingerprint = unhexlify(
                self._elastic_assert_fingerprint.lower()
                .replace(":", "")
                .encode("ascii")
            )

            fingerprints: List[bytes]
            try:
                if sys.version_info >= (3, 13):
                    fingerprints = [
                        hash_func(cert).digest()
                        for cert in conn.sock.get_verified_chain()  # type: ignore
                    ]
                else:
                    # 'get_verified_chain()' and 'Certificate.public_bytes()' are private APIs
                    # in CPython 3.10. They're not documented anywhere yet but seem to work
                    # and we need them for Security on by Default so... onwards we go!
                    # See: https://github.com/python/cpython/pull/25467
                    fingerprints = [
                        hash_func(cert.public_bytes(_ENCODING_DER)).digest()
                        for cert in conn.sock._sslobj.get_verified_chain()  # type: ignore[union-attr]
                    ]
            except RERAISE_EXCEPTIONS:  # pragma: nocover
                raise
            # Because these are private APIs we are super careful here
            # so that if anything "goes wrong" we fallback on the old behavior.
            except Exception:  # pragma: nocover
                fingerprints = []

            # Only add the peercert in front of the chain if it's not there for some reason.
            # This is to make sure old behavior of 'ssl_assert_fingerprint' still works.
            peercert_fingerprint = hash_func(conn.sock.getpeercert(True)).digest()  # type: ignore[union-attr]
            if peercert_fingerprint not in fingerprints:  # pragma: nocover
                fingerprints.insert(0, peercert_fingerprint)

            # If any match then that's a success! We always run them
            # all through though because of constant time concerns.
            success = False
            for fingerprint in fingerprints:
                success |= compare_digest(fingerprint, assert_fingerprint)

            # Give users all the fingerprints we checked against in
            # order of peer -> root CA.
            if not success:
                raise urllib3.exceptions.SSLError(
                    'Fingerprints did not match. Expected "{0}", got "{1}".'.format(
                        self._elastic_assert_fingerprint,
                        '", "'.join([x.decode() for x in map(hexlify, fingerprints)]),
                    )
                )
            conn.is_verified = success