elastic_transport/_node/_urllib3_chain_certs.py (85 lines of code) (raw):

# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""urllib3 HTTPS pool that checks ``assert_fingerprint`` against the whole
verified certificate chain rather than only the leaf certificate.
"""

import hashlib
import sys
from binascii import hexlify, unhexlify
from hmac import compare_digest
from typing import Any, List, Optional

import _ssl  # type: ignore
import urllib3
import urllib3.connection

from ._base import RERAISE_EXCEPTIONS

if sys.version_info < (3, 10) or sys.implementation.name != "cpython":
    raise ImportError("Only supported on CPython 3.10+")

_ENCODING_DER: int = _ssl.ENCODING_DER

# Maps the number of hex characters in a fingerprint to the hash that produces it.
_HASHES_BY_LENGTH = {32: hashlib.md5, 40: hashlib.sha1, 64: hashlib.sha256}

__all__ = ["HTTPSConnectionPool"]


class HTTPSConnection(urllib3.connection.HTTPSConnection):
    """HTTPS connection that is told by its pool whether the pool will
    assert the certificate fingerprint itself.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # Overwritten by HTTPSConnectionPool._new_conn() right after construction.
        self._elastic_assert_fingerprint: Optional[str] = None
        super().__init__(*args, **kwargs)

    def connect(self) -> None:
        """Connect as usual, then pre-mark the connection as verified when the
        pool is going to check a fingerprint.
        """
        super().connect()
        # Hack to prevent a warning within HTTPSConnectionPool._validate_conn()
        if self._elastic_assert_fingerprint:
            self.is_verified = True


class HTTPSConnectionPool(urllib3.HTTPSConnectionPool):
    """HTTPSConnectionPool implementation which supports ``assert_fingerprint``
    on certificates within the chain instead of only the leaf cert using private
    APIs in CPython 3.10+
    """

    ConnectionCls = HTTPSConnection

    def __init__(
        self, *args: Any, assert_fingerprint: Optional[str] = None, **kwargs: Any
    ) -> None:
        """Create the pool, taking over fingerprint checking from urllib3.

        :param assert_fingerprint: Hex digest of a certificate expected in the
            chain. Colons are stripped and case is ignored. After stripping it
            must be 32, 40 or 64 hex characters long (MD5, SHA-1 or SHA-256).
        :raises ValueError: If the fingerprint has an unsupported length or
            contains characters that are not hexadecimal digits.
        """
        fingerprint = (
            assert_fingerprint.replace(":", "").lower() if assert_fingerprint else None
        )

        if fingerprint:
            # Complain about fingerprint length earlier than urllib3 does.
            if len(fingerprint) not in _HASHES_BY_LENGTH:
                valid_lengths = "', '".join(map(str, sorted(_HASHES_BY_LENGTH)))
                raise ValueError(
                    f"Fingerprint of invalid length '{len(fingerprint)}'"
                    f", should be one of '{valid_lengths}'"
                )

            # Likewise reject non-hex input now instead of failing inside
            # unhexlify() on every request. Both binascii.Error and
            # UnicodeEncodeError are ValueError subclasses.
            try:
                unhexlify(fingerprint.encode("ascii"))
            except ValueError as exc:
                raise ValueError(
                    "Fingerprint must contain only hexadecimal digits "
                    "(optionally separated by ':')"
                ) from exc

            # Skip fingerprinting by urllib3 as we'll do it ourselves
            kwargs["assert_fingerprint"] = None

        self._elastic_assert_fingerprint = fingerprint
        super().__init__(*args, **kwargs)

    def _new_conn(self) -> HTTPSConnection:
        """
        Return a fresh :class:`urllib3.connection.HTTPSConnection`.
        """
        conn: HTTPSConnection = super()._new_conn()  # type: ignore[assignment]
        # Tell our custom connection if we'll assert fingerprint ourselves
        conn._elastic_assert_fingerprint = self._elastic_assert_fingerprint
        return conn

    def _validate_conn(self, conn: HTTPSConnection) -> None:  # type: ignore[override]
        """
        Called right before a request is made, after the socket is created.

        When a fingerprint was configured, hash every certificate of the
        verified chain (plus the peer certificate) and require at least one
        digest to equal the configured one.

        :raises urllib3.exceptions.SSLError: If no certificate matches.
        """
        super()._validate_conn(conn)

        expected_hex = self._elastic_assert_fingerprint
        if not expected_hex:
            return

        # The stored value was already normalised (no colons, lower-case) and
        # validated in __init__, so it can be decoded directly.
        hash_func = _HASHES_BY_LENGTH[len(expected_hex)]
        expected_digest = unhexlify(expected_hex.encode("ascii"))

        fingerprints: List[bytes]
        try:
            if sys.version_info >= (3, 13):
                fingerprints = [
                    hash_func(cert).digest()
                    for cert in conn.sock.get_verified_chain()  # type: ignore
                ]
            else:
                # 'get_verified_chain()' and 'Certificate.public_bytes()' are private APIs
                # in CPython 3.10. They're not documented anywhere yet but seem to work
                # and we need them for Security on by Default so... onwards we go!
                # See: https://github.com/python/cpython/pull/25467
                fingerprints = [
                    hash_func(cert.public_bytes(_ENCODING_DER)).digest()
                    for cert in conn.sock._sslobj.get_verified_chain()  # type: ignore[union-attr]
                ]
        except RERAISE_EXCEPTIONS:  # pragma: nocover
            raise
        # Because these are private APIs we are super careful here
        # so that if anything "goes wrong" we fallback on the old behavior.
        except Exception:  # pragma: nocover
            fingerprints = []

        # Only add the peercert in front of the chain if it's not there for some reason.
        # This is to make sure old behavior of 'ssl_assert_fingerprint' still works.
        peercert_fingerprint = hash_func(conn.sock.getpeercert(True)).digest()  # type: ignore[union-attr]
        if peercert_fingerprint not in fingerprints:  # pragma: nocover
            fingerprints.insert(0, peercert_fingerprint)

        # If any match then that's a success! We always run them
        # all through though because of constant time concerns.
        success = False
        for fingerprint in fingerprints:
            success |= compare_digest(fingerprint, expected_digest)

        # Give users all the fingerprints we checked against in
        # order of peer -> root CA.
        if not success:
            checked = '", "'.join(hexlify(fp).decode() for fp in fingerprints)
            raise urllib3.exceptions.SSLError(
                f'Fingerprints did not match. Expected "{expected_hex}", got "{checked}".'
            )

        conn.is_verified = True