esrally/track/loader.py (1,366 lines of code) (raw):

# Licensed to Elasticsearch B.V. under one or more contributor # license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright # ownership. Elasticsearch B.V. licenses this file to you under # the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. import abc import glob import json import logging import os import re import subprocess import sys import tempfile import urllib.error from collections.abc import Generator from typing import Callable, Optional import jinja2 import jinja2.exceptions import jsonschema import tabulate from jinja2 import meta from esrally import PROGRAM_NAME, config, exceptions, paths, time, types, version from esrally.track import params, track from esrally.track.track import Parallel from esrally.utils import ( collections, console, convert, io, modules, net, opts, repo, serverless, ) class TrackSyntaxError(exceptions.InvalidSyntax): """ Raised whenever a syntax problem is encountered when loading the track specification. """ class TrackProcessor(abc.ABC): def on_after_load_track(self, track: track.Track) -> None: """ This method is called by Rally after a track has been loaded. Implementations are expected to modify the provided track object in place. :param track: The current track. """ @staticmethod def _noop(): """ To minimize complexity here, we use a no-op function to return a no-op result in the base class. Alternatively we could use an ABC with some refactoring. We def the function since lambdas cannot be pickled for Thespian's sake. """ return def on_prepare_track(self, track: track.Track, data_root_dir: str) -> Generator[tuple[Callable, dict], None, None]: """ This method is called by Rally after the "after_load_track" phase. Here, any data that is necessary for benchmark execution should be prepared, e.g. by downloading data or generating it. Implementations should be aware that this method might be called on a different machine than "on_after_load_track" and they cannot share any state in between phases. :param track: The current track. This parameter should be treated as effectively immutable. Any modifications will not be reflected in subsequent phases of the benchmark. :param data_root_dir: The data root directory on the current machine as configured by the user. :return: a Generator[Tuple[Callable, dict], None, None] of function/parameter pairs to be executed by the prepare track's executor actors. """ yield TrackProcessor._noop, {} class TrackProcessorRegistry: def __init__(self, cfg: types.Config): self.required_processors = [TaskFilterTrackProcessor(cfg), ServerlessFilterTrackProcessor(cfg), TestModeTrackProcessor(cfg)] self.track_processors = [] self.offline = cfg.opts("system", "offline.mode") self.test_mode = cfg.opts("track", "test.mode.enabled", mandatory=False, default_value=False) self.base_config = cfg self.custom_configuration = False def register_track_processor(self, processor): if not self.custom_configuration: # given processor should become the only element self.track_processors = [] if not isinstance(processor, DefaultTrackPreparator): # stop resetting self.track_processors self.custom_configuration = True if hasattr(processor, "cfg"): processor.cfg = self.base_config if hasattr(processor, "downloader"): processor.downloader = Downloader(self.offline, self.test_mode) if hasattr(processor, "decompressor"): processor.decompressor = Decompressor() self.track_processors.append(processor) @property def processors(self): if not self.custom_configuration: self.register_track_processor(DefaultTrackPreparator()) return [*self.required_processors, *self.track_processors] def tracks(cfg: types.Config): """ Lists all known tracks. Note that users can specify a distribution version so if different tracks are available for different versions, this will be reflected in the output. :param cfg: The config object. :return: A list of tracks that are available for the provided distribution version or else for the main version. """ repo = track_repo(cfg) return [_load_single_track(cfg, repo, track_name) for track_name in repo.track_names] def list_tracks(cfg: types.Config): available_tracks = tracks(cfg) only_auto_generated_challenges = all(t.default_challenge.auto_generated for t in available_tracks) data = [] for t in available_tracks: line = [ t.name, t.description, convert.number_to_human_string(t.number_of_documents), convert.bytes_to_human_string(t.compressed_size_in_bytes), convert.bytes_to_human_string(t.uncompressed_size_in_bytes), ] if not only_auto_generated_challenges: line.append(t.default_challenge) line.append(",".join(map(str, t.challenges))) data.append(line) headers = ["Name", "Description", "Documents", "Compressed Size", "Uncompressed Size"] if not only_auto_generated_challenges: headers.append("Default Challenge") headers.append("All Challenges") console.println("Available tracks:\n") console.println(tabulate.tabulate(tabular_data=data, headers=headers)) def track_info(cfg: types.Config): def format_task(t, indent="", num="", suffix=""): msg = f"{indent}{num}{str(t)}" if t.clients > 1: msg += f" ({t.clients} clients)" msg += suffix return msg def challenge_info(c): if not c.auto_generated: msg = f"Challenge [{c.name}]" if c.default: msg += " (run by default)" console.println(msg, underline="=", overline="=") if c.description: console.println(f"\n{c.description}") console.println("\nSchedule:", underline="-") console.println("") for num, task in enumerate(c.schedule, start=1): if task.nested: console.println(format_task(task, suffix=":", num=f"{num}. ")) for leaf_num, leaf_task in enumerate(task, start=1): console.println(format_task(leaf_task, indent="\t", num=f"{num}.{leaf_num} ")) else: console.println(format_task(task, num=f"{num}. ")) t = load_track(cfg) console.println(f"Showing details for track [{t.name}]:\n") console.println(f"* Description: {t.description}") if t.number_of_documents: console.println(f"* Documents: {convert.number_to_human_string(t.number_of_documents)}") console.println(f"* Compressed Size: {convert.bytes_to_human_string(t.compressed_size_in_bytes)}") console.println(f"* Uncompressed Size: {convert.bytes_to_human_string(t.uncompressed_size_in_bytes)}") console.println("") if t.selected_challenge: challenge_info(t.selected_challenge) else: for challenge in t.challenges: challenge_info(challenge) console.println("") def load_track(cfg: types.Config, install_dependencies=False): """ Loads a track :param cfg: The config object. It contains the name of the track to load. :return: The loaded track. """ repo = track_repo(cfg) return _load_single_track(cfg, repo, repo.track_name, install_dependencies) def _install_dependencies(dependencies): if dependencies: log_path = os.path.join(paths.logs(), "dependency.log") console.info(f"Installing track dependencies [{', '.join(dependencies)}]") try: with open(log_path, "ab") as install_log: subprocess.check_call( [sys.executable, "-m", "pip", "install", *dependencies, "--upgrade", "--target", paths.libs()], stdout=install_log, stderr=install_log, ) except subprocess.CalledProcessError: raise exceptions.SystemSetupError(f"Installation of track dependencies failed. See [{install_log.name}] for more information.") def _load_single_track(cfg: types.Config, track_repository, track_name, install_dependencies=False): try: track_dir = track_repository.track_dir(track_name) reader = TrackFileReader(cfg) current_track = reader.read(track_name, track_repository.track_file(track_name), track_dir) tpr = TrackProcessorRegistry(cfg) if install_dependencies: _install_dependencies(current_track.dependencies) has_plugins = load_track_plugins(cfg, track_name, register_track_processor=tpr.register_track_processor) current_track.has_plugins = has_plugins for processor in tpr.processors: processor.on_after_load_track(current_track) return current_track except FileNotFoundError as e: logging.getLogger(__name__).exception("Cannot load track [%s]", track_name) raise exceptions.SystemSetupError( f"Cannot load track [{track_name}]. List the available tracks with [{PROGRAM_NAME} list tracks]." ) from e except BaseException: logging.getLogger(__name__).exception("Cannot load track [%s]", track_name) raise def load_track_plugins( cfg: types.Config, track_name, register_runner=None, register_scheduler=None, register_track_processor=None, force_update=False, ): """ Loads plugins that are defined for the current track (as specified by the configuration). :param cfg: The config object. :param track_name: Name of the track for which plugins should be loaded. :param register_runner: An optional function where runners can be registered. :param register_scheduler: An optional function where custom schedulers can be registered. :param register_track_processor: An optional function where track processors can be registered. :param force_update: If set to ``True`` this ensures that the track is first updated from the remote repository. Defaults to ``False``. :return: True iff this track defines plugins and they have been loaded. """ repo = track_repo(cfg, fetch=force_update, update=force_update) track_plugin_path = repo.track_dir(track_name) logging.getLogger(__name__).debug("Invoking plugin_reader with name [%s] resolved to path [%s]", track_name, track_plugin_path) plugin_reader = TrackPluginReader(track_plugin_path, register_runner, register_scheduler, register_track_processor) if plugin_reader.can_load(): plugin_reader.load() return True else: return False def set_absolute_data_path(cfg: types.Config, t): """ Sets an absolute data path on all document files in this track. Internally we store only relative paths in the track as long as possible as the data root directory may be different on each host. In the end we need to have an absolute path though when we want to read the file on the target host. :param cfg: The config object. :param t: The track to modify. """ def first_existing(root_dirs, f): for root_dir in root_dirs: p = os.path.join(root_dir, f) if os.path.exists(p): return p return None for corpus in t.corpora: data_root = data_dir(cfg, t.name, corpus.name) for document_set in corpus.documents: # At this point we can assume that the file is available locally. Check which path exists and set it. if document_set.document_archive: document_set.document_archive = first_existing(data_root, document_set.document_archive) if document_set.document_file: document_set.document_file = first_existing(data_root, document_set.document_file) def is_simple_track_mode(cfg: types.Config): return cfg.exists("track", "track.path") def track_path(cfg: types.Config): repo = track_repo(cfg) track_name = repo.track_name track_dir = repo.track_dir(track_name) return track_dir def track_repo(cfg: types.Config, fetch=True, update=True): if is_simple_track_mode(cfg): track_path = cfg.opts("track", "track.path") return SimpleTrackRepository(track_path) else: return GitTrackRepository(cfg, fetch, update) def data_dir(cfg: types.Config, track_name, corpus_name): """ Determines potential data directories for the provided track and corpus name. :param cfg: The config object. :param track_name: Name of the current track. :param corpus_name: Name of the current corpus. :return: A list containing either one or two elements. Each element contains a path to a directory which may contain document files. """ corpus_dir = os.path.join(cfg.opts("benchmarks", "local.dataset.cache"), corpus_name) if is_simple_track_mode(cfg): track_path = cfg.opts("track", "track.path") r = SimpleTrackRepository(track_path) # data should always be stored in the track's directory. If the user uses the same directory on all machines this will even work # in the distributed case. However, the user is responsible to ensure that this is actually the case. return [r.track_dir(track_name), corpus_dir] else: return [corpus_dir] class GitTrackRepository: def __init__(self, cfg: types.Config, fetch, update, repo_class=repo.RallyRepository): # current track name (if any) self.track_name = cfg.opts("track", "track.name", mandatory=False) distribution_version = cfg.opts("mechanic", "distribution.version", mandatory=False) repo_name = cfg.opts("track", "repository.name") repo_revision = cfg.opts("track", "repository.revision", mandatory=False) offline = cfg.opts("system", "offline.mode") # TODO remove the below ignore when introducing LiteralString on Python 3.11+ remote_url = cfg.opts("tracks", "%s.url" % repo_name, mandatory=False) # type: ignore[arg-type] root = cfg.opts("node", "root.dir") track_repositories = cfg.opts("benchmarks", "track.repository.dir") tracks_dir = os.path.join(root, track_repositories) self.repo = repo_class(remote_url, tracks_dir, repo_name, "tracks", offline, fetch) if update: if repo_revision: # skip checkout if already on correct version. this is helpful in case of multiple actors loading simultaneously. if not self.repo.correct_revision(repo_revision): self.repo.checkout(repo_revision) else: self.repo.update(distribution_version) cfg.add(config.Scope.applicationOverride, "track", "repository.revision", self.repo.revision) @property def track_names(self): retval = [] # + 1 to capture trailing slash fully_qualified_path_length = len(self.repo.repo_dir) + 1 for root_dir, _, files in os.walk(self.repo.repo_dir): if "track.json" in files: retval.append(root_dir[fully_qualified_path_length:]) return retval def track_dir(self, track_name): return os.path.join(self.repo.repo_dir, track_name) def track_file(self, track_name): return os.path.join(self.track_dir(track_name), "track.json") class SimpleTrackRepository: def __init__(self, track_path): if not os.path.exists(track_path): raise exceptions.SystemSetupError("Track path %s does not exist" % track_path) if os.path.isdir(track_path): self.track_name = io.basename(track_path) self._track_dir = track_path self._track_file = os.path.join(track_path, "track.json") if not os.path.exists(self._track_file): raise exceptions.SystemSetupError("Could not find track.json in %s" % track_path) elif os.path.isfile(track_path): if io.has_extension(track_path, ".json"): self._track_dir = io.dirname(track_path) self._track_file = track_path self.track_name = io.splitext(io.basename(track_path))[0] else: raise exceptions.SystemSetupError("%s has to be a JSON file" % track_path) else: raise exceptions.SystemSetupError("%s is neither a file nor a directory" % track_path) @property def track_names(self): return [self.track_name] def track_dir(self, track_name): assert track_name == self.track_name, "Expect provided track name [%s] to match [%s]" % (track_name, self.track_name) return self._track_dir def track_file(self, track_name): assert track_name == self.track_name return self._track_file def operation_parameters(t, task): op = task.operation if op.param_source: return params.param_source_for_name(op.param_source, t, op.params) else: return params.param_source_for_operation(op.type, t, op.params, task.name) def used_corpora(t): corpora = {} if t.corpora: challenge = t.selected_challenge_or_default for task in challenge.schedule: for sub_task in task: param_source = operation_parameters(t, sub_task) if hasattr(param_source, "corpora"): for c in param_source.corpora: # We might have the same corpus *but* they contain different doc sets. Therefore also need to union over doc sets. corpora[c.name] = corpora.get(c.name, c).union(c) return corpora.values() class DefaultTrackPreparator(TrackProcessor): def __init__(self): super().__init__() # just declare here, will be injected later self.cfg: Optional[types.Config] = None self.downloader = None self.decompressor = None self.track = None @staticmethod def prepare_docs(cfg: types.Config, track, corpus, preparator): for document_set in corpus.documents: if document_set.is_bulk: data_root = data_dir(cfg, track.name, corpus.name) logging.getLogger(__name__).info( "Resolved data root directory for document corpus [%s] in track [%s] to [%s].", corpus.name, track.name, data_root ) if len(data_root) == 1: preparator.prepare_document_set(document_set, data_root[0]) # attempt to prepare everything in the current directory and fallback to the corpus directory elif not preparator.prepare_bundled_document_set(document_set, data_root[0]): preparator.prepare_document_set(document_set, data_root[1]) def on_prepare_track(self, track, data_root_dir) -> Generator[tuple[Callable, dict], None, None]: prep = DocumentSetPreparator(track.name, self.downloader, self.decompressor) for corpus in used_corpora(track): params = {"cfg": self.cfg, "track": track, "corpus": corpus, "preparator": prep} yield DefaultTrackPreparator.prepare_docs, params class Decompressor: def __init__(self): self.logger = logging.getLogger(__name__) def decompress(self, archive_path, documents_path, uncompressed_size): if uncompressed_size: msg = ( f"Decompressing track data from [{archive_path}] to [{documents_path}] (resulting size: " f"[{convert.bytes_to_gb(uncompressed_size):.2f}] GB) ... " ) else: msg = f"Decompressing track data from [{archive_path}] to [{documents_path}] ... " console.info(msg, end="", flush=True, logger=self.logger) io.decompress(archive_path, io.dirname(archive_path)) console.println("[OK]") if not os.path.isfile(documents_path): raise exceptions.DataError( f"Decompressing [{archive_path}] did not create [{documents_path}]. Please check with the track " f"author if the compressed archive has been created correctly." ) extracted_bytes = os.path.getsize(documents_path) if uncompressed_size is not None and extracted_bytes != uncompressed_size: raise exceptions.DataError( f"[{documents_path}] is corrupt. Extracted [{extracted_bytes}] bytes but [{uncompressed_size}] bytes are expected." ) class Downloader: def __init__(self, offline, test_mode): self.offline = offline self.test_mode = test_mode self.logger = logging.getLogger(__name__) def download(self, base_url, target_path, size_in_bytes): file_name = os.path.basename(target_path) if not base_url: raise exceptions.DataError("Cannot download data because no base URL is provided.") if self.offline: raise exceptions.SystemSetupError(f"Cannot find [{target_path}]. Please disable offline mode and retry.") if base_url.endswith("/"): separator = "" else: separator = "/" # join manually as `urllib.parse.urljoin` does not work with S3 or GS URL schemes. data_url = f"{base_url}{separator}{file_name}" try: io.ensure_dir(os.path.dirname(target_path)) if size_in_bytes: size_in_mb = round(convert.bytes_to_mb(size_in_bytes)) self.logger.info("Downloading data from [%s] (%s MB) to [%s].", data_url, size_in_mb, target_path) else: self.logger.info("Downloading data from [%s] to [%s].", data_url, target_path) # we want to have a bit more accurate download progress as these files are typically very large progress = net.Progress("[INFO] Downloading track data", accuracy=1) net.download(data_url, target_path, size_in_bytes, progress_indicator=progress) progress.finish() self.logger.info("Downloaded data from [%s] to [%s].", data_url, target_path) except urllib.error.HTTPError as e: if e.code == 404 and self.test_mode: raise exceptions.DataError( "This track does not support test mode. Ask the track author to add it or disable test mode and retry." ) from None msg = f"Could not download [{data_url}] to [{target_path}]" if e.reason: msg += f" (HTTP status: {e.code}, reason: {e.reason})" else: msg += f" (HTTP status: {e.code})" raise exceptions.DataError(msg) from e except urllib.error.URLError as e: raise exceptions.DataError(f"Could not download [{data_url}] to [{target_path}].") from e if not os.path.isfile(target_path): raise exceptions.SystemSetupError( f"Could not download [{data_url}] to [{target_path}]. Verify data " f"are available at [{data_url}] and check your Internet connection." ) actual_size = os.path.getsize(target_path) if size_in_bytes is not None and actual_size != size_in_bytes: raise exceptions.DataError( f"[{target_path}] is corrupt. Downloaded [{actual_size}] bytes but [{size_in_bytes}] bytes are expected." ) class DocumentSetPreparator: def __init__(self, track_name, downloader, decompressor): self.track_name = track_name self.downloader = downloader self.decompressor = decompressor def is_locally_available(self, file_name): return os.path.isfile(file_name) def has_expected_size(self, file_name, expected_size): return expected_size is None or os.path.getsize(file_name) == expected_size def create_file_offset_table(self, document_file_path, expected_number_of_lines): # just rebuild the file every time for the time being. Later on, we might check the data file fingerprint to avoid it lines_read = io.prepare_file_offset_table(document_file_path) if lines_read and lines_read != expected_number_of_lines: io.remove_file_offset_table(document_file_path) raise exceptions.DataError( f"Data in [{document_file_path}] for track [{self.track_name}] are invalid. " f"Expected [{expected_number_of_lines}] lines but got [{lines_read}]." ) def prepare_document_set(self, document_set, data_root): """ Prepares a document set locally. Precondition: The document set contains either a compressed or an uncompressed document file reference. Postcondition: Either following files will be present locally: * The compressed document file (if specified originally in the corpus) * The uncompressed document file * A file offset table based on the document file Or this method will raise an appropriate Exception (download error, inappropriate specification of files, ...). :param document_set: A document set. :param data_root: The data root directory for this document set. """ doc_path = os.path.join(data_root, document_set.document_file) archive_path = os.path.join(data_root, document_set.document_archive) if document_set.has_compressed_corpus() else None while True: if self.is_locally_available(doc_path) and self.has_expected_size(doc_path, document_set.uncompressed_size_in_bytes): break if ( document_set.has_compressed_corpus() and self.is_locally_available(archive_path) and self.has_expected_size(archive_path, document_set.compressed_size_in_bytes) ): self.decompressor.decompress(archive_path, doc_path, document_set.uncompressed_size_in_bytes) else: if document_set.has_compressed_corpus(): target_path = archive_path expected_size = document_set.compressed_size_in_bytes elif document_set.has_uncompressed_corpus(): target_path = doc_path expected_size = document_set.uncompressed_size_in_bytes else: # this should not happen in practice as the JSON schema should take care of this raise exceptions.RallyAssertionError(f"Track {self.track_name} specifies documents but no corpus") try: self.downloader.download(document_set.base_url, target_path, expected_size) except exceptions.DataError as e: if e.message == "Cannot download data because no base URL is provided." and self.is_locally_available(target_path): raise exceptions.DataError( f"[{target_path}] is present but does not have the expected " f"size of [{expected_size}] bytes and it cannot be downloaded " f"because no base URL is provided." ) from None raise self.create_file_offset_table(doc_path, document_set.number_of_lines) def prepare_bundled_document_set(self, document_set, data_root): """ Prepares a document set that comes "bundled" with the track, i.e. the data files are in the same directory as the track. This is a "lightweight" version of #prepare_document_set() which assumes that at least one file is already present in the current directory. It will attempt to find the appropriate files, decompress if necessary and create a file offset table. Precondition: The document set contains either a compressed or an uncompressed document file reference. Postcondition: If this method returns ``True``, the following files will be present locally: * The compressed document file (if specified originally in the corpus) * The uncompressed document file * A file offset table based on the document file If this method returns ``False`` either the document size is wrong or any files have not been found. :param document_set: A document set. :param data_root: The data root directory for this document set (should be the same as the track file). :return: See postcondition. """ doc_path = os.path.join(data_root, document_set.document_file) archive_path = os.path.join(data_root, document_set.document_archive) if document_set.has_compressed_corpus() else None while True: if self.is_locally_available(doc_path): if self.has_expected_size(doc_path, document_set.uncompressed_size_in_bytes): self.create_file_offset_table(doc_path, document_set.number_of_lines) return True else: raise exceptions.DataError( f"[{doc_path}] is present but does not have the expected size " f"of [{document_set.uncompressed_size_in_bytes}] bytes." ) if document_set.has_compressed_corpus() and self.is_locally_available(archive_path): if self.has_expected_size(archive_path, document_set.compressed_size_in_bytes): self.decompressor.decompress(archive_path, doc_path, document_set.uncompressed_size_in_bytes) else: # treat this is an error because if the file is present but the size does not match, something is # really fishy. It is likely that the user is currently creating a new track and did not specify # the file size correctly. raise exceptions.DataError( f"[{archive_path}] is present but does not have " f"the expected size of [{document_set.compressed_size_in_bytes}] bytes." ) else: return False class TemplateSource: """ Prepares the fully assembled track file from file or string. Doesn't render using jinja2, but embeds track fragments referenced with rally.collect(parts=... """ collect_parts_re = re.compile(r"{{\ +?rally\.collect\(parts=\"(.+?(?=\"))\"\)\ +?}}") def __init__(self, base_path, template_file_name, source=io.FileSource, fileglobber=glob.glob): self.base_path = base_path self.template_file_name = template_file_name self.source = source self.fileglobber = fileglobber self.assembled_source = None self.logger = logging.getLogger(__name__) def load_template_from_file(self): loader = jinja2.FileSystemLoader(self.base_path) try: base_track = loader.get_source(jinja2.Environment(), self.template_file_name) except jinja2.TemplateNotFound: self.logger.exception("Could not load track from [%s].", self.template_file_name) raise TrackSyntaxError(f"Could not load track from '{self.template_file_name}'") self.assembled_source = self.replace_includes(self.base_path, base_track[0]) def load_template_from_string(self, template_source): self.assembled_source = self.replace_includes(self.base_path, template_source) def replace_includes(self, base_path, track_fragment): match = TemplateSource.collect_parts_re.findall(track_fragment) if match: # Construct replacement dict for matched captures repl = {} for glob_pattern in match: full_glob_path = os.path.join(base_path, glob_pattern) sub_source = self.read_glob_files(full_glob_path) repl[glob_pattern] = self.replace_includes(base_path=io.dirname(full_glob_path), track_fragment=sub_source) def replstring(matchobj): # matchobj.groups() is a tuple and first element contains the matched group id return repl[matchobj.groups()[0]] return TemplateSource.collect_parts_re.sub(replstring, track_fragment) return track_fragment def read_glob_files(self, pattern): source = [] files = self.fileglobber(pattern) for fname in files: with self.source(fname, mode="rt", encoding="utf-8") as fp: source.append(fp.read()) return ",\n".join(source) def default_internal_template_vars(glob_helper=lambda f: [], clock=time.Clock, build_flavor=None, serverless_operator=None): """ Dict of internal global variables used by our jinja2 renderers """ return { "globals": { "build_flavor": build_flavor, "serverless_operator": serverless_operator, "now": clock.now(), "glob": glob_helper, }, "filters": { "days_ago": time.days_ago, "get_start_date": time.get_start_date, "get_end_date": time.get_end_date, }, } def render_template(template_source, template_vars=None, template_internal_vars=None, loader=None): macros = [ """ {% macro collect(parts) -%} {% set comma = joiner() %} {% for part in glob(parts) %} {{ comma() }} {% include part %} {% endfor %} {%- endmacro %} """, """ {% macro exists_set_param(setting_name, value, default_value=None, comma=True) -%} {% if value is defined or default_value is not none %} {% if comma %} , {% endif %} {% if default_value is not none %} "{{ setting_name }}": {{ value | default(default_value) | tojson }} {% else %} "{{ setting_name }}": {{ value | tojson }} {% endif %} {% endif %} {%- endmacro %} """, ] # place helpers dict loader first to prevent users from overriding our macros. env = jinja2.Environment( loader=jinja2.ChoiceLoader([jinja2.DictLoader({"rally.helpers": "".join(macros)}), jinja2.BaseLoader(), loader]) ) if template_vars: for k, v in template_vars.items(): env.globals[k] = v # ensure that user variables never override our internal variables if template_internal_vars: for macro_type in template_internal_vars: for env_global_key, env_global_value in template_internal_vars[macro_type].items(): getattr(env, macro_type)[env_global_key] = env_global_value template = env.from_string(template_source) return template.render() def register_all_params_in_track(assembled_source, complete_track_params=None): j2env = jinja2.Environment() # we don't need the following j2 filters/macros but we define them anyway to prevent parsing failures internal_template_vars = default_internal_template_vars() for macro_type in internal_template_vars: for env_global_key, env_global_value in internal_template_vars[macro_type].items(): getattr(j2env, macro_type)[env_global_key] = env_global_value ast = j2env.parse(assembled_source) j2_variables = meta.find_undeclared_variables(ast) if complete_track_params: complete_track_params.populate_track_defined_params(j2_variables) def render_template_from_file(template_file_name, template_vars, complete_track_params=None, build_flavor=None, serverless_operator=None): def relative_glob(start, f): result = glob.glob(os.path.join(start, f)) if result: return [os.path.relpath(p, start) for p in result] else: return [] base_path = io.dirname(template_file_name) template_source = TemplateSource(base_path, io.basename(template_file_name)) template_source.load_template_from_file() register_all_params_in_track(template_source.assembled_source, complete_track_params) return render_template( loader=jinja2.FileSystemLoader(base_path), template_source=template_source.assembled_source, template_vars=template_vars, template_internal_vars=default_internal_template_vars( glob_helper=lambda f: relative_glob(base_path, f), build_flavor=build_flavor, serverless_operator=serverless_operator ), ) class TaskFilterTrackProcessor(TrackProcessor): def __init__(self, cfg: types.Config): self.logger = logging.getLogger(__name__) include_tasks = cfg.opts("track", "include.tasks", mandatory=False) exclude_tasks = cfg.opts("track", "exclude.tasks", mandatory=False) if include_tasks: filtered_tasks = include_tasks self.exclude = False else: filtered_tasks = exclude_tasks self.exclude = True self.filters = self._filters_from_filtered_tasks(filtered_tasks) def _filters_from_filtered_tasks(self, filtered_tasks): filters = [] if filtered_tasks: for t in filtered_tasks: spec = t.split(":") if len(spec) == 1: filters.append(track.TaskNameFilter(spec[0])) elif len(spec) == 2: if spec[0] == "type": # TODO remove the below ignore when introducing type hints filters.append(track.TaskOpTypeFilter(spec[1])) # type: ignore[arg-type] elif spec[0] == "tag": # TODO remove the below ignore when introducing type hints filters.append(track.TaskTagFilter(spec[1])) # type: ignore[arg-type] else: raise exceptions.SystemSetupError(f"Invalid format for filtered tasks: [{t}]. Expected [type] but got [{spec[0]}].") else: raise exceptions.SystemSetupError(f"Invalid format for filtered tasks: [{t}]") return filters def _filter_out_match(self, task): for f in self.filters: if task.matches(f): if hasattr(task, "tasks") and self.exclude: return False return self.exclude return not self.exclude def on_after_load_track(self, track): if not self.filters: return track for challenge in track.challenges: # don't modify the schedule while iterating over it tasks_to_remove = [] for task in challenge.schedule: if self._filter_out_match(task): tasks_to_remove.append(task) else: leafs_to_remove = [] for leaf_task in task: if self._filter_out_match(leaf_task): leafs_to_remove.append(leaf_task) for leaf_task in leafs_to_remove: self.logger.info("Removing sub-task [%s] from challenge [%s] due to task filter.", leaf_task, challenge) task.remove_task(leaf_task) for task in tasks_to_remove: self.logger.info("Removing task [%s] from challenge [%s] due to task filter.", task, challenge) challenge.remove_task(task) return track class ServerlessFilterTrackProcessor(TrackProcessor): def __init__(self, cfg: types.Config): self.logger = logging.getLogger(__name__) self.serverless_mode = convert.to_bool(cfg.opts("driver", "serverless.mode", mandatory=False, default_value=False)) self.serverless_operator = convert.to_bool(cfg.opts("driver", "serverless.operator", mandatory=False, default_value=False)) def _is_filtered_task(self, operation): if operation.run_on_serverless is not None: return not operation.run_on_serverless if operation.type == "raw-request": self.logger.info("Treating raw-request operation for operation [%s] as public.", operation.name) try: op = track.OperationType.from_hyphenated_string(operation.type) # Comparisons rely on the ordering in serverless.Status which is an IntEnum if self.serverless_operator: return op.serverless_status < serverless.Status.Internal else: return op.serverless_status < serverless.Status.Public except KeyError: self.logger.info("Treating user-provided operation type [%s] for operation [%s] as public.", operation.type, operation.name) return False def on_after_load_track(self, track): if not self.serverless_mode: return track for challenge in track.challenges: # don't modify the schedule while iterating over it tasks_to_remove = [] for task in challenge.schedule: if isinstance(task, Parallel): challenge.serverless_info.append(f"Treating parallel task in challenge [{challenge}] as public.") elif self._is_filtered_task(task.operation): tasks_to_remove.append(task) for task in tasks_to_remove: challenge.remove_task(task) if tasks_to_remove: task_str = ", ".join(f"[{task}]" for task in tasks_to_remove) challenge.serverless_info.append(f"Excluding {task_str} as challenge [{challenge}] is run on serverless.") return track class TestModeTrackProcessor(TrackProcessor): def __init__(self, cfg: types.Config): self.test_mode_enabled = cfg.opts("track", "test.mode.enabled", mandatory=False, default_value=False) self.logger = logging.getLogger(__name__) def on_after_load_track(self, track): if not self.test_mode_enabled: return track self.logger.info("Preparing track [%s] for test mode.", str(track)) for corpus in track.corpora: for document_set in corpus.documents: # TODO #341: Should we allow this for snapshots too? if document_set.is_bulk: if document_set.number_of_documents > 1000: if self.logger.isEnabledFor(logging.DEBUG): self.logger.debug( "Reducing corpus size to 1000 documents in corpus [%s], uncompressed source file [%s]", corpus.name, document_set.document_file, ) document_set.number_of_documents = 1000 if document_set.has_compressed_corpus(): path, ext = io.splitext(document_set.document_archive) path_2, ext_2 = io.splitext(path) document_set.document_archive = f"{path_2}-1k{ext_2}{ext}" document_set.document_file = f"{path_2}-1k{ext_2}" elif document_set.has_uncompressed_corpus(): path, ext = io.splitext(document_set.document_file) document_set.document_file = f"{path}-1k{ext}" else: raise exceptions.RallyAssertionError( f"Document corpus [{corpus.name}] has neither compressed nor uncompressed corpus." ) # we don't want to check sizes document_set.compressed_size_in_bytes = None document_set.uncompressed_size_in_bytes = None else: if self.logger.isEnabledFor(logging.DEBUG): self.logger.debug( "Maintaining existing size of %d documents in corpus [%s], uncompressed source file [%s]", document_set.number_of_documents, corpus.name, document_set.document_file, ) for challenge in track.challenges: for task in challenge.schedule: # we need iterate over leaf tasks and await iterating over possible intermediate 'parallel' elements for leaf_task in task: # iteration-based schedules are divided among all clients and we should provide # at least one iteration for each client. if leaf_task.warmup_iterations is not None and leaf_task.warmup_iterations > leaf_task.clients: count = leaf_task.clients if self.logger.isEnabledFor(logging.DEBUG): self.logger.debug("Resetting warmup iterations to %d for [%s]", count, str(leaf_task)) leaf_task.warmup_iterations = count if leaf_task.iterations is not None and leaf_task.iterations > leaf_task.clients: count = leaf_task.clients if self.logger.isEnabledFor(logging.DEBUG): self.logger.debug("Resetting measurement iterations to %d for [%s]", count, str(leaf_task)) leaf_task.iterations = count if leaf_task.warmup_time_period is not None and leaf_task.warmup_time_period > 0: leaf_task.warmup_time_period = 0 if self.logger.isEnabledFor(logging.DEBUG): self.logger.debug( "Resetting warmup time period for [%s] to [%d] seconds.", str(leaf_task), leaf_task.warmup_time_period ) if leaf_task.time_period is not None and leaf_task.time_period > 10: leaf_task.time_period = 10 if self.logger.isEnabledFor(logging.DEBUG): self.logger.debug( "Resetting measurement time period for [%s] to [%d] seconds.", str(leaf_task), leaf_task.time_period ) # Keep throttled to expose any errors but increase the target throughput for short execution times. if leaf_task.target_throughput: original_throughput = leaf_task.target_throughput leaf_task.params.pop("target-throughput", None) leaf_task.params.pop("target-interval", None) leaf_task.params["target-throughput"] = f"{sys.maxsize} {original_throughput.unit}" return track class CompleteTrackParams: def __init__(self, user_specified_track_params=None): self.track_defined_params = set() self.user_specified_track_params = user_specified_track_params if user_specified_track_params else {} def internal_user_defined_track_params(self): set_user_params = set(list(self.user_specified_track_params.keys())) set_internal_params = set(default_internal_template_vars()["globals"].keys()) return list(set_user_params & set_internal_params) def populate_track_defined_params(self, list_of_track_params=None): self.track_defined_params.update(set(list_of_track_params)) @property def sorted_track_defined_params(self): return sorted(self.track_defined_params) def unused_user_defined_track_params(self): set_user_params = set(list(self.user_specified_track_params.keys())) set_user_params.difference_update(self.track_defined_params) return list(set_user_params) class TrackFileReader: MINIMUM_SUPPORTED_TRACK_VERSION = 2 MAXIMUM_SUPPORTED_TRACK_VERSION = 2 """ Creates a track from a track file. """ def __init__(self, cfg: types.Config): track_schema_file = os.path.join(cfg.opts("node", "rally.root"), "resources", "track-schema.json") with open(track_schema_file, encoding="utf-8") as f: self.track_schema = json.loads(f.read()) self.build_flavor = cfg.opts("mechanic", "distribution.flavor", default_value="default", mandatory=False) self.serverless_operator = cfg.opts("driver", "serverless.operator", default_value=False, mandatory=False) self.track_params = cfg.opts("track", "params", mandatory=False) self.complete_track_params = CompleteTrackParams(user_specified_track_params=self.track_params) self.read_track = TrackSpecificationReader( track_params=self.track_params, complete_track_params=self.complete_track_params, selected_challenge=cfg.opts("track", "challenge.name", mandatory=False), build_flavor=self.build_flavor, serverless_operator=self.serverless_operator, ) self.logger = logging.getLogger(__name__) def read(self, track_name, track_spec_file, mapping_dir): """ Reads a track file, verifies it against the JSON schema and if valid, creates a track. :param track_name: The name of the track. :param track_spec_file: The complete path to the track specification file. :param mapping_dir: The directory where the mapping files for this track are stored locally. :return: A corresponding track instance if the track file is valid. """ self.logger.info("Reading track specification file [%s].", track_spec_file) # render the track to a temporary file instead of dumping it into the logs. It is easier to check for error messages # involving lines numbers and it also does not bloat Rally's log file so much. with tempfile.NamedTemporaryFile(delete=False, suffix=".json") as tmp: try: rendered = render_template_from_file( track_spec_file, self.track_params, complete_track_params=self.complete_track_params, build_flavor=self.build_flavor, serverless_operator=self.serverless_operator, ) with open(tmp.name, "w", encoding="utf-8") as f: f.write(rendered) self.logger.info("Final rendered track for '%s' has been written to '%s'.", track_spec_file, tmp.name) track_spec = json.loads(rendered) except jinja2.exceptions.TemplateSyntaxError as te: self.logger.exception("Could not load [%s] due to Jinja Syntax Exception.", track_spec_file) msg = f"Could not load '{track_spec_file}' due to Jinja Syntax Exception. " msg += f"The track file ({tmp.name}) likely hasn't been written." raise TrackSyntaxError(msg, te) except jinja2.exceptions.TemplateNotFound: self.logger.exception("Could not load [%s]", track_spec_file) raise exceptions.SystemSetupError(f"Track {track_name} does not exist") except json.JSONDecodeError as e: self.logger.exception("Could not load [%s].", track_spec_file) msg = f"Could not load '{track_spec_file}': {str(e)}." if e.doc and e.lineno > 0 and e.colno > 0: line_idx = e.lineno - 1 lines = e.doc.split("\n") ctx_line_count = 3 ctx_start = max(0, line_idx - ctx_line_count) ctx_end = min(line_idx + ctx_line_count, len(lines)) erroneous_lines = lines[ctx_start:ctx_end] erroneous_lines.insert(line_idx - ctx_start + 1, "-" * (e.colno - 1) + "^ Error is here") msg += " Lines containing the error:\n\n{}\n\n".format("\n".join(erroneous_lines)) msg += f"The complete track has been written to '{tmp.name}' for diagnosis." raise TrackSyntaxError(msg) except Exception as e: self.logger.exception("Could not load [%s].", track_spec_file) msg = f"Could not load '{track_spec_file}'. The complete track has been written to '{tmp.name}' for diagnosis." raise TrackSyntaxError(msg, e) # check the track version before even attempting to validate the JSON format to avoid bogus errors. raw_version = track_spec.get("version", TrackFileReader.MAXIMUM_SUPPORTED_TRACK_VERSION) try: track_version = int(raw_version) except ValueError: raise exceptions.InvalidSyntax("version identifier for track %s must be numeric but was [%s]" % (track_name, str(raw_version))) if TrackFileReader.MINIMUM_SUPPORTED_TRACK_VERSION > track_version: raise exceptions.RallyError( "Track {} is on version {} but needs to be updated at least to version {} to work with the " "current version of Rally.".format(track_name, track_version, TrackFileReader.MINIMUM_SUPPORTED_TRACK_VERSION) ) if TrackFileReader.MAXIMUM_SUPPORTED_TRACK_VERSION < track_version: raise exceptions.RallyError( "Track {} requires a newer version of Rally. Please upgrade Rally (supported track version: {}, " "required track version: {}).".format(track_name, TrackFileReader.MAXIMUM_SUPPORTED_TRACK_VERSION, track_version) ) try: jsonschema.validate(track_spec, self.track_schema) except jsonschema.exceptions.ValidationError as ve: raise TrackSyntaxError( "Track '{}' is invalid.\n\nError details: {}\nInstance: {}\nPath: {}\nSchema path: {}".format( track_name, ve.message, json.dumps(ve.instance, indent=4, sort_keys=True), ve.absolute_path, ve.absolute_schema_path ) ) current_track = self.read_track(track_name, track_spec, mapping_dir, track_spec_file) internal_user_defined_track_params = self.complete_track_params.internal_user_defined_track_params() if len(internal_user_defined_track_params) > 0: params_list = ",".join(opts.double_quoted_list_of(sorted(internal_user_defined_track_params))) err_msg = f"Some of your track parameter(s) {params_list} are defined by Rally and cannot be modified.\n" self.logger.critical(err_msg) # also dump the message on the console console.println(err_msg) raise exceptions.TrackConfigError(f"Reserved track parameters {sorted(internal_user_defined_track_params)}.") unused_user_defined_track_params = self.complete_track_params.unused_user_defined_track_params() if len(unused_user_defined_track_params) > 0: err_msg = ( "Some of your track parameter(s) {} are not used by this track; perhaps you intend to use {} instead.\n\n" "All track parameters you provided are:\n" "{}\n\n" "All parameters exposed by this track:\n" "{}".format( ",".join(opts.double_quoted_list_of(sorted(unused_user_defined_track_params))), ",".join( opts.double_quoted_list_of( sorted( opts.make_list_of_close_matches( unused_user_defined_track_params, self.complete_track_params.track_defined_params ) ) ) ), "\n".join(opts.bulleted_list_of(sorted(list(self.track_params.keys())))), "\n".join(opts.bulleted_list_of(self.complete_track_params.sorted_track_defined_params)), ) ) self.logger.critical(err_msg) # also dump the message on the console console.println(err_msg) raise exceptions.TrackConfigError(f"Unused track parameters {sorted(unused_user_defined_track_params)}.") return current_track class TrackPluginReader: """ Loads track plugins """ def __init__(self, track_plugin_path, runner_registry=None, scheduler_registry=None, track_processor_registry=None): self.runner_registry = runner_registry self.scheduler_registry = scheduler_registry self.track_processor_registry = track_processor_registry self.loader = modules.ComponentLoader(root_path=track_plugin_path, component_entry_point="track") def can_load(self): return self.loader.can_load() def load(self): # get dependent libraries installed in a prior step. ensure dir exists to make sure loading works correctly. os.makedirs(paths.libs(), exist_ok=True) sys.path.insert(0, paths.libs()) root_modules = self.loader.load() try: # every module needs to have a register() method for module in root_modules: module.register(self) except BaseException: msg = "Could not register track plugin at [%s]" % self.loader.root_path logging.getLogger(__name__).exception(msg) raise exceptions.SystemSetupError(msg) def register_param_source(self, name, param_source): params.register_param_source_for_name(name, param_source) def register_runner(self, name, runner, **kwargs): if self.runner_registry: self.runner_registry(name, runner, **kwargs) def register_scheduler(self, name, scheduler): if self.scheduler_registry: self.scheduler_registry(name, scheduler) def register_track_processor(self, track_processor): if self.track_processor_registry: self.track_processor_registry(track_processor) @property def meta_data(self): return { "rally_version": version.release_version(), "async_runner": True, } class TrackSpecificationReader: """ Creates a track instances based on its parsed JSON description. """ def __init__( self, track_params=None, complete_track_params=None, selected_challenge=None, source=io.FileSource, build_flavor=None, serverless_operator=None, ): self.name = None self.base_path = None self.build_flavor = build_flavor self.serverless_operator = serverless_operator self.track_params = track_params if track_params else {} self.complete_track_params = complete_track_params self.selected_challenge = selected_challenge self.source = source self.logger = logging.getLogger(__name__) def __call__(self, track_name, track_specification, mapping_dir, spec_file=None): self.name = track_name self.base_path = os.path.dirname(os.path.abspath(spec_file)) if spec_file else None description = self._r(track_specification, "description", mandatory=False, default_value="") meta_data = self._r(track_specification, "meta", mandatory=False) indices = [ self._create_index(idx, mapping_dir) for idx in self._r(track_specification, "indices", mandatory=False, default_value=[]) ] data_streams = [ self._create_data_stream(idx) for idx in self._r(track_specification, "data-streams", mandatory=False, default_value=[]) ] if len(indices) > 0 and len(data_streams) > 0: # we guard against this early and support either or raise TrackSyntaxError("indices and data-streams cannot both be specified") templates = [ self._create_index_template(tpl, mapping_dir) for tpl in self._r(track_specification, "templates", mandatory=False, default_value=[]) ] composable_templates = [ self._create_index_template(tpl, mapping_dir) for tpl in self._r(track_specification, "composable-templates", mandatory=False, default_value=[]) ] component_templates = [ self._create_component_template(tpl, mapping_dir) for tpl in self._r(track_specification, "component-templates", mandatory=False, default_value=[]) ] corpora = self._create_corpora(self._r(track_specification, "corpora", mandatory=False, default_value=[]), indices, data_streams) challenges = self._create_challenges(track_specification) dependencies = self._r(track_specification, "dependencies", mandatory=False) # at this point, *all* track params must have been referenced in the templates return track.Track( name=self.name, meta_data=meta_data, description=description, challenges=challenges, indices=indices, data_streams=data_streams, templates=templates, composable_templates=composable_templates, component_templates=component_templates, corpora=corpora, dependencies=dependencies, root=self.base_path, ) def _error(self, msg): raise TrackSyntaxError("Track '%s' is invalid. %s" % (self.name, msg)) def _r(self, root, path, error_ctx=None, mandatory=True, default_value=None): if isinstance(path, str): path = [path] structure = root try: for k in path: structure = structure[k] return structure except KeyError: if mandatory: if error_ctx: self._error("Mandatory element '%s' is missing in '%s'." % (".".join(path), error_ctx)) else: self._error("Mandatory element '%s' is missing." % ".".join(path)) else: return default_value def _create_index(self, index_spec, mapping_dir): index_name = self._r(index_spec, "name") body_file = self._r(index_spec, "body", mandatory=False) if body_file: idx_body_tmpl_src = TemplateSource(mapping_dir, body_file, self.source) with self.source(os.path.join(mapping_dir, body_file), "rt") as f: idx_body_tmpl_src.load_template_from_string(f.read()) body = self._load_template(idx_body_tmpl_src.assembled_source, f"definition for index {index_name} in {body_file}") else: body = None return track.Index(name=index_name, body=body, types=self._r(index_spec, "types", mandatory=False, default_value=[])) def _create_data_stream(self, data_stream_spec): return track.DataStream(name=self._r(data_stream_spec, "name")) def _create_component_template(self, tpl_spec, mapping_dir): name = self._r(tpl_spec, "name") template_file = self._r(tpl_spec, "template") template_path = self._r(tpl_spec, "template-path", mandatory=False) template_file = os.path.join(mapping_dir, template_file) idx_tmpl_src = TemplateSource(mapping_dir, template_file, self.source) with self.source(template_file, "rt") as f: idx_tmpl_src.load_template_from_string(f.read()) template_content = self._load_template( idx_tmpl_src.assembled_source, f"definition for component template {name} in {template_file}" ) if template_path: for field in template_path.split("."): template_content = template_content[field] return track.ComponentTemplate(name, template_content) def _create_index_template(self, tpl_spec, mapping_dir): name = self._r(tpl_spec, "name") template_file = self._r(tpl_spec, "template") template_path = self._r(tpl_spec, "template-path", mandatory=False) index_pattern = self._r(tpl_spec, "index-pattern") delete_matching_indices = self._r(tpl_spec, "delete-matching-indices", mandatory=False, default_value=True) template_file = os.path.join(mapping_dir, template_file) idx_tmpl_src = TemplateSource(mapping_dir, template_file, self.source) with self.source(template_file, "rt") as f: idx_tmpl_src.load_template_from_string(f.read()) template_content = self._load_template( idx_tmpl_src.assembled_source, f"definition for index template {name} in {template_file}" ) if template_path: for field in template_path.split("."): template_content = template_content[field] return track.IndexTemplate(name, index_pattern, template_content, delete_matching_indices) def _load_template(self, contents, description): self.logger.info("Loading template [%s].", description) register_all_params_in_track(contents, self.complete_track_params) try: rendered = render_template( template_source=contents, template_vars=self.track_params, template_internal_vars={"globals": {"build_flavor": self.build_flavor, "serverless_operator": self.serverless_operator}}, ) return json.loads(rendered) except Exception as e: self.logger.exception("Could not load file template for %s.", description) raise TrackSyntaxError("Could not load file template for '%s'" % description, str(e)) def _create_corpora(self, corpora_specs, indices, data_streams): if len(indices) > 0 and len(data_streams) > 0: raise TrackSyntaxError("indices and data-streams cannot both be specified") document_corpora = [] known_corpora_names = set() for corpus_spec in corpora_specs: name = self._r(corpus_spec, "name") if name in known_corpora_names: self._error("Duplicate document corpus name [%s]." % name) known_corpora_names.add(name) meta_data = self._r(corpus_spec, "meta", error_ctx=name, mandatory=False) corpus = track.DocumentCorpus(name=name, meta_data=meta_data) # defaults on corpus level default_base_url = self._r(corpus_spec, "base-url", mandatory=False, default_value=None) default_source_format = self._r(corpus_spec, "source-format", mandatory=False, default_value=track.Documents.SOURCE_FORMAT_BULK) default_action_and_meta_data = self._r(corpus_spec, "includes-action-and-meta-data", mandatory=False, default_value=False) corpus_target_idx = None corpus_target_ds = None corpus_target_type = None if len(indices) == 1: corpus_target_idx = self._r(corpus_spec, "target-index", mandatory=False, default_value=indices[0].name) elif len(indices) > 0: corpus_target_idx = self._r(corpus_spec, "target-index", mandatory=False) if len(data_streams) == 1: corpus_target_ds = self._r(corpus_spec, "target-data-stream", mandatory=False, default_value=data_streams[0].name) elif len(data_streams) > 0: corpus_target_ds = self._r(corpus_spec, "target-data-stream", mandatory=False) if len(indices) == 1 and len(indices[0].types) == 1: corpus_target_type = self._r(corpus_spec, "target-type", mandatory=False, default_value=indices[0].types[0]) elif len(indices) > 0: corpus_target_type = self._r(corpus_spec, "target-type", mandatory=False) for doc_spec in self._r(corpus_spec, "documents"): base_url = self._r(doc_spec, "base-url", mandatory=False, default_value=default_base_url) source_format = self._r(doc_spec, "source-format", mandatory=False, default_value=default_source_format) if source_format == track.Documents.SOURCE_FORMAT_BULK: docs = self._r(doc_spec, "source-file") if io.is_archive(docs): document_archive = docs document_file = io.splitext(docs)[0] else: document_archive = None document_file = docs num_docs = self._r(doc_spec, "document-count") compressed_bytes = self._r(doc_spec, "compressed-bytes", mandatory=False) uncompressed_bytes = self._r(doc_spec, "uncompressed-bytes", mandatory=False) doc_meta_data = self._r(doc_spec, "meta", error_ctx=name, mandatory=False) includes_action_and_meta_data = self._r( doc_spec, "includes-action-and-meta-data", mandatory=False, default_value=default_action_and_meta_data ) if includes_action_and_meta_data: target_idx = None target_type = None target_ds = None else: target_type = self._r(doc_spec, "target-type", mandatory=False, default_value=corpus_target_type, error_ctx=docs) # require to be specified if we're using data streams and we have no default target_ds = self._r( doc_spec, "target-data-stream", mandatory=len(data_streams) > 0 and corpus_target_ds is None, default_value=corpus_target_ds, error_ctx=docs, ) if target_ds and len(indices) > 0: # if indices are in use we error raise TrackSyntaxError("target-data-stream cannot be used when using indices") if target_ds and target_type: raise TrackSyntaxError("target-type cannot be used when using data-streams") # need an index if we're using indices and no meta-data are present and we don't have a default target_idx = self._r( doc_spec, "target-index", mandatory=len(indices) > 0 and corpus_target_idx is None, default_value=corpus_target_idx, error_ctx=docs, ) # either target_idx or target_ds if target_idx and len(data_streams) > 0: # if data streams are in use we error raise TrackSyntaxError("target-index cannot be used when using data-streams") # we need one or the other if target_idx is None and target_ds is None: raise TrackSyntaxError( f"a {'target-index' if len(indices) > 0 else 'target-data-stream'} is required for {docs}" ) docs = track.Documents( source_format=source_format, document_file=document_file, document_archive=document_archive, base_url=base_url, includes_action_and_meta_data=includes_action_and_meta_data, number_of_documents=num_docs, compressed_size_in_bytes=compressed_bytes, uncompressed_size_in_bytes=uncompressed_bytes, target_index=target_idx, target_type=target_type, target_data_stream=target_ds, meta_data=doc_meta_data, ) corpus.documents.append(docs) else: self._error("Unknown source-format [%s] in document corpus [%s]." % (source_format, name)) document_corpora.append(corpus) return document_corpora def _create_challenges(self, track_spec): ops = self.parse_operations(self._r(track_spec, "operations", mandatory=False, default_value=[])) track_params = self._r(track_spec, "parameters", mandatory=False, default_value={}) challenges = [] known_challenge_names = set() default_challenge = None challenge_specs, auto_generated = self._get_challenge_specs(track_spec) number_of_challenges = len(challenge_specs) for challenge_spec in challenge_specs: name = self._r(challenge_spec, "name", error_ctx="challenges") description = self._r(challenge_spec, "description", error_ctx=name, mandatory=False) user_info = self._r(challenge_spec, "user-info", error_ctx=name, mandatory=False) challenge_params = self._r(challenge_spec, "parameters", error_ctx=name, mandatory=False, default_value={}) meta_data = self._r(challenge_spec, "meta", error_ctx=name, mandatory=False) # if we only have one challenge it is treated as default challenge, no matter what the user has specified default = number_of_challenges == 1 or self._r(challenge_spec, "default", error_ctx=name, mandatory=False) selected = number_of_challenges == 1 or self.selected_challenge == name if default and default_challenge is not None: self._error( "Both '%s' and '%s' are defined as default challenges. Please define only one of them as default." % (default_challenge.name, name) ) if name in known_challenge_names: self._error("Duplicate challenge with name '%s'." % name) known_challenge_names.add(name) schedule = [] for op in self._r(challenge_spec, "schedule", error_ctx=name): if "parallel" in op: task = self.parse_parallel(op["parallel"], ops, name) else: task = self.parse_task(op, ops, name) schedule.append(task) # verify we don't have any duplicate task names (which can be confusing / misleading in reporting). known_task_names = set() for task in schedule: for sub_task in task: if sub_task.name in known_task_names: self._error( "Challenge '%s' contains multiple tasks with the name '%s'. Please use the task's name property to " "assign a unique name for each task." % (name, sub_task.name) ) else: known_task_names.add(sub_task.name) # merge params final_challenge_params = dict(collections.merge_dicts(track_params, challenge_params)) challenge = track.Challenge( name=name, parameters=final_challenge_params, meta_data=meta_data, description=description, user_info=user_info, default=default, selected=selected, auto_generated=auto_generated, schedule=schedule, ) if default: default_challenge = challenge challenges.append(challenge) if challenges and default_challenge is None: self._error( 'No default challenge specified. Please edit the track and add "default": true to one of the challenges %s.' % ", ".join([c.name for c in challenges]) ) return challenges def _get_challenge_specs(self, track_spec): schedule = self._r(track_spec, "schedule", mandatory=False) challenge = self._r(track_spec, "challenge", mandatory=False) challenges = self._r(track_spec, "challenges", mandatory=False) count_defined = len(list(filter(lambda e: e is not None, [schedule, challenge, challenges]))) if count_defined == 0: self._error("You must define 'challenge', 'challenges' or 'schedule' but none is specified.") elif count_defined > 1: self._error("Multiple out of 'challenge', 'challenges' or 'schedule' are defined but only one of them is allowed.") elif challenge is not None: return [challenge], False elif challenges is not None: return challenges, False elif schedule is not None: return [{"name": "default", "schedule": schedule}], True else: raise AssertionError(f"Unexpected: schedule=[{schedule}], challenge=[{challenge}], challenges=[{challenges}]") def parse_parallel(self, ops_spec, ops, challenge_name): # use same default values as #parseTask() in case the 'parallel' element did not specify anything default_warmup_iterations = self._r(ops_spec, "warmup-iterations", error_ctx="parallel", mandatory=False) default_iterations = self._r(ops_spec, "iterations", error_ctx="parallel", mandatory=False) default_warmup_time_period = self._r(ops_spec, "warmup-time-period", error_ctx="parallel", mandatory=False) default_time_period = self._r(ops_spec, "time-period", error_ctx="parallel", mandatory=False) default_ramp_up_time_period = self._r(ops_spec, "ramp-up-time-period", error_ctx="parallel", mandatory=False) clients = self._r(ops_spec, "clients", error_ctx="parallel", mandatory=False) completed_by = self._r(ops_spec, "completed-by", error_ctx="parallel", mandatory=False) # now descent to each operation tasks = [] for task in self._r(ops_spec, "tasks", error_ctx="parallel"): tasks.append( self.parse_task( task, ops, challenge_name, default_warmup_iterations, default_iterations, default_warmup_time_period, default_time_period, default_ramp_up_time_period, completed_by, ) ) for task in tasks: if task.ramp_up_time_period != default_ramp_up_time_period: if default_ramp_up_time_period is None: self._error( f"task '{task.name}' in 'parallel' element of challenge '{challenge_name}' specifies " f"a ramp-up-time-period but it is only allowed on the 'parallel' element." ) else: self._error( f"task '{task.name}' specifies a different ramp-up-time-period than its enclosing " f"'parallel' element in challenge '{challenge_name}'." ) if completed_by: has_completion_task = False for task in tasks: if task.completes_parent and not has_completion_task: has_completion_task = True elif task.completes_parent: self._error( f"'parallel' element for challenge '{challenge_name}' contains multiple tasks with the " f"name '{completed_by}' marked with 'completed-by' but only task is allowed to match." ) elif task.any_completes_parent: has_completion_task = True if not has_completion_task: self._error( f"'parallel' element for challenge '{challenge_name}' is marked with 'completed-by' with " f"task name '{completed_by}' but no task with this name exists." ) return track.Parallel(tasks, clients) def parse_task( self, task_spec, ops, challenge_name, default_warmup_iterations=None, default_iterations=None, default_warmup_time_period=None, default_time_period=None, default_ramp_up_time_period=None, completed_by_name=None, ): if "operation" not in task_spec: raise TrackSyntaxError("Operation missing from task spec %s in challenge '%s'." % (task_spec, challenge_name)) op_spec = task_spec["operation"] if isinstance(op_spec, str) and op_spec in ops: op = ops[op_spec] else: # may as well an inline operation op = self.parse_operation(op_spec, error_ctx="inline operation in challenge %s" % challenge_name) schedule = self._r(task_spec, "schedule", error_ctx=op.name, mandatory=False) task_name = self._r(task_spec, "name", error_ctx=op.name, mandatory=False, default_value=op.name) task = track.Task( name=task_name, operation=op, tags=self._r(task_spec, "tags", error_ctx=op.name, mandatory=False), meta_data=self._r(task_spec, "meta", error_ctx=op.name, mandatory=False), warmup_iterations=self._r( task_spec, "warmup-iterations", error_ctx=op.name, mandatory=False, default_value=default_warmup_iterations ), iterations=self._r(task_spec, "iterations", error_ctx=op.name, mandatory=False, default_value=default_iterations), warmup_time_period=self._r( task_spec, "warmup-time-period", error_ctx=op.name, mandatory=False, default_value=default_warmup_time_period ), time_period=self._r(task_spec, "time-period", error_ctx=op.name, mandatory=False, default_value=default_time_period), ramp_up_time_period=self._r( task_spec, "ramp-up-time-period", error_ctx=op.name, mandatory=False, default_value=default_ramp_up_time_period ), clients=self._r(task_spec, "clients", error_ctx=op.name, mandatory=False, default_value=1), completes_parent=(task_name == completed_by_name), any_completes_parent=(completed_by_name == "any"), schedule=schedule, # this is to provide scheduler-specific parameters for custom schedulers. params=task_spec, ) if task.warmup_iterations is not None and task.time_period is not None: self._error( f"Operation '{op.name}' in challenge '{challenge_name}' defines {task.warmup_iterations} " f"warmup iterations and a time period of {task.time_period} seconds but mixing time periods " f"and iterations is not allowed." ) elif task.warmup_time_period is not None and task.iterations is not None: self._error( f"Operation '{op.name}' in challenge '{challenge_name}' defines a warmup time period of " f"{task.warmup_time_period} seconds and {task.iterations} iterations but mixing time periods " f"and iterations is not allowed." ) if (task.warmup_iterations is not None or task.iterations is not None) and task.ramp_up_time_period is not None: self._error( f"Operation '{op.name}' in challenge '{challenge_name}' defines a ramp-up time period of " f"{task.ramp_up_time_period} seconds as well as {task.warmup_iterations} warmup iterations and " f"{task.iterations} iterations but mixing time periods and iterations is not allowed." ) if task.ramp_up_time_period is not None: if task.warmup_time_period is None: self._error( f"Operation '{op.name}' in challenge '{challenge_name}' defines a ramp-up time period of " f"{task.ramp_up_time_period} seconds but no warmup-time-period." ) elif task.warmup_time_period < task.ramp_up_time_period: self._error( f"The warmup-time-period of operation '{op.name}' in challenge '{challenge_name}' is " f"{task.warmup_time_period} seconds but must be greater than or equal to the " f"ramp-up-time-period of {task.ramp_up_time_period} seconds." ) return task def parse_operations(self, ops_specs): # key = name, value = operation ops = {} for op_spec in ops_specs: op = self.parse_operation(op_spec) if op.name in ops: self._error("Duplicate operation with name '%s'." % op.name) else: ops[op.name] = op return ops def parse_operation(self, op_spec, error_ctx="operations"): # just a name, let's assume it is a simple operation like force-merge and create a full operation if isinstance(op_spec, str): op_name = op_spec meta_data = None op_type_name = op_spec param_source = None # Cannot have parameters here params = {} else: meta_data = self._r(op_spec, "meta", error_ctx=error_ctx, mandatory=False) # Rally's core operations will still use enums then but we'll allow users to define arbitrary operations op_type_name = self._r(op_spec, "operation-type", error_ctx=error_ctx) # fallback to use the operation type as the operation name op_name = self._r(op_spec, "name", error_ctx=error_ctx, mandatory=False, default_value=op_type_name) param_source = self._r(op_spec, "param-source", error_ctx=error_ctx, mandatory=False) # just pass-through all parameters by default params = op_spec try: op = track.OperationType.from_hyphenated_string(op_type_name) if "include-in-reporting" not in params: params["include-in-reporting"] = not op.admin_op self.logger.debug("Using built-in operation type [%s] for operation [%s].", op_type_name, op_name) except KeyError: self.logger.info("Using user-provided operation type [%s] for operation [%s].", op_type_name, op_name) try: return track.Operation(name=op_name, meta_data=meta_data, operation_type=op_type_name, params=params, param_source=param_source) except exceptions.InvalidSyntax as e: raise TrackSyntaxError("Invalid operation [%s]: %s" % (op_name, str(e)))