in elastic_transport/_node/_urllib3_chain_certs.py [0:0]
def _validate_conn(self, conn: HTTPSConnection) -> None:  # type: ignore[override]
    """
    Validate a freshly connected socket and, when a fingerprint is pinned,
    check it against the certificates presented on that socket.

    The inherited validation always runs first. When
    ``self._elastic_assert_fingerprint`` is falsy nothing further happens.
    Otherwise a digest is computed for every certificate in the verified
    chain (with the peer certificate added in front if it is missing), each
    digest is compared with the pinned value, and ``conn.is_verified`` is
    set once at least one of them matches.

    :param conn: Connection whose ``sock`` attribute is inspected.
    :raises urllib3.exceptions.SSLError: If none of the computed
        fingerprints equals the pinned fingerprint.
    """
    # The lookup starts *after* HTTPSConnectionPool in the MRO, so that
    # class's own '_validate_conn()' is bypassed.
    super(HTTPSConnectionPool, self)._validate_conn(conn)

    pinned = self._elastic_assert_fingerprint
    if not pinned:
        return

    # The digest constructor is chosen from the length of the pinned string.
    digest_factory = _HASHES_BY_LENGTH[len(pinned)]
    expected_digest = unhexlify(pinned.lower().replace(":", "").encode("ascii"))

    chain_digests: List[bytes]
    try:
        if sys.version_info < (3, 13):
            # Before 3.13 'get_verified_chain()' and
            # 'Certificate.public_bytes()' are private CPython APIs (present
            # since 3.10). They are undocumented but appear to work, and
            # Security on by Default depends on them.
            # See: https://github.com/python/cpython/pull/25467
            private_chain = conn.sock._sslobj.get_verified_chain()  # type: ignore[union-attr]
            chain_digests = [
                digest_factory(entry.public_bytes(_ENCODING_DER)).digest()
                for entry in private_chain
            ]
        else:
            # From 3.13 on the socket exposes the chain itself and each
            # element is hashed as-is.
            public_chain = conn.sock.get_verified_chain()  # type: ignore
            chain_digests = [
                digest_factory(entry).digest() for entry in public_chain
            ]
    except RERAISE_EXCEPTIONS:  # pragma: nocover
        raise
    except Exception:  # pragma: nocover
        # Private APIs may misbehave in unexpected ways; in that case degrade
        # to the legacy behavior of checking the peer certificate only.
        chain_digests = []

    # Keep the classic 'ssl_assert_fingerprint' semantics working: the peer
    # certificate is always among the candidates, placed first if the chain
    # did not already contain it.
    leaf_digest = digest_factory(conn.sock.getpeercert(True)).digest()  # type: ignore[union-attr]
    if leaf_digest not in chain_digests:  # pragma: nocover
        chain_digests.insert(0, leaf_digest)

    # One match is enough, but every candidate is still compared so the
    # amount of work does not depend on where (or whether) a match occurs.
    matched = False
    for candidate in chain_digests:
        matched = compare_digest(candidate, expected_digest) or matched

    if not matched:
        # Report everything that was checked, ordered peer -> root CA.
        checked = '", "'.join(hexlify(item).decode() for item in chain_digests)
        raise urllib3.exceptions.SSLError(
            'Fingerprints did not match. Expected "{0}", got "{1}".'.format(
                pinned, checked
            )
        )

    conn.is_verified = matched