rally-custom/esrally/client/synchronous.py (145 lines of code) (raw):

# Licensed to Elasticsearch B.V. under one or more contributor # license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright # ownership. Elasticsearch B.V. licenses this file to you under # the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. import warnings from typing import Any, Iterable, Mapping, Optional from elastic_transport import ( ApiResponse, BinaryApiResponse, HeadApiResponse, ListApiResponse, ObjectApiResponse, TextApiResponse, ) from elastic_transport.client_utils import DEFAULT from elasticsearch import Elasticsearch from elasticsearch.compat import warn_stacklevel from elasticsearch.exceptions import ( HTTP_EXCEPTIONS, ApiError, ElasticsearchWarning, UnsupportedProductError, ) from esrally.client.common import _WARNING_RE, _mimetype_header_to_compat, _quote_query from esrally.utils import versions # This reproduces the product verification behavior of v7.14.0 of the client: # https://github.com/elastic/elasticsearch-py/blob/v7.14.0/elasticsearch/transport.py#L606 # # As of v8.0.0, the client determines whether the server is Elasticsearch by checking # whether HTTP responses contain the `X-elastic-product` header. If they do not, it raises # an `UnsupportedProductError`. This header was only introduced in Elasticsearch 7.14.0, # however, so the client will consider any version of ES prior to 7.14.0 unsupported due to # responses not including it. # # Because Rally needs to support versions of ES >= 6.8.0, we resurrect the previous # logic for determining the authenticity of the server, which does not rely exclusively # on this header. class _ProductChecker: """Class which verifies we're connected to a supported product""" # States that can be returned from 'check_product' SUCCESS = True UNSUPPORTED_PRODUCT = 2 UNSUPPORTED_DISTRIBUTION = 3 @classmethod def raise_error(cls, state, meta, body): # These states mean the product_check() didn't fail so do nothing. if state in (None, True): return if state == cls.UNSUPPORTED_DISTRIBUTION: message = "The client noticed that the server is not a supported distribution of Elasticsearch" else: # UNSUPPORTED_PRODUCT message = "The client noticed that the server is not Elasticsearch and we do not support this unknown product" raise UnsupportedProductError(message, meta=meta, body=body) @classmethod def check_product(cls, headers, response): # type: (dict[str, str], dict[str, str]) -> int """This class was supposed to verify that the server we are talking to is Elasticsearch. It just returns True """ return True class RallySyncElasticsearch(Elasticsearch): def __init__(self, *args, **kwargs): distribution_version = kwargs.pop("distribution_version", None) distribution_flavor = kwargs.pop("distribution_flavor", None) super().__init__(*args, **kwargs) self._verified_elasticsearch = None self.distribution_version = distribution_version self.distribution_flavor = distribution_flavor @property def is_serverless(self): return versions.is_serverless(self.distribution_flavor) def options(self, *args, **kwargs): new_self = super().options(*args, **kwargs) new_self.distribution_version = self.distribution_version new_self.distribution_flavor = self.distribution_flavor return new_self def perform_request( self, method: str, path: str, *, params: Optional[Mapping[str, Any]] = None, headers: Optional[Mapping[str, str]] = None, body: Optional[Any] = None, ) -> ApiResponse[Any]: # We need to ensure that we provide content-type and accept headers if body is not None: if headers is None: headers = {"content-type": "application/json", "accept": "application/json"} else: if headers.get("content-type") is None: headers["content-type"] = "application/json" if headers.get("accept") is None: headers["accept"] = "application/json" if headers: request_headers = self._headers.copy() request_headers.update(headers) else: request_headers = self._headers if self._verified_elasticsearch is None: info = self.transport.perform_request(method="GET", target="/", headers=request_headers) info_meta = info.meta info_body = info.body if not 200 <= info_meta.status < 299: raise HTTP_EXCEPTIONS.get(info_meta.status, ApiError)(message=str(info_body), meta=info_meta, body=info_body) self._verified_elasticsearch = _ProductChecker.check_product(info_meta.headers, info_body) if self._verified_elasticsearch is not True: _ProductChecker.raise_error(self._verified_elasticsearch, info_meta, info_body) # Converts all parts of a Accept/Content-Type headers # from application/X -> application/vnd.elasticsearch+X # see https://github.com/elastic/elasticsearch/issues/51816 if not self.is_serverless: if versions.is_version_identifier(self.distribution_version) and ( versions.Version.from_string(self.distribution_version) >= versions.Version.from_string("8.0.0") ): _mimetype_header_to_compat("Accept", headers) _mimetype_header_to_compat("Content-Type", headers) if params: target = f"{path}?{_quote_query(params)}" else: target = path meta, resp_body = self.transport.perform_request( method, target, headers=request_headers, body=body, request_timeout=self._request_timeout, max_retries=self._max_retries, retry_on_status=self._retry_on_status, retry_on_timeout=self._retry_on_timeout, client_meta=self._client_meta, ) # HEAD with a 404 is returned as a normal response # since this is used as an 'exists' functionality. if not (method == "HEAD" and meta.status == 404) and ( not 200 <= meta.status < 299 and (self._ignore_status is DEFAULT or self._ignore_status is None or meta.status not in self._ignore_status) ): message = str(resp_body) # If the response is an error response try parsing # the raw Elasticsearch error before raising. if isinstance(resp_body, dict): try: error = resp_body.get("error", message) if isinstance(error, dict) and "type" in error: error = error["type"] message = error except (ValueError, KeyError, TypeError): pass raise HTTP_EXCEPTIONS.get(meta.status, ApiError)(message=message, meta=meta, body=resp_body) # 'Warning' headers should be reraised as 'ElasticsearchWarning' if "warning" in meta.headers: warning_header = (meta.headers.get("warning") or "").strip() warning_messages: Iterable[str] = _WARNING_RE.findall(warning_header) or (warning_header,) stacklevel = warn_stacklevel() for warning_message in warning_messages: warnings.warn( warning_message, category=ElasticsearchWarning, stacklevel=stacklevel, ) if method == "HEAD": response = HeadApiResponse(meta=meta) elif isinstance(resp_body, dict): response = ObjectApiResponse(body=resp_body, meta=meta) # type: ignore[assignment] elif isinstance(resp_body, list): response = ListApiResponse(body=resp_body, meta=meta) # type: ignore[assignment] elif isinstance(resp_body, str): response = TextApiResponse( # type: ignore[assignment] body=resp_body, meta=meta, ) elif isinstance(resp_body, bytes): response = BinaryApiResponse(body=resp_body, meta=meta) # type: ignore[assignment] else: response = ApiResponse(body=resp_body, meta=meta) # type: ignore[assignment] return response