elastic_transport/_node/_http_httpx.py (161 lines of code) (raw):

# Licensed to Elasticsearch B.V. under one or more contributor # license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright # ownership. Elasticsearch B.V. licenses this file to you under # the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. import gzip import os.path import ssl import time import warnings from typing import Literal, Optional, Union from .._compat import warn_stacklevel from .._exceptions import ConnectionError, ConnectionTimeout, SecurityWarning, TlsError from .._models import ApiResponseMeta, HttpHeaders, NodeConfig from ..client_utils import DEFAULT, DefaultType, client_meta_version from ._base import ( BUILTIN_EXCEPTIONS, DEFAULT_CA_CERTS, RERAISE_EXCEPTIONS, NodeApiResponse, ssl_context_from_node_config, ) from ._base_async import BaseAsyncNode try: import httpx _HTTPX_AVAILABLE = True _HTTPX_META_VERSION = client_meta_version(httpx.__version__) except ImportError: _HTTPX_AVAILABLE = False _HTTPX_META_VERSION = "" class HttpxAsyncHttpNode(BaseAsyncNode): _CLIENT_META_HTTP_CLIENT = ("hx", _HTTPX_META_VERSION) def __init__(self, config: NodeConfig): if not _HTTPX_AVAILABLE: # pragma: nocover raise ValueError("You must have 'httpx' installed to use HttpxNode") super().__init__(config) if config.ssl_assert_fingerprint: raise ValueError( "httpx does not support certificate pinning. https://github.com/encode/httpx/issues/761" ) ssl_context: Union[ssl.SSLContext, Literal[False]] = False if config.scheme == "https": if config.ssl_context is not None: ssl_context = ssl_context_from_node_config(config) else: ssl_context = ssl_context_from_node_config(config) ca_certs = ( DEFAULT_CA_CERTS if config.ca_certs is None else config.ca_certs ) if config.verify_certs: if not ca_certs: raise ValueError( "Root certificates are missing for certificate " "validation. Either pass them in using the ca_certs parameter or " "install certifi to use it automatically." ) else: if config.ssl_show_warn: warnings.warn( f"Connecting to {self.base_url!r} using TLS with verify_certs=False is insecure", stacklevel=warn_stacklevel(), category=SecurityWarning, ) if ca_certs is not None: if os.path.isfile(ca_certs): ssl_context.load_verify_locations(cafile=ca_certs) elif os.path.isdir(ca_certs): ssl_context.load_verify_locations(capath=ca_certs) else: raise ValueError("ca_certs parameter is not a path") # Use client_cert and client_key variables for SSL certificate configuration. if config.client_cert and not os.path.isfile(config.client_cert): raise ValueError("client_cert is not a path to a file") if config.client_key and not os.path.isfile(config.client_key): raise ValueError("client_key is not a path to a file") if config.client_cert and config.client_key: ssl_context.load_cert_chain(config.client_cert, config.client_key) elif config.client_cert: ssl_context.load_cert_chain(config.client_cert) self.client = httpx.AsyncClient( base_url=f"{config.scheme}://{config.host}:{config.port}", limits=httpx.Limits(max_connections=config.connections_per_node), verify=ssl_context or False, timeout=config.request_timeout, ) async def perform_request( # type: ignore[override] self, method: str, target: str, body: Optional[bytes] = None, headers: Optional[HttpHeaders] = None, request_timeout: Union[DefaultType, Optional[float]] = DEFAULT, ) -> NodeApiResponse: resolved_headers = self._headers.copy() if headers: resolved_headers.update(headers) if body: if self._http_compress: resolved_body = gzip.compress(body) resolved_headers["content-encoding"] = "gzip" else: resolved_body = body else: resolved_body = None try: start = time.perf_counter() if request_timeout is DEFAULT: resp = await self.client.request( method, target, content=resolved_body, headers=dict(resolved_headers), ) else: resp = await self.client.request( method, target, content=resolved_body, headers=dict(resolved_headers), timeout=request_timeout, ) response_body = resp.read() duration = time.perf_counter() - start except RERAISE_EXCEPTIONS + BUILTIN_EXCEPTIONS: raise except Exception as e: err: Exception if isinstance(e, (TimeoutError, httpx.TimeoutException)): err = ConnectionTimeout( "Connection timed out during request", errors=(e,) ) elif isinstance(e, ssl.SSLError): err = TlsError(str(e), errors=(e,)) # Detect SSL errors for httpx v0.28.0+ # Needed until https://github.com/encode/httpx/issues/3350 is fixed elif isinstance(e, httpx.ConnectError) and e.__cause__: context = e.__cause__.__context__ if isinstance(context, ssl.SSLError): err = TlsError(str(context), errors=(e,)) else: err = ConnectionError(str(e), errors=(e,)) else: err = ConnectionError(str(e), errors=(e,)) self._log_request( method=method, target=target, headers=resolved_headers, body=body, exception=err, ) raise err from None meta = ApiResponseMeta( resp.status_code, resp.http_version, HttpHeaders(resp.headers), duration, self.config, ) self._log_request( method=method, target=target, headers=resolved_headers, body=body, meta=meta, response=response_body, ) return NodeApiResponse(meta, response_body) async def close(self) -> None: # type: ignore[override] await self.client.aclose()