esrally/driver/runner.py (2,086 lines of code) (raw):

# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import asyncio
import contextvars
import json
import logging
import random
import re
import sys
import time
from collections import Counter, OrderedDict
from copy import deepcopy
from enum import Enum
from functools import total_ordering
from io import BytesIO
from os.path import commonprefix
from types import FunctionType
from typing import Optional

import ijson

from esrally import exceptions, track, types
from esrally.utils import convert
from esrally.utils.versions import Version

# Mapping from operation type to specific runner
__RUNNERS = {}


def register_default_runners(config: Optional[types.Config] = None):
    """
    Registers the built-in runner for every operation type that ships with this module.

    :param config: Optional configuration object. It is only forwarded to the runners that accept a ``config``
                   keyword argument (``Query``, ``DeleteIndex`` and ``DeleteComposableTemplate``).
    """
    register_runner(track.OperationType.Bulk, BulkIndex(), async_runner=True)
    register_runner(track.OperationType.ForceMerge, ForceMerge(), async_runner=True)
    register_runner(track.OperationType.IndexStats, Retry(IndicesStats()), async_runner=True)
    register_runner(track.OperationType.NodeStats, NodeStats(), async_runner=True)
    register_runner(track.OperationType.Search, Query(config=config), async_runner=True)
    register_runner(track.OperationType.PaginatedSearch, Query(config=config), async_runner=True)
    register_runner(track.OperationType.CompositeAgg, Query(config=config), async_runner=True)
    register_runner(track.OperationType.ScrollSearch, Query(config=config), async_runner=True)
    register_runner(track.OperationType.RawRequest, RawRequest(), async_runner=True)
    register_runner(track.OperationType.Composite, Composite(), async_runner=True)
    register_runner(track.OperationType.SubmitAsyncSearch, SubmitAsyncSearch(), async_runner=True)
    register_runner(track.OperationType.GetAsyncSearch, Retry(GetAsyncSearch(), retry_until_success=True), async_runner=True)
    register_runner(track.OperationType.DeleteAsyncSearch, DeleteAsyncSearch(), async_runner=True)
    register_runner(track.OperationType.OpenPointInTime, OpenPointInTime(), async_runner=True)
    register_runner(track.OperationType.ClosePointInTime, ClosePointInTime(), async_runner=True)
    register_runner(track.OperationType.Sql, Sql(), async_runner=True)
    register_runner(track.OperationType.FieldCaps, FieldCaps(), async_runner=True)
    register_runner(track.OperationType.Esql, Esql(), async_runner=True)

    # This is an administrative operation but there is no need for a retry here as we don't issue a request
    register_runner(track.OperationType.Sleep, Sleep(), async_runner=True)
    # these requests should not be retried as they are not idempotent
    register_runner(track.OperationType.CreateSnapshot, CreateSnapshot(), async_runner=True)
    register_runner(track.OperationType.RestoreSnapshot, RestoreSnapshot(), async_runner=True)
    register_runner(track.OperationType.Downsample, Downsample(), async_runner=True)
    # We treat the following as administrative commands and thus already start to wrap them in a retry.
    register_runner(track.OperationType.ClusterHealth, Retry(ClusterHealth()), async_runner=True)
    register_runner(track.OperationType.PutPipeline, Retry(PutPipeline()), async_runner=True)
    register_runner(track.OperationType.Refresh, Retry(Refresh()), async_runner=True)
    register_runner(track.OperationType.CreateIndex, Retry(CreateIndex()), async_runner=True)
    register_runner(track.OperationType.DeleteIndex, Retry(DeleteIndex(config=config)), async_runner=True)
    register_runner(track.OperationType.CreateComponentTemplate, Retry(CreateComponentTemplate()), async_runner=True)
    register_runner(track.OperationType.DeleteComponentTemplate, Retry(DeleteComponentTemplate()), async_runner=True)
    register_runner(track.OperationType.CreateComposableTemplate, Retry(CreateComposableTemplate()), async_runner=True)
    register_runner(track.OperationType.DeleteComposableTemplate, Retry(DeleteComposableTemplate(config=config)), async_runner=True)
    register_runner(track.OperationType.CreateDataStream, Retry(CreateDataStream()), async_runner=True)
    register_runner(track.OperationType.DeleteDataStream, Retry(DeleteDataStream()), async_runner=True)
    register_runner(track.OperationType.CreateIndexTemplate, Retry(CreateIndexTemplate()), async_runner=True)
    register_runner(track.OperationType.DeleteIndexTemplate, Retry(DeleteIndexTemplate()), async_runner=True)
    register_runner(track.OperationType.ShrinkIndex, Retry(ShrinkIndex()), async_runner=True)
    register_runner(track.OperationType.CreateMlDatafeed, Retry(CreateMlDatafeed()), async_runner=True)
    register_runner(track.OperationType.DeleteMlDatafeed, Retry(DeleteMlDatafeed()), async_runner=True)
    register_runner(track.OperationType.StartMlDatafeed, Retry(StartMlDatafeed()), async_runner=True)
    register_runner(track.OperationType.StopMlDatafeed, Retry(StopMlDatafeed()), async_runner=True)
    register_runner(track.OperationType.CreateMlJob, Retry(CreateMlJob()), async_runner=True)
    register_runner(track.OperationType.DeleteMlJob, Retry(DeleteMlJob()), async_runner=True)
    register_runner(track.OperationType.OpenMlJob, Retry(OpenMlJob()), async_runner=True)
    register_runner(track.OperationType.CloseMlJob, Retry(CloseMlJob()), async_runner=True)
    register_runner(track.OperationType.DeleteSnapshotRepository, Retry(DeleteSnapshotRepository()), async_runner=True)
    register_runner(track.OperationType.CreateSnapshotRepository, Retry(CreateSnapshotRepository()), async_runner=True)
    register_runner(track.OperationType.WaitForSnapshotCreate, Retry(WaitForSnapshotCreate()), async_runner=True)
    register_runner(track.OperationType.WaitForCurrentSnapshotsCreate, Retry(WaitForCurrentSnapshotsCreate()), async_runner=True)
    register_runner(track.OperationType.WaitForRecovery, Retry(IndicesRecovery()), async_runner=True)
    register_runner(track.OperationType.PutSettings, Retry(PutSettings()), async_runner=True)
    register_runner(track.OperationType.CreateTransform, Retry(CreateTransform()), async_runner=True)
    register_runner(track.OperationType.StartTransform, Retry(StartTransform()), async_runner=True)
    register_runner(track.OperationType.WaitForTransform, Retry(WaitForTransform()), async_runner=True)
    register_runner(track.OperationType.DeleteTransform, Retry(DeleteTransform()), async_runner=True)
    register_runner(track.OperationType.TransformStats, Retry(TransformStats()), async_runner=True)
    register_runner(track.OperationType.CreateIlmPolicy, Retry(CreateIlmPolicy()), async_runner=True)
    register_runner(track.OperationType.DeleteIlmPolicy, Retry(DeleteIlmPolicy()), async_runner=True)


def runner_for(operation_type):
    """
    Looks up the runner that has been registered for an operation type.

    :param operation_type: The key under which the runner has been registered.
    :return: The registered (wrapped) runner.
    :raises exceptions.RallyError: If no runner has been registered for ``operation_type``.
    """
    try:
        return __RUNNERS[operation_type]
    except KeyError:
        raise exceptions.RallyError(f"No runner available for operation-type: [{operation_type}]")


def enable_assertions(enabled):
    """
    Changes whether assertions are enabled. The status changes for all tasks that are executed after this call.

    :param enabled: ``True`` to enable assertions, ``False`` to disable them.
    """
    AssertingRunner.assertions_enabled = enabled


def register_runner(operation_type, runner, **kwargs):
    """
    Registers ``runner`` for ``operation_type``, replacing any runner previously registered under the same key.

    The runner is wrapped so that it receives either all Elasticsearch clients (if the unwrapped runner has a
    ``multi_cluster`` attribute) or only the default client, and is then wrapped again for assertion checking
    and completion reporting.

    :param operation_type: Either a ``track.OperationType`` (converted to its hyphenated string) or any other key.
    :param runner: A function or an object implementing the runner. Objects exposing ``__aenter__`` and
                   ``__aexit__`` are registered as context-manager capable.
    :param kwargs: Must contain ``async_runner=True``.
    :raises exceptions.RallyAssertionError: If ``async_runner`` is missing or falsy.
    """
    logger = logging.getLogger(__name__)
    async_runner = kwargs.get("async_runner", False)
    if isinstance(operation_type, track.OperationType):
        operation_type = operation_type.to_hyphenated_string()

    if not async_runner:
        raise exceptions.RallyAssertionError(
            f"Runner [{str(runner)}] must be implemented as async runner and registered with async_runner=True."
        )

    if hasattr(unwrap(runner), "multi_cluster"):
        if "__aenter__" in dir(runner) and "__aexit__" in dir(runner):
            # the log messages of these two branches used to be swapped; each one now describes its own branch
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug("Registering context-manager capable runner object [%s] for [%s].", str(runner), str(operation_type))
            cluster_aware_runner = _multi_cluster_runner(runner, str(runner), context_manager_enabled=True)
        else:
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug("Registering runner object [%s] for [%s].", str(runner), str(operation_type))
            cluster_aware_runner = _multi_cluster_runner(runner, str(runner))
    # we'd rather use callable() but this will erroneously also classify a class as callable...
    elif isinstance(runner, FunctionType):
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug("Registering runner function [%s] for [%s].", str(runner), str(operation_type))
        cluster_aware_runner = _single_cluster_runner(runner, runner.__name__)
    elif "__aenter__" in dir(runner) and "__aexit__" in dir(runner):
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug("Registering context-manager capable runner object [%s] for [%s].", str(runner), str(operation_type))
        cluster_aware_runner = _single_cluster_runner(runner, str(runner), context_manager_enabled=True)
    else:
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug("Registering runner object [%s] for [%s].", str(runner), str(operation_type))
        cluster_aware_runner = _single_cluster_runner(runner, str(runner))

    __RUNNERS[operation_type] = _with_completion(_with_assertions(cluster_aware_runner))


