elastic_transport/_node/_http_requests.py (203 lines of code) (raw):

#  Licensed to Elasticsearch B.V. under one or more contributor
#  license agreements. See the NOTICE file distributed with
#  this work for additional information regarding copyright
#  ownership. Elasticsearch B.V. licenses this file to you under
#  the Apache License, Version 2.0 (the "License"); you may
#  not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing,
#  software distributed under the License is distributed on an
#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
#  KIND, either express or implied.  See the License for the
#  specific language governing permissions and limitations
#  under the License.

import gzip
import ssl
import time
import warnings
from typing import Any, Optional, Union

import urllib3

from .._compat import warn_stacklevel
from .._exceptions import ConnectionError, ConnectionTimeout, SecurityWarning, TlsError
from .._models import ApiResponseMeta, HttpHeaders, NodeConfig
from ..client_utils import DEFAULT, DefaultType, client_meta_version
from ._base import (
    BUILTIN_EXCEPTIONS,
    RERAISE_EXCEPTIONS,
    BaseNode,
    NodeApiResponse,
    ssl_context_from_node_config,
)

try:
    import requests
    from requests.adapters import HTTPAdapter
    from requests.auth import AuthBase

    _REQUESTS_AVAILABLE = True
    _REQUESTS_META_VERSION = client_meta_version(requests.__version__)

    # Use our custom HTTPSConnectionPool for chain cert fingerprint support.
    try:
        from ._urllib3_chain_certs import HTTPSConnectionPool
    except (ImportError, AttributeError):
        HTTPSConnectionPool = urllib3.HTTPSConnectionPool  # type: ignore[assignment,misc]

    class _ElasticHTTPAdapter(HTTPAdapter):
        """``requests`` transport adapter that applies a node's TLS settings.

        The adapter builds its urllib3 pool manager from the given
        :class:`NodeConfig` (SSL context, optional certificate fingerprint
        pinning) and routes ``https`` pools through ``HTTPSConnectionPool``.
        """

        def __init__(self, node_config: NodeConfig, **kwargs: Any) -> None:
            # Must be set before super().__init__(): requests' HTTPAdapter
            # constructor calls init_poolmanager(), which reads it.
            self._node_config = node_config
            super().__init__(**kwargs)

        def init_poolmanager(
            self,
            connections: Any,
            maxsize: int,
            block: bool = False,
            **pool_kwargs: Any,
        ) -> None:
            """Create the urllib3 pool manager with this node's TLS options.

            For ``https`` nodes an SSL context built from the node config is
            supplied to the pools unless ``pool_kwargs`` already carries one.
            When ``ssl_assert_fingerprint`` is configured, chain and hostname
            verification are turned off and urllib3 is asked to assert the
            certificate fingerprint instead. Afterwards the ``https`` pool
            class is replaced by ``HTTPSConnectionPool``.
            """
            if self._node_config.scheme == "https":
                ssl_context = ssl_context_from_node_config(self._node_config)
                pool_kwargs.setdefault("ssl_context", ssl_context)

                # Fingerprint verification doesn't require CA certificates being loaded.
                # We also want to disable other verification methods as we only care
                # about the fingerprint of the certificates, not whether they form
                # a verified chain to a trust anchor.
                if self._node_config.ssl_assert_fingerprint:
                    # Manually disable these in the right order on the SSLContext
                    # so urllib3 won't think we want conflicting things.
                    # (check_hostname must be False before CERT_NONE is allowed.)
                    ssl_context.check_hostname = False
                    ssl_context.verify_mode = ssl.CERT_NONE

                    pool_kwargs["assert_fingerprint"] = (
                        self._node_config.ssl_assert_fingerprint
                    )
                    pool_kwargs["cert_reqs"] = "CERT_NONE"
                    pool_kwargs["assert_hostname"] = False

            super().init_poolmanager(connections, maxsize, block=block, **pool_kwargs)  # type: ignore [no-untyped-call]
            self.poolmanager.pool_classes_by_scheme["https"] = HTTPSConnectionPool

except ImportError:  # pragma: nocover
    _REQUESTS_AVAILABLE = False
    _REQUESTS_META_VERSION = ""


