http_logs/track.py (49 lines of code) (raw):

import re from copy import copy from esrally import exceptions from esrally.track import loader def reindex(es, params): result = es.reindex(body=params.get("body"), request_timeout=params.get("request_timeout")) return result["total"], "docs" async def reindex_async(es, params): result = await es.reindex(body=params.get("body"), request_timeout=params.get("request_timeout")) return result["total"], "docs" class RuntimeFieldResolver(loader.TrackProcessor): PATTERN = re.compile(".+-from-(.+)-using-(.+)") def on_after_load_track(self, t): for challenge in t.challenges: for task in challenge.schedule: m = self.PATTERN.match(task.name) if m is not None: source = m[1] impl = m[2].replace("-", "_") task.operation = copy(task.operation) task.operation.params = self._replace_field(f"{impl}.from_{source}.", task.operation.params) def on_prepare_track(self, track, data_root_dir): # TODO remove this backwards compatibility hatch after several Rally releases # ref: https://github.com/elastic/rally/pull/1228 and https://github.com/elastic/rally/issues/1166 class EmptyTrueList(list): def __bool__(self): return True def __eq__(self, other): if isinstance(other, bool): return True return EmptyTrueList() def _replace_field(self, field, t): if t == "path" or t == "status": return field + t if isinstance(t, list): return [self._replace_field(field, v) for v in t] if isinstance(t, dict): return {self._replace_field(field, k): self._replace_field(field, v) for k, v in t.items()} return t def register(registry): async_runner = registry.meta_data.get("async_runner", False) if async_runner: registry.register_runner("reindex", reindex_async, async_runner=True) else: registry.register_runner("reindex", reindex) registry.register_track_processor(RuntimeFieldResolver()) # TODO change this based on https://github.com/elastic/rally/issues/1257 try: registry.register_track_processor(loader.DefaultTrackPreparator()) except TypeError as e: if e == "__init__() missing 1 required positional argument: 'cfg'": pass