# Only intended for unit-testing!
def remove_runner(operation_type): del __RUNNERS[operation_type] class Runner: """ Base class for all operations against Elasticsearch. """ def __init__(self, *args, config=None, **kwargs): super().__init__(*args, **kwargs) self.logger = logging.getLogger(__name__) self.serverless_mode = False self.serverless_operator = False if config: self.serverless_mode = convert.to_bool(config.opts("driver", "serverless.mode", mandatory=False, default_value=False)) self.serverless_operator = convert.to_bool(config.opts("driver", "serverless.operator", mandatory=False, default_value=False)) async def __aenter__(self): return self async def __call__(self, es, params): """ Runs the actual method that should be benchmarked. :param args: All arguments that are needed to call this method. :return: A pair of (int, String). The first component indicates the "weight" of this call. it is typically 1 but for bulk operations it should be the actual bulk size. The second component is the "unit" of weight which should be "ops" (short for "operations") by default. If applicable, the unit should always be in plural form. It is used in metrics records for throughput and reports. A value will then be shown as e.g. "111 ops/s". """ raise NotImplementedError("abstract operation") async def __aexit__(self, exc_type, exc_val, exc_tb): return False def _default_kw_params(self, params): # map of API kwargs to Rally config parameters kw_dict = { "body": "body", "headers": "headers", "index": "index", "opaque_id": "opaque-id", "params": "request-params", "request_timeout": "request-timeout", } full_result = {k: params.get(v) for (k, v) in kw_dict.items()} # filter Nones return dict(filter(lambda kv: kv[1] is not None, full_result.items())) @staticmethod def _transport_request_params(params): """ Takes all of a runner's params and splits out request parameters, transport level parameters, and headers into their own respective dicts. :param params: A hash with all the respective runner's parameters. 
:return: A tuple of the specific runner's params, request level parameters, transport level parameters, and headers, respectively. """ transport_params = {} request_params = params.get("request-params", {}) if request_timeout := params.pop("request-timeout", None): transport_params["request_timeout"] = request_timeout if (ignore_status := request_params.pop("ignore", None)) or (ignore_status := params.pop("ignore", None)): transport_params["ignore_status"] = ignore_status headers = params.pop("headers", None) or {} if opaque_id := params.pop("opaque-id", None): headers.update({"x-opaque-id": opaque_id}) return params, request_params, transport_params, headers class Delegator: """ Mixin to unify delegate handling """ def __init__(self, delegate, *args, **kwargs): super().__init__(*args, **kwargs) self.delegate = delegate def unwrap(runner): """ Unwraps all delegators until the actual runner. :param runner: An arbitrarily nested chain of delegators around a runner. :return: The innermost runner. 
""" delegate = getattr(runner, "delegate", None) if delegate: return unwrap(delegate) else: return runner def _single_cluster_runner(runnable, name, context_manager_enabled=False): # only pass the default ES client return MultiClientRunner(runnable, name, lambda es: es["default"], context_manager_enabled) def _multi_cluster_runner(runnable, name, context_manager_enabled=False): # pass all ES clients return MultiClientRunner(runnable, name, lambda es: es, context_manager_enabled) def _with_assertions(delegate): return AssertingRunner(delegate) def _with_completion(delegate): unwrapped_runner = unwrap(delegate) if hasattr(unwrapped_runner, "completed") and hasattr(unwrapped_runner, "percent_completed"): return WithCompletion(delegate, unwrapped_runner) else: return NoCompletion(delegate) class NoCompletion(Runner, Delegator): def __init__(self, delegate): super().__init__(delegate=delegate) @property def completed(self): return None @property def percent_completed(self): return None async def __call__(self, *args): return await self.delegate(*args) def __repr__(self, *args, **kwargs): return repr(self.delegate) async def __aenter__(self): await self.delegate.__aenter__() return self async def __aexit__(self, exc_type, exc_val, exc_tb): return await self.delegate.__aexit__(exc_type, exc_val, exc_tb) class WithCompletion(Runner, Delegator): def __init__(self, delegate, progressable): super().__init__(delegate=delegate) self.progressable = progressable @property def completed(self): return self.progressable.completed @property def percent_completed(self): return self.progressable.percent_completed async def __call__(self, *args): return await self.delegate(*args) def __repr__(self, *args, **kwargs): return repr(self.delegate) async def __aenter__(self): await self.delegate.__aenter__() return self async def __aexit__(self, exc_type, exc_val, exc_tb): return await self.delegate.__aexit__(exc_type, exc_val, exc_tb) class MultiClientRunner(Runner, Delegator): def 
__init__(self, runnable, name, client_extractor, context_manager_enabled=False): super().__init__(delegate=runnable) self.name = name self.client_extractor = client_extractor self.context_manager_enabled = context_manager_enabled async def __call__(self, *args): return await self.delegate(self.client_extractor(args[0]), *args[1:]) def __repr__(self, *args, **kwargs): if self.context_manager_enabled: return "user-defined context-manager enabled runner for [%s]" % self.name else: return "user-defined runner for [%s]" % self.name async def __aenter__(self): if self.context_manager_enabled: await self.delegate.__aenter__() return self async def __aexit__(self, exc_type, exc_val, exc_tb): if self.context_manager_enabled: return await self.delegate.__aexit__(exc_type, exc_val, exc_tb) else: return False class AssertingRunner(Runner, Delegator): assertions_enabled = False def __init__(self, delegate): super().__init__(delegate=delegate) self.predicates = { ">": self.greater_than, ">=": self.greater_than_or_equal, "<": self.smaller_than, "<=": self.smaller_than_or_equal, "==": self.equal, } def greater_than(self, expected, actual): return actual > expected def greater_than_or_equal(self, expected, actual): return actual >= expected def smaller_than(self, expected, actual): return actual < expected def smaller_than_or_equal(self, expected, actual): return actual <= expected def equal(self, expected, actual): return actual == expected def check_assertion(self, op_name, assertion, properties): path = assertion["property"] predicate_name = assertion["condition"] expected_value = assertion["value"] actual_value = properties for k in path.split("."): actual_value = actual_value[k] predicate = self.predicates[predicate_name] success = predicate(expected_value, actual_value) if not success: if op_name: msg = f"Expected [{path}] in [{op_name}] to be {predicate_name} [{expected_value}] but was [{actual_value}]." 
else: msg = f"Expected [{path}] to be {predicate_name} [{expected_value}] but was [{actual_value}]." raise exceptions.RallyTaskAssertionError(msg) async def __call__(self, *args): params = args[1] return_value = await self.delegate(*args) if AssertingRunner.assertions_enabled and "assertions" in params: op_name = params.get("name") if isinstance(return_value, dict): for assertion in params["assertions"]: self.check_assertion(op_name, assertion, return_value) else: raise exceptions.DataError(f"Cannot check assertion in [{op_name}] as [{repr(self.delegate)}] did not return a dict.") return return_value def __repr__(self, *args, **kwargs): return repr(self.delegate) async def __aenter__(self): await self.delegate.__aenter__() return self async def __aexit__(self, exc_type, exc_val, exc_tb): return await self.delegate.__aexit__(exc_type, exc_val, exc_tb) def mandatory(params, key, op): try: return params[key] except KeyError: raise exceptions.DataError( f"Parameter source for operation '{str(op)}' did not provide the mandatory parameter '{key}'. " f"Add it to your parameter source and try again." ) def escape(v): """ Escapes values so they can be used as query parameters :param v: The raw value. May be None. :return: The escaped value. """ if v is None: return None elif isinstance(v, bool): return str(v).lower() else: return str(v) class BulkIndex(Runner): """ Bulk indexes the given documents. """ async def __call__(self, es, params): """ Runs one bulk indexing operation. :param es: The Elasticsearch client. :param params: A hash with all parameters. See below for details. :return: A hash with meta data for this bulk operation. See below for details. It expects a parameter dict with the following mandatory keys: * ``body``: containing all documents for the current bulk request. * ``bulk-size``: An indication of the bulk size denoted in ``unit``. * ``unit``: The name of the unit in which the bulk size is provided. 
* ``action_metadata_present``: if ``True``, assume that an action and metadata line is present (meaning only half of the lines contain actual documents to index) * ``index``: The name of the affected index in case ``action_metadata_present`` is ``False``. * ``type``: The name of the affected type in case ``action_metadata_present`` is ``False``. The following keys are optional: * ``pipeline``: If present, runs the the specified ingest pipeline for this bulk. * ``detailed-results``: If ``True``, the runner will analyze the response and add detailed meta-data. Defaults to ``False``. Note that this has a very significant impact on performance and will very likely cause a bottleneck in the benchmark driver so please be very cautious enabling this feature. Our own measurements have shown a median overhead of several thousand times (execution time is in the single digit microsecond range when this feature is disabled and in the single digit millisecond range when this feature is enabled; numbers based on a bulk size of 500 elements and no errors). For details please refer to the respective benchmarks in ``benchmarks/driver``. * ``timeout``: a time unit value indicating the server-side timeout for the operation * ``request-timeout``: a non-negative float indicating the client-side timeout for the operation. If not present, defaults to ``None`` and potentially falls back to the global timeout setting. * ``refresh``: If ``"true"``, Elasticsearch will issue an async refresh to the index; i.e., ``?refresh=true``. If ``"wait_for"``, Elasticsearch issues a synchronous refresh to the index; i.e., ``?refresh=wait_for``. If ``"false""``, Elasticsearch will use refresh defaults; i.e., ``?refresh=false``. 
""" detailed_results = params.get("detailed-results", False) api_kwargs = self._default_kw_params(params) bulk_params = {} if "timeout" in params: bulk_params["timeout"] = params["timeout"] if "pipeline" in params: bulk_params["pipeline"] = params["pipeline"] if "refresh" in params: valid_refresh_values = ("wait_for", "true", "false") if params["refresh"] not in valid_refresh_values: raise exceptions.RallyAssertionError( f"Unsupported bulk refresh value: {params['refresh']}. Use one of [{', '.join(valid_refresh_values)}]." ) bulk_params["refresh"] = params["refresh"] with_action_metadata = mandatory(params, "action-metadata-present", self) bulk_size = mandatory(params, "bulk-size", self) unit = mandatory(params, "unit", self) # parse responses lazily in the standard case - responses might be large thus parsing skews results and if no # errors have occurred we only need a small amount of information from the potentially large response. if not detailed_results: es.return_raw_response() if with_action_metadata: api_kwargs.pop("index", None) # only half of the lines are documents response = await es.bulk(params=bulk_params, **api_kwargs) else: response = await es.bulk(doc_type=params.get("type"), params=bulk_params, **api_kwargs) stats = self.detailed_stats(params, response) if detailed_results else self.simple_stats(bulk_size, unit, response) meta_data = { "index": params.get("index"), "weight": bulk_size, "unit": unit, } meta_data.update(stats) if not stats["success"]: meta_data["error-type"] = "bulk" return meta_data def detailed_stats(self, params, response): def _utf8len(line): if isinstance(line, bytes): return len(line) else: return len(line.encode("utf-8")) ops = {} shards_histogram = OrderedDict() bulk_error_count = 0 bulk_success_count = 0 error_details = set() bulk_request_size_bytes = 0 total_document_size_bytes = 0 with_action_metadata = mandatory(params, "action-metadata-present", self) if isinstance(params["body"], bytes): bulk_lines = 
params["body"].split(b"\n") elif isinstance(params["body"], str): bulk_lines = params["body"].split("\n") elif isinstance(params["body"], list): bulk_lines = params["body"] else: raise exceptions.DataError("bulk body is not of type bytes, string, or list") for line_number, data in enumerate(bulk_lines): line_size = _utf8len(data) if with_action_metadata: if line_number % 2 == 1: total_document_size_bytes += line_size else: total_document_size_bytes += line_size bulk_request_size_bytes += line_size for item in response["items"]: # there is only one (top-level) item op, data = next(iter(item.items())) if op not in ops: ops[op] = Counter() ops[op]["item-count"] += 1 if "result" in data: ops[op][data["result"]] += 1 if "_shards" in data: s = data["_shards"] sk = "%d-%d-%d" % (s["total"], s["successful"], s["failed"]) if sk not in shards_histogram: shards_histogram[sk] = {"item-count": 0, "shards": s} shards_histogram[sk]["item-count"] += 1 if data["status"] > 299 or ("_shards" in data and data["_shards"]["failed"] > 0): bulk_error_count += 1 self.extract_error_details(error_details, data) else: bulk_success_count += 1 stats = { "took": response.get("took"), "success": bulk_error_count == 0, "success-count": bulk_success_count, "error-count": bulk_error_count, "ops": ops, "shards_histogram": list(shards_histogram.values()), "bulk-request-size-bytes": bulk_request_size_bytes, "total-document-size-bytes": total_document_size_bytes, } if bulk_error_count > 0: stats["error-type"] = "bulk" stats["error-description"] = self.error_description(error_details) self.logger.warning("Bulk request failed: [%s]", stats["error-description"]) if "ingest_took" in response: stats["ingest_took"] = response["ingest_took"] return stats def simple_stats(self, bulk_size, unit, response): bulk_success_count = bulk_size if unit == "docs" else None bulk_error_count = 0 error_details = set() # parse lazily on the fast path props = parse(response, ["errors", "took"]) if props.get("errors", False): 
# determine success count regardless of unit because we need to iterate through all items anyway bulk_success_count = 0 # Reparse fully in case of errors - this will be slower parsed_response = json.loads(response.getvalue()) for item in parsed_response["items"]: data = next(iter(item.values())) if data["status"] > 299 or ("_shards" in data and data["_shards"]["failed"] > 0): bulk_error_count += 1 self.extract_error_details(error_details, data) else: bulk_success_count += 1 stats = { "took": props.get("took"), "success": bulk_error_count == 0, "success-count": bulk_success_count, "error-count": bulk_error_count, } if bulk_error_count > 0: stats["error-type"] = "bulk" stats["error-description"] = self.error_description(error_details) return stats def extract_error_details(self, error_details, data): error_data = data.get("error", {}) error_reason = error_data.get("reason") if isinstance(error_data, dict) else str(error_data) if error_data: error_details.add((data["status"], error_reason)) else: error_details.add((data["status"], None)) def _error_status_summary(self, error_details): """ Generates error status code summary. :param error_details: accumulated error details :return: error status summary """ status_counts = {} for status, _ in error_details: status_counts[status] = status_counts.get(status, 0) + 1 status_summaries = [] for status in sorted(status_counts.keys()): status_summaries.append(f"{status_counts[status]}x{status}") return ", ".join(status_summaries) def error_description(self, error_details): """ Generates error description with an arbitrary limit of 5 errors. 
:param error_details: accumulated error details :return: error description """ error_descriptions = [] is_truncated = False for count, error_detail in enumerate(sorted(error_details)): status, reason = error_detail if count < 5: if reason: error_descriptions.append(f"HTTP status: {status}, message: {reason}") else: error_descriptions.append(f"HTTP status: {status}") else: is_truncated = True break description = " | ".join(error_descriptions) if is_truncated: description = description + " | TRUNCATED " + self._error_status_summary(error_details) return description def __repr__(self, *args, **kwargs): return "bulk-index" class ForceMerge(Runner): """ Runs a force merge operation against Elasticsearch. """ async def __call__(self, es, params): # pylint: disable=import-outside-toplevel import elasticsearch max_num_segments = params.get("max-num-segments") mode = params.get("mode") merge_params = self._default_kw_params(params) if max_num_segments: merge_params["max_num_segments"] = max_num_segments if mode == "polling": complete = False try: await es.indices.forcemerge(**merge_params) complete = True except elasticsearch.ConnectionTimeout: pass while not complete: await asyncio.sleep(params.get("poll-period")) tasks = await es.tasks.list(params={"actions": "indices:admin/forcemerge"}) if len(tasks["nodes"]) == 0: # empty nodes response indicates no tasks complete = True else: await es.indices.forcemerge(**merge_params) def __repr__(self, *args, **kwargs): return "force-merge" class IndicesStats(Runner): """ Gather index stats for all indices. 
""" def _get(self, v, path): if v is None: return None elif len(path) == 1: return v.get(path[0]) else: return self._get(v.get(path[0]), path[1:]) def _safe_string(self, v): return str(v) if v is not None else None async def __call__(self, es, params): api_kwargs = self._default_kw_params(params) index = api_kwargs.pop("index", "_all") condition = params.get("condition") response = await es.indices.stats(index=index, metric="_all", **api_kwargs) if condition: path = mandatory(condition, "path", repr(self)) expected_value = mandatory(condition, "expected-value", repr(self)) actual_value = self._get(response, path.split(".")) return { "weight": 1, "unit": "ops", "condition": { "path": path, # avoid mapping issues in the ES metrics store by always rendering values as strings "actual-value": self._safe_string(actual_value), "expected-value": self._safe_string(expected_value), }, # currently we only support "==" as a predicate but that might change in the future "success": actual_value == expected_value, } else: return { "weight": 1, "unit": "ops", "success": True, } def __repr__(self, *args, **kwargs): return "indices-stats" class NodeStats(Runner): """ Gather node stats for all nodes. """ async def __call__(self, es, params): request_timeout = params.get("request-timeout") await es.options(request_timeout=request_timeout).nodes.stats(metric="_all") def __repr__(self, *args, **kwargs): return "node-stats" def parse(text: BytesIO, props: list[str], lists: list[str] = None, objects: list[str] = None) -> dict: """ Selectively parse the provided text as JSON extracting only the properties provided in ``props``. If ``lists`` is specified, this function determines whether the provided lists are empty (respective value will be ``True``) or contain elements (respective key will be ``False``). If ``objects`` is specified, it will in addition extract the JSON objects under the given keys. These JSON objects must be flat dicts, only containing primitive types within. 
class Query(Runner):
    """
    Runs a request body search against Elasticsearch.

    It expects at least the following keys in the `params` hash:

    * `operation-type`: One of `search`, `paginated-search`, `scroll-search`, or `composite-agg`
    * `index`: The index or indices against which to issue the query.
    * `type`: See `index`
    * `cache`: True iff the request cache should be used.
    * `body`: Query body

    The following parameters are optional:

    * `detailed-results` (default: ``False``): Records more detailed meta-data about queries. As it analyzes the
      corresponding response in more detail, this might incur additional overhead which can skew measurement results.
      This flag is ineffective for scroll queries or composite aggs (detailed meta-data are always returned).
    * ``request-timeout``: a non-negative float indicating the client-side timeout for the operation. If not present,
      defaults to ``None`` and potentially falls back to the global timeout setting.
    * `results-per-page`: Number of results to retrieve per page. This maps to the Search API's ``size`` parameter, and
      can be used for paginated and non-paginated searches. Defaults to ``10``

    If the following parameters are present in addition, a paginated query will be issued:

    * `pages`: Number of pages to retrieve at most for this search. If a query yields fewer results than the specified
      number of pages we will terminate earlier.

    Returned meta data

    The following meta data are always returned:

    * ``weight``: operation-agnostic representation of the "weight" of an operation (used internally by Rally for
      throughput calculation). Always 1 for normal queries and the number of retrieved pages for scroll queries or
      composite aggs.
    * ``unit``: The unit in which to interpret ``weight``. Always "ops".
    * ``hits``: Total number of hits for this operation.
    * ``hits_relation``: whether ``hits`` is accurate (``eq``) or a lower bound of the actual hit count (``gte``).
    * ``timed_out``: Whether the search has timed out. For scroll queries, this flag is ``True`` if the flag was
      ``True`` for any of the queries issued.

    For paginated queries we also return:

    * ``pages``: Total number of pages that have been retrieved.
    """

    def __init__(self, config=None):
        super().__init__(config=config)
        self._search_after_extractor = SearchAfterExtractor()
        self._composite_agg_extractor = CompositeAggExtractor()

    async def __call__(self, es, params):
        """
        Issues the search described by ``params`` and returns its meta-data.

        :param es: The Elasticsearch client to use.
        :param params: The operation parameters (see the class documentation).
        :return: A dict with the meta-data of the executed search.
        :raises exceptions.RallyError: if ``operation-type`` is not one of the supported search types.
        """
        params, request_params, transport_params, headers = self._transport_request_params(params)
        # we don't set headers at the options level because the Query runner sets them via the client's '_perform_request' method
        es = es.options(**transport_params)
        # Mandatory to ensure it is always provided. This is especially important when this runner is used in a
        # composite context where there is no actual parameter source and the entire request structure must be provided
        # by the composite's parameter source.
        index = mandatory(params, "index", self)
        body = mandatory(params, "body", self)
        operation_type = params.get("operation-type")
        size = params.get("results-per-page")
        if size and operation_type != "composite-agg":
            body["size"] = size
        detailed_results = params.get("detailed-results", False)
        encoding_header = self._query_headers(params)
        if encoding_header is not None:
            headers.update(encoding_header)
        cache = params.get("cache")
        if cache is not None:
            request_params["request_cache"] = str(cache).lower()
        elif self.serverless_mode and not self.serverless_operator:
            request_params["request_cache"] = "false"
        if not bool(headers):
            # counter-intuitive but preserves prior behavior
            headers = None
        # disable eager response parsing - responses might be huge thus skewing results
        es.return_raw_response()

        async def _search_after_query(es, params):
            """Pages through the results with ``search_after``, optionally using a point in time."""
            index = params.get("index", "_all")
            pit_op = params.get("with-point-in-time-from")
            results = {
                "unit": "pages",
                "success": True,
                "timed_out": False,
                "took": 0,
            }
            if pit_op:
                # these are disallowed as they are encoded in the pit_id
                for item in ["index", "routing", "preference"]:
                    body.pop(item, None)
                index = None
            # `results-per-page` is optional, so `size` may be None. Dividing the hit count by None would raise
            # a TypeError; fall back to a size given in the body and finally to the documented default of 10.
            page_size = size or body.get("size") or 10
            # explicitly convert to int to provoke an error otherwise
            total_pages = sys.maxsize if params.get("pages") == "all" else int(mandatory(params, "pages", self))
            for page in range(1, total_pages + 1):
                if pit_op:
                    pit_id = CompositeContext.get(pit_op)
                    body["pit"] = {"id": pit_id, "keep_alive": "1m"}

                response = await self._raw_search(es, doc_type=None, index=index, body=body.copy(), params=request_params, headers=headers)
                parsed, last_sort = self._search_after_extractor(
                    response,
                    bool(pit_op),
                    results.get("hits"),  # type: ignore[arg-type]  # TODO remove the below ignore when introducing type hints
                )
                results["pages"] = page
                results["weight"] = page
                if results.get("hits") is None:
                    results["hits"] = parsed.get("hits.total.value")
                    results["hits_relation"] = parsed.get("hits.total.relation")
                results["took"] += parsed.get("took")
                # when this evaluates to True, keep it for the final result
                if not results["timed_out"]:
                    results["timed_out"] = parsed.get("timed_out")
                if pit_op:
                    # per the documentation the response pit id is most up-to-date
                    CompositeContext.put(pit_op, parsed.get("pit_id"))

                if results.get("hits") / page_size > page:
                    body["search_after"] = last_sort
                else:
                    # body needs to be un-mutated for the next iteration (preferring to do this over a deepcopy at the start)
                    for item in ["pit", "search_after"]:
                        body.pop(item, None)
                    break

            return results

        async def _composite_agg(es, params):
            """Pages through a composite aggregation by feeding each ``after_key`` into the next request."""
            index = params.get("index", "_all")
            pit_op = params.get("with-point-in-time-from")
            results = {
                "unit": "pages",
                "success": True,
                "timed_out": False,
                "took": 0,
            }
            if pit_op:
                # these are disallowed as they are encoded in the pit_id
                for item in ["index", "routing", "preference"]:
                    body.pop(item, None)
                index = None
            # explicitly convert to int to provoke an error otherwise
            total_pages = sys.maxsize if params.get("pages", "all") == "all" else int(mandatory(params, "pages", self))
            for page in range(1, total_pages + 1):
                if pit_op:
                    pit_id = CompositeContext.get(pit_op)
                    body["pit"] = {"id": pit_id, "keep_alive": "1m"}

                paths_to_composite = paths_to_composite_agg(body, [])
                if not paths_to_composite or len(paths_to_composite) != 1:
                    raise exceptions.DataError("Unique path to composite agg required")
                path_to_composite = paths_to_composite[0]
                composite_agg_body = resolve_composite_agg(body, path_to_composite)
                if not composite_agg_body:
                    raise exceptions.DataError("Could not find composite agg - parser inconsistency")
                if size:
                    composite_agg_body["size"] = size

                # send a copy so that the dicts along the path to the composite agg are not shared with `body`
                body_to_send = tree_copy_composite_agg(body, path_to_composite)
                response = await self._raw_search(es, doc_type=None, index=index, body=body_to_send, params=request_params, headers=headers)
                parsed = self._composite_agg_extractor(
                    response,
                    bool(pit_op),
                    path_to_composite,
                    results.get("hits"),  # type: ignore[arg-type]  # TODO remove this ignore when introducing type hints
                )
                results["pages"] = page
                results["weight"] = page
                if results.get("hits") is None:
                    results["hits"] = parsed.get("hits.total.value")
                    results["hits_relation"] = parsed.get("hits.total.relation")
                results["took"] += parsed.get("took")
                # when this evaluates to True, keep it for the final result
                if not results["timed_out"]:
                    results["timed_out"] = parsed.get("timed_out")
                if pit_op:
                    # per the documentation the response pit id is most up-to-date
                    CompositeContext.put(pit_op, parsed.get("pit_id"))

                after_key = parsed["after_key"]
                if isinstance(after_key, dict):
                    composite_agg_body["after"] = after_key
                else:
                    # body needs to be un-mutated for the next iteration (preferring to do this over a deepcopy at the start)
                    body.pop("pit", None)
                    composite_agg_body.pop("after", None)
                    break

            return results

        def select_aggs(obj):
            """Returns the value of ``aggs`` (or ``aggregations``) of ``obj`` or ``None`` if ``obj`` is not a dict."""
            if isinstance(obj, dict):
                return obj.get("aggs") or obj.get("aggregations")
            return None

        def paths_to_composite_agg(obj, parent_key_path):
            """Recursively collects the key paths of all aggregations below ``obj`` that have a dict-valued ``composite``."""
            aggs = select_aggs(obj)
            paths = []
            if isinstance(aggs, dict):
                for key, subobj in aggs.items():
                    if isinstance(subobj, dict) and isinstance(subobj.get("composite"), dict):
                        paths = paths + [parent_key_path + [key]]
                    paths = paths + paths_to_composite_agg(subobj, parent_key_path + [key])
            return paths

        def resolve_composite_agg(obj, key_path):
            """Follows ``key_path`` through the nested aggregations and returns the ``composite`` entry at its end."""
            if len(key_path) == 0:
                return obj.get("composite")
            else:
                aggs = select_aggs(obj)
                return resolve_composite_agg(aggs[key_path[0]], key_path[1:])

        def tree_copy_composite_agg(obj, key_path):
            """Returns a copy of ``obj`` in which every dict along ``key_path`` (incl. ``composite``) is shallow-copied."""
            obj = obj.copy()
            if len(key_path) == 0:
                obj["composite"] = obj["composite"].copy()
            else:
                aggs = None
                if "aggs" in obj:
                    aggs = obj["aggs"] = obj["aggs"].copy()
                elif "aggregations" in obj:
                    aggs = obj["aggregations"] = obj["aggregations"].copy()
                aggs[key_path[0]] = tree_copy_composite_agg(aggs[key_path[0]], key_path[1:])
            return obj

        async def _request_body_query(es, params):
            """Issues a single search; only parses the response if ``detailed-results`` has been requested."""
            doc_type = params.get("type")

            r = await self._raw_search(es, doc_type, index, body, request_params, headers=headers)

            if detailed_results:
                props = parse(
                    r,
                    [
                        "hits.total",
                        "hits.total.value",
                        "hits.total.relation",
                        "timed_out",
                        "took",
                        "_shards.total",
                        "_shards.successful",
                        "_shards.skipped",
                        "_shards.failed",
                    ],
                )
                # prefer `hits.total.value` and fall back to a plain `hits.total`
                hits_total = props.get("hits.total.value", props.get("hits.total", 0))
                hits_relation = props.get("hits.total.relation", "eq")
                timed_out = props.get("timed_out", False)
                took = props.get("took", 0)
                shards_total = props.get("_shards.total", 0)
                shards_successful = props.get("_shards.successful", 0)
                shards_skipped = props.get("_shards.skipped", 0)
                shards_failed = props.get("_shards.failed", 0)

                return {
                    "weight": 1,
                    "unit": "ops",
                    "success": True,
                    "hits": hits_total,
                    "hits_relation": hits_relation,
                    "timed_out": timed_out,
                    "took": took,
                    "shards": {
                        "total": shards_total,
                        "successful": shards_successful,
                        "skipped": shards_skipped,
                        "failed": shards_failed,
                    },
                }
            else:
                return {
                    "weight": 1,
                    "unit": "ops",
                    "success": True,
                }

        async def _scroll_query(es, params):
            """Retrieves up to ``pages`` pages with the scroll API and always attempts to clear the scroll afterwards."""
            hits = 0
            hits_relation = None
            timed_out = False
            took = 0
            retrieved_pages = 0
            scroll_id = None
            # explicitly convert to int to provoke an error otherwise
            total_pages = sys.maxsize if params.get("pages") == "all" else int(mandatory(params, "pages", self))
            try:
                for page in range(total_pages):
                    if page == 0:
                        sort = "_doc"
                        scroll = "10s"
                        doc_type = params.get("type")
                        # from here on `params` refers to the request parameters of the initial search
                        params = request_params.copy()
                        params["sort"] = sort
                        params["scroll"] = scroll
                        params["size"] = size
                        r = await self._raw_search(es, doc_type, index, body, params, headers=headers)

                        props = parse(
                            r, ["_scroll_id", "hits.total", "hits.total.value", "hits.total.relation", "timed_out", "took"], ["hits.hits"]
                        )
                        scroll_id = props.get("_scroll_id")
                        hits = props.get("hits.total.value", props.get("hits.total", 0))
                        hits_relation = props.get("hits.total.relation", "eq")
                        timed_out = props.get("timed_out", False)
                        took = props.get("took", 0)
                        all_results_collected = (size is not None and hits < size) or hits == 0
                    else:
                        # /_search/scroll does not accept request_cache so not providing params
                        r = await es.perform_request(
                            method="GET",
                            path="/_search/scroll",
                            body={"scroll_id": scroll_id, "scroll": "10s"},
                            params=None,
                            headers=headers,
                        )
                        props = parse(r, ["timed_out", "took"], ["hits.hits"])
                        timed_out = timed_out or props.get("timed_out", False)
                        took += props.get("took", 0)
                        # is the list of hits empty?
                        all_results_collected = props.get("hits.hits", False)
                    retrieved_pages += 1
                    if all_results_collected:
                        break
            finally:
                if scroll_id:
                    # noinspection PyBroadException
                    try:
                        await es.clear_scroll(body={"scroll_id": [scroll_id]})
                    except BaseException:
                        self.logger.exception(
                            "Could not clear scroll [%s]. This will lead to excessive resource usage in "
                            "Elasticsearch and will skew your benchmark results.",
                            scroll_id,
                        )

            return {
                "weight": retrieved_pages,
                "pages": retrieved_pages,
                "hits": hits,
                "hits_relation": hits_relation,
                "unit": "pages",
                "timed_out": timed_out,
                "took": took,
            }

        if operation_type == "paginated-search":
            return await _search_after_query(es, params)
        elif operation_type == "scroll-search":
            return await _scroll_query(es, params)
        elif operation_type == "composite-agg":
            return await _composite_agg(es, params)
        elif operation_type == "search":
            if "pages" in params:
                logging.getLogger(__name__).warning(
                    "Invoking a scroll search with the 'search' operation is deprecated "
                    "and will be removed in a future release. Use 'scroll-search' instead."
                )
                return await _scroll_query(es, params)
            else:
                return await _request_body_query(es, params)
        else:
            raise exceptions.RallyError(f"No runner available for operation-type: [{operation_type}]")

    async def _raw_search(self, es, doc_type, index, body, params, headers=None):
        """
        Sends a ``GET`` request to the ``_search`` endpoint.

        The path consists of ``index`` and ``doc_type`` (each only if truthy) followed by ``_search``.

        :return: Whatever ``es.perform_request`` returns.
        """
        components = []
        if index:
            components.append(index)
        if doc_type:
            components.append(doc_type)
        components.append("_search")
        path = "/".join(components)
        return await es.perform_request(method="GET", path="/" + path, params=params, body=body, headers=headers)

    def _query_headers(self, params):
        """
        :return: ``None`` if ``response-compression-enabled`` is truthy (the default), otherwise a header dict that
                 requests an uncompressed response.
        """
        # reduces overhead due to decompression of very large responses
        if params.get("response-compression-enabled", True):
            return None
        else:
            return {"Accept-Encoding": "identity"}

    def __repr__(self, *args, **kwargs):
        return "query"
