pytest_rally/rally.py (124 lines of code) (raw):

# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Thin wrapper that builds and runs ``esrally`` command lines."""

import functools
import inspect
import logging
import os
import re

from string import Template

from pytest_rally import process

# Base directory under which the ".rally" directory is looked up; falls back
# to the current user's home directory when RALLY_HOME is not set.
RALLY_HOME = os.getenv("RALLY_HOME", os.path.expanduser("~"))
RALLY_CONFIG_DIR = os.path.join(RALLY_HOME, ".rally")
CONFIG_NAME = "pytest"


def format_cli_opt(opt, val):
    """Render one keyword argument as a command-line option string.

    Underscores in ``opt`` become dashes. ``True`` yields a bare flag
    (``--opt``); a dict is flattened to ``k:v`` pairs joined by commas; any
    other value is interpolated as-is. Non-flag values are wrapped in double
    quotes: ``--opt="value"``.
    """
    opt = re.sub("_", "-", opt)
    if val is True:
        return f'--{opt}'
    if isinstance(val, dict):
        val = ",".join(f"{k}:{v}" for k, v in val.items())
    return f'--{opt}="{val}"'


def command_for_func(func, **kwargs):
    """Build an ``esrally`` command line from a function and its arguments.

    The subcommand is ``func.__name__`` with underscores replaced by spaces
    (``list_tracks`` -> ``list tracks``). Every truthy keyword argument is
    rendered with :func:`format_cli_opt`; falsy values (``None``, ``False``,
    empty strings/containers, ``0``) are omitted entirely.
    """
    command = " ".join(re.split("_", func.__name__))
    opts = [format_cli_opt(k, v) for k, v in kwargs.items() if v]
    return f'esrally {command} {" ".join(opts)}'


def add_default_options(f):
    """Decorate a method so unset track options fall back to instance values.

    For ``track_repository`` and ``track_revision``, a keyword argument that
    is missing or ``None`` is replaced with the attribute of the same name on
    ``self`` before the wrapped method is called.
    """
    @functools.wraps(f)
    def wrapper(self, *args, **kwargs):
        opts = ["track_repository", "track_revision"]
        kwargs.update({o: getattr(self, o) for o in opts if kwargs.get(o) is None})
        return f(self, *args, **kwargs)
    return wrapper


def generate_command_line(f):
    """Decorate a method so it always receives a ``cmdline`` keyword argument.

    When the caller does not pass ``cmdline`` (or passes ``None``), one is
    generated from the wrapped function's name and its bound keyword
    arguments, with signature defaults applied. A caller-supplied ``cmdline``
    is passed through unchanged.

    Raises:
        AssertionError: if a caller-supplied ``cmdline`` does not start with
            ``esrally``.
    """
    @functools.wraps(f)
    def wrapper(self, *args, **kwargs):
        # Bind against the real signature so that defaults declared on the
        # wrapped method end up in the generated command line as well.
        sig = inspect.signature(f).bind(self, *args, **kwargs)
        sig.apply_defaults()
        cmdline = kwargs.get("cmdline")
        if cmdline is None:
            kwargs["cmdline"] = command_for_func(f, **sig.kwargs)
        elif not cmdline.startswith("esrally"):
            raise AssertionError(f"Command must begin with 'esrally': [{cmdline}]")
        return f(self, *args, **kwargs)
    return wrapper