class RequestsHttpNode(BaseNode):
    """Synchronous node using the ``requests`` library communicating via HTTP.

    Supports setting :attr:`requests.Session.auth` via the
    :attr:`elastic_transport.NodeConfig._extras`
    using the ``requests.session.auth`` key.
    """

    _CLIENT_META_HTTP_CLIENT = ("rq", _REQUESTS_META_VERSION)

    def __init__(self, config: NodeConfig):
        """Build a ``requests.Session`` configured from *config*.

        :param config: Connection settings for this node.
        :raises ValueError: If ``requests`` is not installed, or if
            ``ca_certs`` is given together with ``verify_certs=False``.
        """
        if not _REQUESTS_AVAILABLE:  # pragma: nocover
            raise ValueError(
                "You must have 'requests' installed to use RequestsHttpNode"
            )
        super().__init__(config)

        # Each node owns its own Session. The requests default session
        # headers are cleared so that only the headers perform_request()
        # passes explicitly are sent.
        self.session = requests.Session()
        self.session.headers.clear()  # Empty out all the default session headers

        if config.scheme == "https":
            # If we're using ssl_assert_fingerprint we don't want
            # to verify certificates the typical way. Instead we
            # rely on the custom _ElasticHTTPAdapter and urllib3.
            if config.ssl_assert_fingerprint:
                self.session.verify = False

            # Otherwise we go the traditional route of verifying certs.
            elif config.ca_certs:
                if not config.verify_certs:
                    raise ValueError(
                        "You cannot use 'ca_certs' when 'verify_certs=False'"
                    )
                self.session.verify = config.ca_certs
            else:
                self.session.verify = config.verify_certs

            # Note: this silences urllib3 warnings process-wide.
            if not config.ssl_show_warn:
                urllib3.disable_warnings()

            # The scheme is already known to be https inside this branch.
            if not config.verify_certs and config.ssl_show_warn:
                warnings.warn(
                    f"Connecting to {self.base_url!r} using TLS with verify_certs=False is insecure",
                    stacklevel=warn_stacklevel(),
                    category=SecurityWarning,
                )

        # Requests supports setting 'session.auth' via
        # _extras['requests.session.auth'] = ...
        # Read with .get() instead of .pop(): popping mutated the caller's
        # NodeConfig, so any other node later constructed from the same
        # config object silently lost its authentication.
        try:
            requests_session_auth: Optional[AuthBase] = config._extras.get(
                "requests.session.auth", None
            )
        except AttributeError:
            requests_session_auth = None
        if requests_session_auth is not None:
            self.session.auth = requests_session_auth

        # Client certificates
        if config.client_cert:
            if config.client_key:
                self.session.cert = (config.client_cert, config.client_key)
            else:
                self.session.cert = config.client_cert

        # Create and mount custom adapter for constraining number of connections
        adapter = _ElasticHTTPAdapter(
            node_config=config,
            pool_connections=config.connections_per_node,
            pool_maxsize=config.connections_per_node,
            pool_block=True,
        )
        # Preload the HTTPConnectionPool so initialization issues
        # are raised here instead of in perform_request()
        if hasattr(adapter, "get_connection_with_tls_context"):
            request = requests.Request(method="GET", url=self.base_url)
            prepared_request = self.session.prepare_request(request)
            adapter.get_connection_with_tls_context(
                prepared_request, verify=self.session.verify
            )
        else:
            # elastic-transport is not vulnerable to CVE-2024-35195 because it uses
            # requests.Session and an SSLContext without using the verify parameter.
            # We should remove this branch when requiring requests 2.32 or later.
            adapter.get_connection(self.base_url)

        self.session.mount(prefix=f"{self.scheme}://", adapter=adapter)

    def perform_request(
        self,
        method: str,
        target: str,
        body: Optional[bytes] = None,
        headers: Optional[HttpHeaders] = None,
        request_timeout: Union[DefaultType, Optional[float]] = DEFAULT,
    ) -> NodeApiResponse:
        """Send one HTTP request to this node and return its response.

        :param method: HTTP method, e.g. ``"GET"``.
        :param target: Path (and query) appended to the node's base URL.
        :param body: Raw request body; gzip-compressed when HTTP compression
            is enabled for the node. An empty body is sent as no body.
        :param headers: Per-request headers layered over the node defaults.
        :param request_timeout: Timeout for this request; ``DEFAULT`` falls
            back to ``config.request_timeout``.
        :returns: The response metadata and the raw response bytes.
        :raises ConnectionTimeout: If ``requests`` reports a timeout.
        :raises TlsError: On an SSL/TLS failure.
        :raises ConnectionError: On any other transport failure.
        """
        url = self.base_url + target
        headers = HttpHeaders(headers or ())

        request_headers = self._headers.copy()
        if headers:
            request_headers.update(headers)

        body_to_send: Optional[bytes]
        if body:
            if self._http_compress:
                body_to_send = gzip.compress(body)
                request_headers["content-encoding"] = "gzip"
            else:
                body_to_send = body
        else:
            body_to_send = None

        # Monotonic clock: the duration must not be skewed by wall-clock jumps.
        start = time.perf_counter()
        request = requests.Request(
            method=method, headers=request_headers, url=url, data=body_to_send
        )
        prepared_request = self.session.prepare_request(request)
        send_kwargs = {
            "timeout": (
                request_timeout
                if request_timeout is not DEFAULT
                else self.config.request_timeout
            )
        }
        # Session.send() does not apply environment settings (proxies,
        # CA bundle env vars, ...) by itself, so merge them in explicitly.
        send_kwargs.update(
            self.session.merge_environment_settings(  # type: ignore[arg-type]
                prepared_request.url, {}, None, None, None
            )
        )
        try:
            response = self.session.send(prepared_request, **send_kwargs)  # type: ignore[arg-type]
            data = response.content
            duration = time.perf_counter() - start
            response_headers = HttpHeaders(response.headers)

        except RERAISE_EXCEPTIONS:
            raise
        except Exception as e:
            err: Exception
            if isinstance(e, requests.Timeout):
                err = ConnectionTimeout(
                    "Connection timed out during request", errors=(e,)
                )
            elif isinstance(e, (ssl.SSLError, requests.exceptions.SSLError)):
                err = TlsError(str(e), errors=(e,))
            elif isinstance(e, BUILTIN_EXCEPTIONS):
                raise
            else:
                err = ConnectionError(str(e), errors=(e,))
            self._log_request(
                method=method,
                target=target,
                headers=request_headers,
                body=body,
                exception=err,
            )
            # The original exception is preserved in err.errors.
            raise err from None

        meta = ApiResponseMeta(
            node=self.config,
            duration=duration,
            http_version="1.1",
            status=response.status_code,
            headers=response_headers,
        )
        self._log_request(
            method=method,
            target=target,
            headers=request_headers,
            body=body,
            meta=meta,
            response=data,
        )
        return NodeApiResponse(
            meta,
            data,
        )

    def close(self) -> None:
        """
        Explicitly closes connections
        """
        self.session.close()