class SearchAfterExtractor:
    """
    Extracts the properties needed to page through results with ``search_after`` from a raw search response.
    """

    def __init__(self):
        # extracts e.g. '[1609780186, "2"]' from '"sort": [1609780186, "2"]'
        self.sort_pattern = re.compile(r"sort\":([^\]]*])")

    def __call__(self, response: BytesIO, get_point_in_time: bool, hits_total: Optional[int]) -> tuple[dict, Optional[list]]:
        """
        :param response: The raw response of the search request.
        :param get_point_in_time: Whether a ``pit_id`` must be present in the response.
        :param hits_total: The total number of hits if already known from a previous page, otherwise ``None``.
        :return: A tuple of the parsed properties and the sort values of the last hit (``None`` if there are none).
        :raises exceptions.RallyAssertionError: if a ``pit_id`` was expected but is missing in the response.
        """
        # not a class member as we would want to mutate over the course of execution for efficiency
        properties = ["timed_out", "took"]
        if get_point_in_time:
            properties.append("pit_id")
        # we only need to parse these the first time, subsequent responses should have the same values
        if hits_total is None:
            properties.extend(["hits.total", "hits.total.value", "hits.total.relation"])

        parsed = parse(response, properties)

        if get_point_in_time and not parsed.get("pit_id"):
            raise exceptions.RallyAssertionError("Paginated query failure: pit_id was expected but not found in the response.")
        # standardize these before returning...
        parsed["hits.total.value"] = parsed.pop("hits.total.value", parsed.pop("hits.total", hits_total))
        parsed["hits.total.relation"] = parsed.get("hits.total.relation", "eq")

        return parsed, self._get_last_sort(response)

    def _get_last_sort(self, response):
        """
        Algorithm is based on findings from benchmarks/driver/parsing_test.py. Potentially a huge time sink if changed.

        :param response: The raw response of the search request.
        :return: The decoded value of the last ``"sort"`` key in the response or ``None`` if there is none.
        """
        response_str = response.getvalue().decode("UTF-8")
        index_of_last_sort = response_str.rfind('"sort"')
        if index_of_last_sort == -1:
            # no hit carries sort values; make this explicit instead of searching from a negative offset
            return None
        # start matching at the last occurrence instead of copying the tail of the response
        last_sort_str = self.sort_pattern.search(response_str, index_of_last_sort)
        if last_sort_str is not None:
            return json.loads(last_sort_str.group(1))
        else:
            return None
