elastic_transport/_node/_http_urllib3.py (173 lines of code) (raw):

# Licensed to Elasticsearch B.V. under one or more contributor # license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright # ownership. Elasticsearch B.V. licenses this file to you under # the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. import gzip import ssl import time import warnings from typing import Any, Dict, Optional, Union try: from importlib import metadata except ImportError: import importlib_metadata as metadata # type: ignore[no-redef] import urllib3 from urllib3.exceptions import ConnectTimeoutError, NewConnectionError, ReadTimeoutError from urllib3.util.retry import Retry from .._compat import warn_stacklevel from .._exceptions import ConnectionError, ConnectionTimeout, SecurityWarning, TlsError from .._models import ApiResponseMeta, HttpHeaders, NodeConfig from ..client_utils import DEFAULT, DefaultType, client_meta_version from ._base import ( BUILTIN_EXCEPTIONS, DEFAULT_CA_CERTS, RERAISE_EXCEPTIONS, BaseNode, NodeApiResponse, ssl_context_from_node_config, ) try: from ._urllib3_chain_certs import HTTPSConnectionPool except (ImportError, AttributeError): HTTPSConnectionPool = urllib3.HTTPSConnectionPool # type: ignore[assignment,misc] class Urllib3HttpNode(BaseNode): """Default synchronous node class using the ``urllib3`` library via HTTP""" _CLIENT_META_HTTP_CLIENT = ("ur", client_meta_version(metadata.version("urllib3"))) def __init__(self, config: NodeConfig): super().__init__(config) pool_class = urllib3.HTTPConnectionPool kw: Dict[str, Any] = {} if config.scheme == "https": pool_class = HTTPSConnectionPool ssl_context = ssl_context_from_node_config(config) kw["ssl_context"] = ssl_context if config.ssl_assert_hostname and config.ssl_assert_fingerprint: raise ValueError( "Can't specify both 'ssl_assert_hostname' and 'ssl_assert_fingerprint'" ) # Fingerprint verification doesn't require CA certificates being loaded. # We also want to disable other verification methods as we only care # about the fingerprint of the certificates, not whether they form # a verified chain to a trust anchor. elif config.ssl_assert_fingerprint: # Manually disable these in the right order on the SSLContext # so urllib3 won't think we want conflicting things. ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE kw.update( { "assert_fingerprint": config.ssl_assert_fingerprint, "assert_hostname": False, "cert_reqs": "CERT_NONE", } ) else: kw["assert_hostname"] = config.ssl_assert_hostname # Convert all sentinel values to their actual default # values if not using an SSLContext. ca_certs = ( DEFAULT_CA_CERTS if config.ca_certs is None else config.ca_certs ) if config.verify_certs: if not ca_certs: raise ValueError( "Root certificates are missing for certificate " "validation. Either pass them in using the ca_certs parameter or " "install certifi to use it automatically." ) kw.update( { "cert_reqs": "CERT_REQUIRED", "ca_certs": ca_certs, "cert_file": config.client_cert, "key_file": config.client_key, } ) else: kw["cert_reqs"] = "CERT_NONE" if config.ssl_show_warn: warnings.warn( f"Connecting to {self.base_url!r} using TLS with verify_certs=False is insecure", stacklevel=warn_stacklevel(), category=SecurityWarning, ) else: urllib3.disable_warnings() self.pool = pool_class( config.host, port=config.port, timeout=urllib3.Timeout(total=config.request_timeout), maxsize=config.connections_per_node, block=True, **kw, ) def perform_request( self, method: str, target: str, body: Optional[bytes] = None, headers: Optional[HttpHeaders] = None, request_timeout: Union[DefaultType, Optional[float]] = DEFAULT, ) -> NodeApiResponse: if self.path_prefix: target = f"{self.path_prefix}{target}" start = time.time() try: kw = {} if request_timeout is not DEFAULT: kw["timeout"] = request_timeout request_headers = self._headers.copy() if headers: request_headers.update(headers) body_to_send: Optional[bytes] if body: if self._http_compress: body_to_send = gzip.compress(body) request_headers["content-encoding"] = "gzip" else: body_to_send = body else: body_to_send = None response = self.pool.urlopen( method, target, body=body_to_send, retries=Retry(False), headers=request_headers, **kw, # type: ignore[arg-type] ) response_headers = HttpHeaders(response.headers) data = response.data duration = time.time() - start except RERAISE_EXCEPTIONS: raise except Exception as e: err: Exception if isinstance(e, NewConnectionError): err = ConnectionError(str(e), errors=(e,)) elif isinstance(e, (ConnectTimeoutError, ReadTimeoutError)): err = ConnectionTimeout( "Connection timed out during request", errors=(e,) ) elif isinstance(e, (ssl.SSLError, urllib3.exceptions.SSLError)): err = TlsError(str(e), errors=(e,)) elif isinstance(e, BUILTIN_EXCEPTIONS): raise else: err = ConnectionError(str(e), errors=(e,)) self._log_request( method=method, target=target, headers=request_headers, body=body, exception=err, ) raise err from e meta = ApiResponseMeta( node=self.config, duration=duration, http_version="1.1", status=response.status, headers=response_headers, ) self._log_request( method=method, target=target, headers=request_headers, body=body, meta=meta, response=data, ) return NodeApiResponse( meta, data, ) def close(self) -> None: """ Explicitly closes connection """ self.pool.close()