class Rally:
    """Run ``esrally`` subcommands and manage the Rally config file used for them."""

    def __init__(self, track_repository=None, track_revision=None,
                 config_dir=RALLY_CONFIG_DIR, config_name=CONFIG_NAME, debug=False):
        """Store settings and derive the config template and target paths.

        Args:
            track_repository: default for the ``track_repository`` option and
                the value substituted for ``TRACK_REPO`` in the config template.
            track_revision: default for the ``track_revision`` option.
            config_dir: directory the config file is written to; also
                substituted for ``CONFIG_DIR`` in the template.
            config_name: selects the bundled template
                ``resources/<config_name>.ini`` and the target file name
                ``rally-<config_name>.ini``.
            debug: when true, :meth:`race` only logs the command and does not
                execute it.
        """
        self.revision = None
        self.track_repository = track_repository
        self.track_revision = track_revision
        self.config_dir = config_dir
        self.config_name = config_name
        self.config_template = os.path.join(os.path.dirname(__file__), "resources", f"{config_name}.ini")
        self.config_location = os.path.join(config_dir, f"rally-{config_name}.ini")
        self.debug = debug
        self.logger = logging.getLogger(__name__)

    def install_config_file(self):
        """Render the bundled config template and write it to ``config_location``.

        Raises:
            OSError: if the template cannot be read or the target cannot be
                written.
            KeyError: if the template contains a placeholder other than
                ``CONFIG_DIR`` / ``TRACK_REPO``.
        """
        self.logger.info("Writing Rally config to [%s]", self.config_location)
        # Read and render before opening the target: opening with "wt"
        # truncates immediately, so a missing template or a failed
        # substitution would otherwise wipe an existing config file.
        with open(self.config_template, "rt", encoding="utf-8") as src:
            contents = src.read()
        rendered = Template(contents).substitute(CONFIG_DIR=self.config_dir,
                                                 TRACK_REPO=self.track_repository)
        with open(self.config_location, "wt", encoding="utf-8") as target:
            target.write(rendered)

    def delete_config_file(self):
        """Remove the config file written by :meth:`install_config_file`.

        Raises:
            OSError: propagated from ``os.remove`` (e.g. the file is absent).
        """
        self.logger.info("Removing Rally config from [%s]", self.config_location)
        os.remove(self.config_location)

    def set_revision(self):
        """Store the right-stripped output of ``esrally --version`` in ``self.revision``."""
        self.revision = process.run_command_with_output("esrally --version").rstrip()
        self.logger.info("Rally revision: [%s]", self.revision)

    def configure(self):
        """Record the Rally revision, then install the config file."""
        self.set_revision()
        self.install_config_file()

    @add_default_options
    @generate_command_line
    def list_tracks(self, *, track_repository=None, track_revision=None,
                    configuration_name=CONFIG_NAME, cmdline=None):
        """Run ``esrally list tracks`` and return its output.

        All keyword arguments except ``cmdline`` only feed the generated
        command line; passing ``cmdline`` explicitly overrides generation.
        Returns whatever ``process.run_command_with_output`` returns.
        """
        self.logger.info("Running command: [%s]", cmdline)
        return process.run_command_with_output(cmdline)

    @add_default_options
    @generate_command_line
    def race(self, *,
             track=None,
             challenge=None,
             track_repository=None,
             track_revision=None,
             client_options=None,
             configuration_name=CONFIG_NAME,
             elasticsearch_plugins=None,
             enable_assertions=True,
             enable_driver_profiling=False,
             exclude_tasks=None,
             include_tasks=None,
             kill_running_processes=True,
             on_error="abort",
             pipeline="benchmark-only",
             plugin_params=None,
             preserve_install=False,
             report_file=None,
             report_format=None,
             report_numbers_align=None,
             show_in_report=None,
             target_hosts="127.0.0.1:19200",
             telemetry=None,
             telemetry_params=None,
             test_mode=True,
             track_params=None,
             user_tag=None,
             cmdline=None):
        """Run ``esrally race`` with the given options.

        Each keyword argument other than ``cmdline`` maps to the command-line
        option of the same name (underscores become dashes); falsy values are
        omitted from the generated command. Passing ``cmdline`` explicitly
        overrides generation.

        Returns:
            The result of ``process.run_command_with_return_code``, or
            ``None`` when ``self.debug`` is set, in which case the command is
            only logged and never executed.
        """
        self.logger.info("Running command: [%s]", cmdline)
        if not self.debug:
            return process.run_command_with_return_code(cmdline)

    def all_tracks_and_challenges(self):
        """Parse ``list_tracks`` output into ``(track_name, challenges)`` tuples.

        For every retained line, the first whitespace-separated field is taken
        as the track name and the last field is split on commas to obtain the
        challenge names.

        Raises:
            ValueError: if a retained line has fewer than two fields.
        """
        ret = []
        raw = self.list_tracks()
        # Drop the first 12 and the last 5 elements of the newline-split
        # output (banner/table formatting at the top, blank or informational
        # lines at the bottom).
        # NOTE(review): the previous comment spoke of "the first 13 lines" and
        # "the last 4", which does not match these slice bounds. The slice is
        # kept unchanged -- confirm against real `esrally list tracks` output.
        track_list = raw.split("\n")[12:-5]
        for track_str in track_list:
            track_name, *_, challenge_str = track_str.split()
            challenges = challenge_str.split(",")
            ret.append((track_name, challenges))
        return ret