class ClusterHealth(Runner):
    """
    Retrieves the cluster health and checks it against the expectations stated in ``request-params``.

    The operation is successful if the reported status is at least ``wait_for_status`` (if provided) and, in case
    ``wait_for_no_relocating_shards`` is present, no shards are relocating.
    """

    async def __call__(self, es, params):
        @total_ordering
        class ClusterHealthStatus(Enum):
            UNKNOWN = 0
            RED = 1
            YELLOW = 2
            GREEN = 3

            def __lt__(self, other):
                # only members of this enum can be compared with each other
                if self.__class__ is not other.__class__:
                    return NotImplemented
                return self.value < other.value

        def status(name):
            # anything that is not a known status name maps to UNKNOWN
            try:
                return ClusterHealthStatus[name.upper()]
            except (KeyError, AttributeError):
                return ClusterHealthStatus.UNKNOWN

        request_params = params.get("request-params", {})
        api_kwargs = self._default_kw_params(params)
        # by default, Elasticsearch will not wait and thus we treat this as success
        expected_cluster_status = request_params.get("wait_for_status", str(ClusterHealthStatus.UNKNOWN))
        # without an explicit expectation we're good with any count of relocating shards.
        expected_relocating_shards = 0 if "wait_for_no_relocating_shards" in request_params else sys.maxsize

        health = await es.cluster.health(**api_kwargs)
        cluster_status = health["status"]
        relocating_shards = health["relocating_shards"]

        status_reached = status(cluster_status) >= status(expected_cluster_status)
        result = {
            "weight": 1,
            "unit": "ops",
            "success": status_reached and relocating_shards <= expected_relocating_shards,
            "cluster-status": cluster_status,
            "relocating-shards": relocating_shards,
        }
        self.logger.info(
            "%s: expected status=[%s], actual status=[%s], relocating shards=[%d], success=[%s].",
            repr(self),
            expected_cluster_status,
            cluster_status,
            relocating_shards,
            result["success"],
        )
        return result

    def __repr__(self, *args, **kwargs):
        return "cluster-health"
class CreateIndex(Runner):
    """
    Execute the `create index API
    <https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html>`_.

    Expects ``indices`` to be a collection of ``(index name, body)`` pairs; one index is created per pair.
    """

    async def __call__(self, es, params):
        indices = mandatory(params, "indices", self)
        api_kwargs = self._default_kw_params(params)
        # index name and body are passed explicitly below; ignore invalid entries rather than erroring
        api_kwargs.pop("index", None)
        api_kwargs.pop("body", None)

        for index_name, index_body in indices:
            await es.indices.create(index=index_name, body=index_body, **api_kwargs)

        created = len(indices)
        return {"weight": created, "unit": "ops", "success": True}

    def __repr__(self, *args, **kwargs):
        return "create-index"
async def set_destructive_requires_name(es, value):
    """
    Sets `action.destructive_requires_name` to provided value

    :param es: The Elasticsearch client to use.
    :param value: The value to apply as a transient cluster setting.
    :return: the prior setting, if any
    """
    all_settings = await es.cluster.get_settings(flat_settings=True)
    # tolerate a missing (or null) transient section instead of failing with an AttributeError on ``None``
    transient_settings = all_settings.get("transient") or {}
    # If the setting was persistent or left as default, we consider resetting later with null sufficient
    prior_value = transient_settings.get("action.destructive_requires_name")
    settings_body = {
        "transient": {
            "action.destructive_requires_name": value,
        },
    }
    await es.cluster.put_settings(body=settings_body)
    return prior_value
class DeleteDataStream(Runner):
    """
    Execute the `delete data stream API
    <https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-delete-data-stream.html>`_.

    With ``only-if-exists`` a data stream is only deleted after an existence check; otherwise the delete is always
    issued and a 404 response is ignored.
    """

    async def __call__(self, es, params):
        deleted = 0

        data_streams = mandatory(params, "data-streams", self)
        only_if_exists = mandatory(params, "only-if-exists", self)
        request_params = mandatory(params, "request-params", self)

        for name in data_streams:
            if only_if_exists:
                if not await es.indices.exists(index=name):
                    # nothing to delete
                    continue
                self.logger.info("Data stream [%s] already exists. Deleting it.", name)
                await es.indices.delete_data_stream(name=name, params=request_params)
            else:
                await es.indices.delete_data_stream(name=name, ignore=[404], params=request_params)
            deleted += 1

        return {"weight": deleted, "unit": "ops", "success": True}

    def __repr__(self, *args, **kwargs):
        return "delete-data-stream"
class DeleteComponentTemplate(Runner):
    """
    Execute the `DELETE component template API
    <https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-delete-component-template.html>`_.

    With ``only-if-exists`` a template is only deleted after an existence check; otherwise the delete is always
    issued and a 404 response is ignored.
    """

    async def __call__(self, es, params):
        template_names = mandatory(params, "templates", self)
        only_if_exists = mandatory(params, "only-if-exists", self)
        request_params = mandatory(params, "request-params", self)

        deleted = 0
        for name in template_names:
            if only_if_exists:
                if not await es.cluster.exists_component_template(name=name):
                    # nothing to delete
                    continue
                self.logger.info("Component Index template [%s] already exists. Deleting it.", name)
                await es.cluster.delete_component_template(name=name, params=request_params)
            else:
                await es.cluster.delete_component_template(name=name, params=request_params, ignore=[404])
            deleted += 1

        return {"weight": deleted, "unit": "ops", "success": True}

    def __repr__(self, *args, **kwargs):
        return "delete-component-template"
class DeleteComposableTemplate(Runner):
    """
    Execute the `delete index template API
    <https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-delete-template.html>`_.

    Each entry of ``templates`` is a triple of template name, a flag whether matching indices should be deleted as
    well, and the index pattern to use for that. The reported weight counts every deletion as well as every change
    of the `action.destructive_requires_name` setting.
    """

    async def __call__(self, es, params):
        templates = mandatory(params, "templates", self)
        only_if_exists = mandatory(params, "only-if-exists", self)
        request_params = mandatory(params, "request-params", self)

        ops_count = 0
        previous_setting = None
        # flipped right before the cluster setting is touched so that it gets restored in any case
        setting_overridden = False
        try:
            for name, delete_matching_indices, index_pattern in templates:
                if only_if_exists:
                    if await es.indices.exists_index_template(name=name):
                        self.logger.info("Composable Index template [%s] already exists. Deleting it.", name)
                        await es.indices.delete_index_template(name=name, params=request_params)
                        ops_count += 1
                else:
                    await es.indices.delete_index_template(name=name, params=request_params, ignore=[404])
                    ops_count += 1

                # 1. Ignore delete matching indices in serverless as wildcard deletes are not supported
                # 2. Ensure that we do not provide an empty index pattern by accident
                if (not self.serverless_mode or self.serverless_operator) and delete_matching_indices and index_pattern:
                    # only set if really required
                    if not setting_overridden:
                        setting_overridden = True
                        previous_setting = await set_destructive_requires_name(es, False)
                        ops_count += 1

                    await es.indices.delete(index=index_pattern)
                    ops_count += 1
        finally:
            if setting_overridden:
                await set_destructive_requires_name(es, previous_setting)
                ops_count += 1

        return {"weight": ops_count, "unit": "ops", "success": True}

    def __repr__(self, *args, **kwargs):
        return "delete-composable-template"
class ShrinkIndex(Runner):
    """
    Execute the `shrink index API
    <https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-shrink-index.html>`_.

    This is a high-level runner that actually executes multiple low-level operations under the hood: every source
    index is first moved to a single node and write-blocked, then shrunk, and each step is awaited via the cluster
    health API.
    """

    def __init__(self):
        super().__init__()
        self.cluster_health = Retry(ClusterHealth())

    async def _wait_for(self, es, idx, description):
        # wait a little bit before the first check
        await asyncio.sleep(3)

        health_params = {
            "index": idx,
            "retries": sys.maxsize,
            "request-params": {"wait_for_no_relocating_shards": "true"},
        }
        outcome = await self.cluster_health(es, params=health_params)
        if outcome["success"]:
            return
        raise exceptions.RallyAssertionError(f"Failed to wait for [{description}].")

    async def __call__(self, es, params):
        source_index = mandatory(params, "source-index", self)
        matching_indices = await es.indices.get(index=source_index)
        source_indices = list(matching_indices.keys())
        # whatever follows the common prefix of all source indices is appended to the target index name
        source_indices_stem = commonprefix(source_indices)

        target_index = mandatory(params, "target-index", self)

        # we need to inject additional settings so we better copy the body
        target_body = deepcopy(mandatory(params, "target-body", self))

        shrink_node = params.get("shrink-node")
        if shrink_node:
            node_names = [shrink_node]
        else:
            # none specified: every data node is a candidate
            node_info = await es.nodes.info()
            node_names = [node["name"] for node in node_info["nodes"].values() if "data" in node["roles"]]
            if not node_names:
                raise exceptions.RallyAssertionError("Could not choose a suitable shrink-node automatically. Specify it explicitly.")

        for source_index in source_indices:
            shrink_node = random.choice(node_names)
            self.logger.info("Using [%s] as shrink node.", shrink_node)
            self.logger.info("Preparing [%s] for shrinking.", source_index)

            # prepare index for shrinking
            preparation = {
                "settings": {
                    "index.routing.allocation.require._name": shrink_node,
                    "index.blocks.write": "true",
                }
            }
            await es.indices.put_settings(index=source_index, body=preparation, preserve_existing=True)

            self.logger.info("Waiting for relocation to finish for index [%s] ...", source_index)
            await self._wait_for(es, source_index, f"shard relocation for index [{source_index}]")

            self.logger.info("Shrinking [%s] to [%s].", source_index, target_index)
            # the settings applied for the preparation are nulled in the body for the target index
            target_settings = target_body.setdefault("settings", {})
            target_settings["index.routing.allocation.require._name"] = None
            target_settings["index.blocks.write"] = None

            # kick off the shrink operation
            index_suffix = source_index.removeprefix(source_indices_stem)
            if len(index_suffix) == 0:
                final_target_index = target_index
            else:
                final_target_index = target_index + index_suffix
            await es.indices.shrink(index=source_index, target=final_target_index, body=target_body)

            self.logger.info("Waiting for shrink to finish for index [%s] ...", source_index)
            await self._wait_for(es, final_target_index, f"shrink for index [{final_target_index}]")
            self.logger.info("Shrinking [%s] to [%s] has finished.", source_index, final_target_index)

        # ops_count is not really important for this operation...
        return {"weight": len(source_indices), "unit": "ops", "success": True}

    def __repr__(self, *args, **kwargs):
        return "shrink-index"
class DeleteMlDatafeed(Runner):
    """
    Execute the `delete datafeed API
    <https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-delete-datafeed.html>`_.

    Falls back to the ``_xpack`` path if the regular API call is rejected as a bad request.
    """

    async def __call__(self, es, params):
        # pylint: disable=import-outside-toplevel
        import elasticsearch

        feed_id = mandatory(params, "datafeed-id", self)
        force_delete = params.get("force", False)

        try:
            # we don't want to fail if a datafeed does not exist, thus we ignore 404s.
            await es.ml.delete_datafeed(datafeed_id=feed_id, force=force_delete, ignore=[404])
        except elasticsearch.BadRequestError:
            # TODO: remove the fallback to '_xpack' path when we drop support for Elasticsearch 6.8
            fallback_path = f"/_xpack/ml/datafeeds/{feed_id}"
            fallback_params = {"force": escape(force_delete), "ignore": 404}
            await es.perform_request(method="DELETE", path=fallback_path, params=fallback_params)

    def __repr__(self, *args, **kwargs):
        return "delete-ml-datafeed"
class StopMlDatafeed(Runner):
    """
    Execute the `stop datafeed API
    <https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-stop-datafeed.html>`_.

    Falls back to the ``_xpack`` path if the regular API call is rejected as a bad request.
    """

    async def __call__(self, es, params):
        # pylint: disable=import-outside-toplevel
        import elasticsearch

        feed_id = mandatory(params, "datafeed-id", self)
        force_stop = params.get("force", False)
        stop_timeout = params.get("timeout")

        try:
            await es.ml.stop_datafeed(datafeed_id=feed_id, force=force_stop, timeout=stop_timeout)
        except elasticsearch.BadRequestError:
            # TODO: remove the fallback to '_xpack' path when we drop support for Elasticsearch 6.8
            fallback_params = {"force": escape(force_stop)}
            # the timeout is only passed on if one has been specified
            if stop_timeout:
                fallback_params["timeout"] = escape(stop_timeout)
            fallback_path = f"/_xpack/ml/datafeeds/{feed_id}/_stop"
            await es.perform_request(method="POST", path=fallback_path, params=fallback_params)

    def __repr__(self, *args, **kwargs):
        return "stop-ml-datafeed"
""" async def __call__(self, es, params): # pylint: disable=import-outside-toplevel import elasticsearch job_id = mandatory(params, "job-id", self) body = mandatory(params, "body", self) try: await es.ml.put_job(job_id=job_id, body=body) except elasticsearch.BadRequestError: # TODO: remove the fallback to '_xpack' path when we drop support for Elasticsearch 6.8 await es.perform_request( method="PUT", path=f"/_xpack/ml/anomaly_detectors/{job_id}", body=body, ) def __repr__(self, *args, **kwargs): return "create-ml-job" class DeleteMlJob(Runner): """ Execute the `delete job API <https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-delete-job.html>`_. """ async def __call__(self, es, params): # pylint: disable=import-outside-toplevel import elasticsearch job_id = mandatory(params, "job-id", self) force = params.get("force", False) # we don't want to fail if a job does not exist, thus we ignore 404s. try: await es.ml.delete_job(job_id=job_id, force=force, ignore=[404]) except elasticsearch.BadRequestError: # TODO: remove the fallback to '_xpack' path when we drop support for Elasticsearch 6.8 await es.perform_request( method="DELETE", path=f"/_xpack/ml/anomaly_detectors/{job_id}", params={"force": escape(force), "ignore": 404}, ) def __repr__(self, *args, **kwargs): return "delete-ml-job" class OpenMlJob(Runner): """ Execute the `open job API <https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-open-job.html>`_. 
""" async def __call__(self, es, params): # pylint: disable=import-outside-toplevel import elasticsearch job_id = mandatory(params, "job-id", self) try: await es.ml.open_job(job_id=job_id) except elasticsearch.BadRequestError: # TODO: remove the fallback to '_xpack' path when we drop support for Elasticsearch 6.8 await es.perform_request( method="POST", path=f"/_xpack/ml/anomaly_detectors/{job_id}/_open", ) def __repr__(self, *args, **kwargs): return "open-ml-job" class CloseMlJob(Runner): """ Execute the `close job API <http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-close-job.html>`_. """ async def __call__(self, es, params): # pylint: disable=import-outside-toplevel import elasticsearch job_id = mandatory(params, "job-id", self) force = params.get("force", False) timeout = params.get("timeout") try: await es.ml.close_job(job_id=job_id, force=force, timeout=timeout) except elasticsearch.BadRequestError: # TODO: remove the fallback to '_xpack' path when we drop support for Elasticsearch 6.8 request_params = { "force": escape(force), } if timeout: request_params["timeout"] = escape(timeout) await es.perform_request( method="POST", path=f"/_xpack/ml/anomaly_detectors/{job_id}/_close", params=request_params, ) def __repr__(self, *args, **kwargs): return "close-ml-job" class RawRequest(Runner): async def __call__(self, es, params): params, request_params, transport_params, headers = self._transport_request_params(params) es = es.options(**transport_params) path = mandatory(params, "path", self) if not path.startswith("/"): self.logger.error("RawRequest failed. Path parameter: [%s] must begin with a '/'.", path) raise exceptions.RallyAssertionError(f"RawRequest [{path}] failed. 
class Sleep(Runner):
    """
    Pauses for the configured ``duration`` without sending any request.

    The request start / end hooks of the client are still invoked around the pause.
    """

    async def __call__(self, es, params):
        es.on_request_start()
        try:
            # looked up inside the ``try`` so that the end hook also fires when the parameter is missing
            pause = mandatory(params, "duration", "sleep")
            await asyncio.sleep(pause)
        finally:
            es.on_request_end()

    def __repr__(self, *args, **kwargs):
        return "sleep"
"create-snapshot" class WaitForSnapshotCreate(Runner): """ Waits until a currently running <snapshot> on a given repository has finished successfully and returns detailed metrics. """ async def __call__(self, es, params): repository = mandatory(params, "repository", repr(self)) snapshot = mandatory(params, "snapshot", repr(self)) wait_period = params.get("completion-recheck-wait-period", 1) snapshot_done = False stats = {} while not snapshot_done: response = await es.snapshot.get(repository=repository, snapshot="_current", verbose=False) if snapshot in [s.get("snapshot") for s in response.get("snapshots", [])]: await asyncio.sleep(wait_period) continue response = await es.snapshot.status(repository=repository, snapshot=snapshot, ignore_unavailable=True) if "snapshots" in response: response_state = response["snapshots"][0]["state"] # Possible states: # https://www.elastic.co/guide/en/elasticsearch/reference/current/get-snapshot-status-api.html#get-snapshot-status-api-response-body if response_state == "FAILED": self.logger.error("Snapshot [%s] failed. Response:\n%s", snapshot, json.dumps(response, indent=2)) raise exceptions.RallyAssertionError(f"Snapshot [{snapshot}] failed. 
Please check logs.") snapshot_done = response_state == "SUCCESS" stats = response["snapshots"][0]["stats"] if not snapshot_done: await asyncio.sleep(wait_period) size = stats["total"]["size_in_bytes"] file_count = stats["total"]["file_count"] start_time_in_millis = stats["start_time_in_millis"] duration_in_millis = stats["time_in_millis"] duration_in_seconds = duration_in_millis / 1000 return { "weight": size, "unit": "byte", "success": True, "throughput": size / duration_in_seconds, "start_time_millis": start_time_in_millis, "stop_time_millis": start_time_in_millis + duration_in_millis, "duration": duration_in_millis, "file_count": file_count, } def __repr__(self, *args, **kwargs): return "wait-for-snapshot-create" class WaitForCurrentSnapshotsCreate(Runner): """ Waits until all currently running snapshots on a given repository have completed """ async def __call__(self, es, params): repository = mandatory(params, "repository", repr(self)) wait_period = params.get("completion-recheck-wait-period", 1) es_info = await es.info() es_version = es_info["version"].get("number", "8.3.0") request_args = {"repository": repository, "snapshot": "_current", "verbose": False} # significantly reduce response size when lots of snapshots have been taken # only available since ES 8.3.0 (https://github.com/elastic/elasticsearch/pull/86269) if (Version.from_string(es_version) >= Version.from_string("8.3.0")) or es.is_serverless: request_args["index_names"] = False while True: response = await es.snapshot.get(**request_args) if int(response.get("total")) == 0: break await asyncio.sleep(wait_period) # getting detailed stats per snapshot using the snapshot status api can be very expensive. # return nothing and rely on Rally's own service_time measurement for the duration. 
class IndicesRecovery(Runner):
    """
    Polls the indices recovery API until every reported shard is in stage ``DONE``.

    Parameters read from ``params``:

    * ``index`` (mandatory): the index (expression) passed to the recovery API.
    * ``completion-recheck-wait-period`` (optional, default 1): seconds to sleep between two polls.

    :return: A dict with the recovered bytes as ``weight``, the ``throughput`` in bytes per second (``0`` if the
             recovery time window is not positive) and the earliest start / latest stop timestamp in milliseconds.
    """

    async def __call__(self, es, params):
        index = mandatory(params, "index", repr(self))
        wait_period = params.get("completion-recheck-wait-period", 1)

        all_shards_done = False
        total_recovered = 0
        total_start_millis = sys.maxsize
        total_end_millis = 0

        # wait until recovery is done
        # The nesting level is ok here given the structure of the API response
        # pylint: disable=too-many-nested-blocks
        while not all_shards_done:
            response = await es.indices.recovery(index=index)
            # This might happen if we happen to call the API before the next recovery is scheduled.
            if not response:
                self.logger.debug("Empty index recovery response for [%s].", index)
            else:
                # check whether all shards are done; totals are recomputed from scratch on every poll
                all_shards_done = True
                total_recovered = 0
                total_start_millis = sys.maxsize
                total_end_millis = 0
                for idx_data in response.values():
                    for shard_data in idx_data.values():
                        for shard in shard_data:
                            current_shard_done = shard["stage"] == "DONE"
                            all_shards_done = all_shards_done and current_shard_done
                            if current_shard_done:
                                total_start_millis = min(total_start_millis, shard["start_time_in_millis"])
                                total_end_millis = max(total_end_millis, shard["stop_time_in_millis"])
                                idx_size = shard["index"]["size"]
                                total_recovered += idx_size["recovered_in_bytes"]
                self.logger.debug("All shards done for [%s]: [%s].", index, all_shards_done)

            if not all_shards_done:
                await asyncio.sleep(wait_period)

        response_time_in_seconds = (total_end_millis - total_start_millis) / 1000
        # start and stop may coincide (millisecond resolution); report a throughput of zero
        # instead of failing with a ZeroDivisionError.
        throughput = total_recovered / response_time_in_seconds if response_time_in_seconds > 0 else 0
        return {
            "weight": total_recovered,
            "unit": "byte",
            "success": True,
            "throughput": throughput,
            "start_time_millis": total_start_millis,
            "stop_time_millis": total_end_millis,
        }

    def __repr__(self, *args, **kwargs):
        return "wait-for-recovery"
""" async def __call__(self, es, params): transform_id = mandatory(params, "transform-id", self) body = mandatory(params, "body", self) defer_validation = params.get("defer-validation", False) await es.transform.put_transform(transform_id=transform_id, body=body, defer_validation=defer_validation) def __repr__(self, *args, **kwargs): return "create-transform" class StartTransform(Runner): """ Execute the `start transform API https://www.elastic.co/guide/en/elasticsearch/reference/current/start-transform.html`_. """ async def __call__(self, es, params): transform_id = mandatory(params, "transform-id", self) timeout = params.get("timeout") await es.transform.start_transform(transform_id=transform_id, timeout=timeout) def __repr__(self, *args, **kwargs): return "start-transform" class WaitForTransform(Runner): """ Wait for the transform until it reaches a certain checkpoint. """ def __init__(self): super().__init__() self._completed = False self._percent_completed = 0.0 self._start_time = None self._last_documents_processed = 0 self._last_processing_time = 0 @property def completed(self): return self._completed @property def percent_completed(self): return self._percent_completed async def __call__(self, es, params): """ stop the transform and wait until transform has finished return stats :param es: The Elasticsearch client. :param params: A hash with all parameters. See below for details. :return: A hash with stats from the run. Different to the `stop transform API https://www.elastic.co/guide/en/elasticsearch/reference/current/stop-transform.html`_ this command will wait until the transform is stopped and a checkpoint has been reached. It expects a parameter dict with the following mandatory keys: * ``transform-id``: the transform id to start, the transform must have been created upfront. 
The following keys are optional: * ``force``: forcefully stop a transform, default false * ``wait-for-checkpoint``: whether to wait until all data has been processed till the next checkpoint, default true * ``wait-for-completion``: whether to block until the transform has stopped, default true * ``transform-timeout``: overall runtime timeout of the transform in seconds, default 3600 (1h) * ``poll-interval``: how often transform stats are polled, used to set progress and check the state, default 0.5. """ transform_id = mandatory(params, "transform-id", self) force = params.get("force", False) timeout = params.get("timeout") wait_for_completion = params.get("wait-for-completion", True) wait_for_checkpoint = params.get("wait-for-checkpoint", True) transform_timeout = params.get("transform-timeout", 60.0 * 60.0) poll_interval = params.get("poll-interval", 0.5) if not self._start_time: self._start_time = time.monotonic() await es.transform.stop_transform( transform_id=transform_id, force=force, timeout=timeout, wait_for_completion=False, wait_for_checkpoint=wait_for_checkpoint ) while True: stats_response = await es.transform.get_transform_stats(transform_id=transform_id) state = stats_response["transforms"][0].get("state") transform_stats = stats_response["transforms"][0].get("stats", {}) if (time.monotonic() - self._start_time) > transform_timeout: raise exceptions.RallyAssertionError( f"Transform [{transform_id}] timed out after [{transform_timeout}] seconds. " "Please consider increasing the timeout in the track." 
) if state == "failed": failure_reason = stats_response["transforms"][0].get("reason", "unknown") raise exceptions.RallyAssertionError(f"Transform [{transform_id}] failed with [{failure_reason}].") if state == "stopped" or wait_for_completion is False: self._completed = True self._percent_completed = 1.0 else: self._percent_completed = ( stats_response["transforms"][0] .get("checkpointing", {}) .get("next", {}) .get("checkpoint_progress", {}) .get("percent_complete", 0.0) / 100.0 ) documents_processed = transform_stats.get("documents_processed", 0) processing_time = transform_stats.get("search_time_in_ms", 0) processing_time += transform_stats.get("processing_time_in_ms", 0) processing_time += transform_stats.get("index_time_in_ms", 0) documents_processed_delta = documents_processed - self._last_documents_processed processing_time_delta = processing_time - self._last_processing_time # only report if we have enough data or transform has completed if self._completed or (documents_processed_delta > 5000 and processing_time_delta > 500): stats = { "transform-id": transform_id, "weight": transform_stats.get("documents_processed", 0), "unit": "docs", "success": True, } throughput = 0 if self._completed: # take the overall throughput if processing_time > 0: throughput = documents_processed / processing_time * 1000 elif processing_time_delta > 0: throughput = documents_processed_delta / processing_time_delta * 1000 stats["throughput"] = throughput self._last_documents_processed = documents_processed self._last_processing_time = processing_time return stats else: # sleep for a while, so stats is not called to often await asyncio.sleep(poll_interval) def __repr__(self, *args, **kwargs): return "wait-for-transform" class DeleteTransform(Runner): """ Execute the `delete transform API https://www.elastic.co/guide/en/elasticsearch/reference/current/delete-transform.html`_. 
""" async def __call__(self, es, params): transform_id = mandatory(params, "transform-id", self) force = params.get("force", False) # we don't want to fail if a job does not exist, thus we ignore 404s. await es.transform.delete_transform(transform_id=transform_id, force=force, ignore=[404]) def __repr__(self, *args, **kwargs): return "delete-transform" class TransformStats(Runner): """ Gather index stats for one or all transforms. """ def _get(self, v, path): if v is None: return None elif len(path) == 1: return v.get(path[0]) else: return self._get(v.get(path[0]), path[1:]) def _safe_string(self, v): return str(v) if v is not None else None async def __call__(self, es, params): api_kwargs = self._default_kw_params(params) transform_id = mandatory(params, "transform-id", self) condition = params.get("condition") response = await es.transform.get_transform_stats(transform_id=transform_id, **api_kwargs) transforms = response.get("transforms", []) transform_stats = transforms[0] if len(transforms) > 0 else {} if condition: path = mandatory(condition, "path", repr(self)) expected_value = mandatory(condition, "expected-value", repr(self)) actual_value = self._get(transform_stats, path.split(".")) return { "weight": 1, "unit": "ops", "condition": { "path": path, # avoid mapping issues in the ES metrics store by always rendering values as strings "actual-value": self._safe_string(actual_value), "expected-value": self._safe_string(expected_value), }, # currently we only support "==" as a predicate but that might change in the future "success": actual_value == expected_value, } else: return { "weight": 1, "unit": "ops", "success": True, } def __repr__(self, *args, **kwargs): return "transform-stats" class SubmitAsyncSearch(Runner): async def __call__(self, es, params): request_params = params.get("request-params", {}) # defaults wait_for_completion_timeout = 0 to avoid sync fallback for fast searches if "wait_for_completion_timeout" not in request_params: 
class GetAsyncSearch(Runner):
    """
    Retrieves the results of the async searches named in ``retrieve-results-for``.

    The operation is only successful if none of the retrieved searches is still running.
    """

    async def __call__(self, es, params):
        all_finished = True
        op_names = mandatory(params, "retrieve-results-for", self)
        query = params.get("request-params", {})
        per_search = {}
        for search_id, op_name in async_search_ids(op_names):
            reply = await es.async_search.get(id=search_id, params=query)
            if reply["is_running"]:
                # a search that is still running has no results yet and makes the whole operation unsuccessful
                all_finished = False
                continue

            result = reply["response"]
            entry = {
                "timed_out": result["timed_out"],
                "took": result["took"],
            }
            per_search[op_name] = entry
            hits = result["hits"]
            if "total" in hits:
                entry["hits"] = hits["total"]["value"]
                entry["hits_relation"] = hits["total"]["relation"]

        return {
            # only count completed searches - there is one key per search id in `stats`
            "weight": len(per_search),
            "unit": "ops",
            "success": all_finished,
            "stats": per_search,
        }

    def __repr__(self, *args, **kwargs):
        return "get-async-search"
class CompositeContext:
    """
    Async context manager providing a key / value store that lives as long as the ``async with`` block.

    The store is kept in a context variable; ``put``, ``get`` and ``remove`` operate on the store of the
    currently active context and fail if there is none.
    """

    ctx = contextvars.ContextVar("composite_context")

    def __init__(self):
        # token returned by ``ContextVar.set``; needed to restore the previous value on exit
        self.token = None

    async def __aenter__(self):
        fresh_store = {}
        self.token = CompositeContext.ctx.set(fresh_store)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        CompositeContext.ctx.reset(self.token)  # type: ignore[arg-type]  # TODO remove this ignore when introducing type hints
        # never swallow exceptions raised inside the block
        return False

    @staticmethod
    def _unknown_property(key):
        # builds the error raised by ``get`` and ``remove`` for a key that is not in the store
        known = ", ".join(CompositeContext._ctx().keys())
        return KeyError(f"Unknown property [{key}]. Currently recognized properties are [{known}].")

    @staticmethod
    def put(key, value):
        store = CompositeContext._ctx()
        store[key] = value

    @staticmethod
    def get(key):
        store = CompositeContext._ctx()
        try:
            return store[key]
        except KeyError:
            raise CompositeContext._unknown_property(key) from None

    @staticmethod
    def remove(key):
        store = CompositeContext._ctx()
        try:
            del store[key]
        except KeyError:
            raise CompositeContext._unknown_property(key) from None

    @staticmethod
    def _ctx():
        try:
            return CompositeContext.ctx.get()
        except LookupError:
            raise exceptions.RallyAssertionError("This operation is only allowed inside a composite operation.") from None
""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Since Composite is marked as serverless.Status.Public, only add public # operation types here. self.supported_op_types = [ "open-point-in-time", "close-point-in-time", "search", "paginated-search", "composite-agg", "raw-request", "sleep", "submit-async-search", "get-async-search", "delete-async-search", "field-caps", ] async def run_stream(self, es, stream, connection_limit): streams = [] timings = [] try: for item in stream: if "stream" in item: streams.append(asyncio.create_task(self.run_stream(es, item["stream"], connection_limit))) elif "operation-type" in item: # consume all prior streams first if streams: streams_timings = await asyncio.gather(*streams) for stream_timings in streams_timings: timings += stream_timings streams = [] op_type = item["operation-type"] if op_type not in self.supported_op_types: raise exceptions.RallyAssertionError( f"Unsupported operation-type [{op_type}]. Use one of [{', '.join(self.supported_op_types)}]." 
class CreateIlmPolicy(Runner):
    """
    Execute the `PUT index lifecycle policy API
    <https://www.elastic.co/guide/en/elasticsearch/reference/current/ilm-put-lifecycle.html>`_.

    ``policy-name`` and ``body`` are mandatory. ``error_trace``, ``filter_path``, ``master_timeout`` and
    ``timeout`` are taken from the optional ``request-params``.
    """

    async def __call__(self, es, params):
        policy_name = mandatory(params, "policy-name", self)
        policy = mandatory(params, "body", self).get("policy", {})
        if not policy:
            # The es client automatically inserts the runner's 'body' within a top level a 'policy' field, so if a user
            # provides a 'body' missing the 'policy' field, the request fails with a misleading exception message, so
            # let's raise a more helpful error message.
            raise exceptions.DataError(
                "Request body does not contain the expected root field [policy]. Please ensure that the request body contains "
                "a top-level 'policy' field and try again."
            )

        query = params.get("request-params", {})
        # options that are absent from the request params are passed on as ``None``
        forwarded = {option: query.get(option, None) for option in ("error_trace", "filter_path", "master_timeout", "timeout")}

        await es.ilm.put_lifecycle(name=policy_name, policy=policy, **forwarded)
        return {
            "weight": 1,
            "unit": "ops",
            "success": True,
        }

    def __repr__(self, *args, **kwargs):
        return "create-ilm-policy"
class Downsample(Runner):
    """
    Executes a downsampling operation creating the target index and aggregating data in the source index on the @timestamp field.

    ``fixed-interval``, ``source-index`` and ``target-index`` are mandatory and must not be ``None``.
    """

    async def __call__(self, es, params):
        params, request_params, transport_params, request_headers = self._transport_request_params(params)
        es = es.options(**transport_params)

        # validated in this order; the first missing parameter is the one that gets reported
        required = {}
        for key in ("fixed-interval", "source-index", "target-index"):
            value = mandatory(params, key, self)
            if value is None:
                raise exceptions.DataError(
                    f"Parameter source for operation 'downsample' did not provide the mandatory parameter '{key}'. "
                    "Add it to your parameter source and try again."
                )
            required[key] = value

        source_index = required["source-index"]
        target_index = required["target-index"]
        await es.perform_request(
            method="POST",
            path=f"/{source_index}/_downsample/{target_index}",
            body={"fixed_interval": required["fixed-interval"]},
            params=request_params,
            headers=request_headers,
        )
        return {"weight": 1, "unit": "ops", "success": True}

    def __repr__(self, *args, **kwargs):
        return "downsample"
class Retry(Runner, Delegator):
    """
    This runner can be used as a wrapper around regular runners to retry operations.

    It defines the following parameters:

    * ``retries`` (optional, default 0): The number of times the operation is retried.
    * ``retry-until-success`` (optional, default False): Retries until the delegate returns a success. This will also
                              forcibly set ``retry-on-error`` to ``True``.
    * ``retry-wait-period`` (optional, default 0.5): The time in seconds to wait after an error.
    * ``retry-on-timeout`` (optional, default True): Whether to retry on connection timeout. This setting also governs
                           retries after other transport-level errors.
    * ``retry-on-error`` (optional, default False): Whether to retry on failure (i.e. the delegate
                         returns ``success == False``)
    """

    def __init__(self, delegate, retry_until_success=False):
        super().__init__(delegate=delegate)
        self.retry_until_success = retry_until_success

    async def __aenter__(self):
        await self.delegate.__aenter__()
        return self

    async def __call__(self, es, params):
        """
        Invokes the delegate, retrying according to the parameters described on the class.

        :param es: The Elasticsearch client(s), passed through to the delegate unchanged.
        :param params: The operation parameters, passed through to the delegate unchanged.
        :return: The return value of the first attempt that is not retried.
        """
        # pylint: disable=import-outside-toplevel
        import socket

        import elasticsearch

        retry_until_success = params.get("retry-until-success", self.retry_until_success)
        if retry_until_success:
            max_attempts = sys.maxsize
            retry_on_error = True
        else:
            max_attempts = params.get("retries", 0) + 1
            retry_on_error = params.get("retry-on-error", False)
        sleep_time = params.get("retry-wait-period", 0.5)
        retry_on_timeout = params.get("retry-on-timeout", True)

        for attempt in range(max_attempts):
            last_attempt = attempt + 1 == max_attempts
            try:
                return_value = await self.delegate(es, params)
                if last_attempt or not retry_on_error:
                    return return_value
                # we can determine success if and only if the runner returns a dict. Otherwise, we have to assume it was fine.
                elif isinstance(return_value, dict):
                    if return_value.get("success", True):
                        self.logger.debug("%s has returned successfully", repr(self.delegate))
                        return return_value
                    else:
                        self.logger.info(
                            "[%s] has returned with an error: %s. Retrying in [%.2f] seconds.",
                            repr(self.delegate),
                            return_value,
                            sleep_time,
                        )
                        await asyncio.sleep(sleep_time)
                else:
                    return return_value
            except (socket.timeout, elasticsearch.exceptions.ConnectionError):
                if last_attempt or not retry_on_timeout:
                    raise
                await asyncio.sleep(sleep_time)
            except elasticsearch.ApiError as e:
                if last_attempt or not retry_on_timeout:
                    raise
                # of all API errors only a request timeout (HTTP 408) is retried
                if e.status_code == 408:
                    self.logger.info("[%s] has timed out. Retrying in [%.2f] seconds.", repr(self.delegate), sleep_time)
                    await asyncio.sleep(sleep_time)
                else:
                    raise
            except elasticsearch.exceptions.ConnectionTimeout:
                if last_attempt or not retry_on_timeout:
                    raise
                self.logger.info("[%s] has timed out. Retrying in [%.2f] seconds.", repr(self.delegate), sleep_time)
                await asyncio.sleep(sleep_time)
            except elasticsearch.exceptions.TransportError:
                if last_attempt or not retry_on_timeout:
                    raise
                # wait like every other retry branch does; retrying immediately would turn
                # ``retry-until-success`` into a busy loop while the transport error persists.
                self.logger.info(
                    "[%s] has failed with a transport error. Retrying in [%.2f] seconds.", repr(self.delegate), sleep_time
                )
                await asyncio.sleep(sleep_time)

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        return await self.delegate.__aexit__(exc_type, exc_val, exc_tb)

    def __repr__(self, *args, **kwargs):
        return "retryable %s" % repr(self.delegate)