esrally/racecontrol.py (304 lines of code) (raw):

# Licensed to Elasticsearch B.V. under one or more contributor # license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright # ownership. Elasticsearch B.V. licenses this file to you under # the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. import collections import logging import os import sys from typing import Optional import tabulate import thespian.actors from esrally import ( PROGRAM_NAME, actor, client, config, doc_link, driver, exceptions, mechanic, metrics, reporter, track, types, version, ) from esrally.utils import console, opts, versions pipelines = collections.OrderedDict() class Pipeline: """ Describes a whole execution pipeline. A pipeline can consist of one or more steps. Each pipeline should contain roughly of the following steps: * Prepare the benchmark candidate: It can build Elasticsearch from sources, download a ZIP from somewhere etc. * Launch the benchmark candidate: This can be done directly, with tools like Ansible or it can assume the candidate is already launched * Run the benchmark * Report results """ def __init__(self, name, description, target, stable=True): """ Creates a new pipeline. :param name: A short name of the pipeline. This name will be used to reference it from the command line. :param description: A human-readable description what the pipeline does. :param target: A function that implements this pipeline :param stable True if the pipeline is considered production quality. """ self.name = name self.description = description self.target = target self.stable = stable pipelines[name] = self def __call__(self, cfg: types.Config): self.target(cfg) class Setup: def __init__(self, cfg: types.Config, sources=False, distribution=False, external=False, docker=False): self.cfg = cfg self.sources = sources self.distribution = distribution self.external = external self.docker = docker class Success: pass class BenchmarkActor(actor.RallyActor): def __init__(self): super().__init__() self.cfg: Optional[types.Config] = None self.start_sender = None self.mechanic = None self.main_driver = None self.coordinator = None def receiveMsg_PoisonMessage(self, msg, sender): self.logger.debug("BenchmarkActor got notified of poison message [%s] (forwarding).", (str(msg))) if self.coordinator: self.coordinator.error = True self.send(self.start_sender, msg) def receiveUnrecognizedMessage(self, msg, sender): self.logger.debug("BenchmarkActor received unknown message [%s] (ignoring).", (str(msg))) @actor.no_retry("race control") # pylint: disable=no-value-for-parameter def receiveMsg_Setup(self, msg, sender): self.start_sender = sender self.cfg = msg.cfg assert self.cfg is not None self.coordinator = BenchmarkCoordinator(msg.cfg) self.coordinator.setup(sources=msg.sources) self.logger.info("Asking mechanic to start the engine.") self.mechanic = self.createActor(mechanic.MechanicActor, targetActorRequirements={"coordinator": True}) self.send( self.mechanic, mechanic.StartEngine( self.cfg, self.coordinator.metrics_store.open_context, msg.sources, msg.distribution, msg.external, msg.docker, ), ) @actor.no_retry("race control") # pylint: disable=no-value-for-parameter def receiveMsg_EngineStarted(self, msg, sender): assert self.cfg is not None self.logger.info("Mechanic has started engine successfully.") self.coordinator.race.team_revision = msg.team_revision self.main_driver = self.createActor(driver.DriverActor, targetActorRequirements={"coordinator": True}) self.logger.info("Telling driver to prepare for benchmarking.") self.send(self.main_driver, driver.PrepareBenchmark(self.cfg, self.coordinator.current_track)) @actor.no_retry("race control") # pylint: disable=no-value-for-parameter def receiveMsg_PreparationComplete(self, msg, sender): self.coordinator.on_preparation_complete(msg.distribution_flavor, msg.distribution_version, msg.revision) self.logger.info("Telling driver to start benchmark.") self.send(self.main_driver, driver.StartBenchmark()) @actor.no_retry("race control") # pylint: disable=no-value-for-parameter def receiveMsg_TaskFinished(self, msg, sender): self.coordinator.on_task_finished(msg.metrics) # We choose *NOT* to reset our own metrics store's timer as this one is only used to collect complete metrics records from # other stores (used by driver and mechanic). Hence there is no need to reset the timer in our own metrics store. self.send(self.mechanic, mechanic.ResetRelativeTime(msg.next_task_scheduled_in)) @actor.no_retry("race control") # pylint: disable=no-value-for-parameter def receiveMsg_BenchmarkCancelled(self, msg, sender): self.coordinator.cancelled = True # even notify the start sender if it is the originator. The reason is that we call #ask() which waits for a reply. # We also need to ask in order to avoid races between this notification and the following ActorExitRequest. self.send(self.start_sender, msg) @actor.no_retry("race control") # pylint: disable=no-value-for-parameter def receiveMsg_BenchmarkFailure(self, msg, sender): self.logger.info("Received a benchmark failure from [%s] and will forward it now.", sender) self.coordinator.error = True self.send(self.start_sender, msg) @actor.no_retry("race control") # pylint: disable=no-value-for-parameter def receiveMsg_BenchmarkComplete(self, msg, sender): self.coordinator.on_benchmark_complete(msg.metrics) self.send(self.main_driver, thespian.actors.ActorExitRequest()) self.main_driver = None self.logger.info("Asking mechanic to stop the engine.") self.send(self.mechanic, mechanic.StopEngine()) @actor.no_retry("race control") # pylint: disable=no-value-for-parameter def receiveMsg_EngineStopped(self, msg, sender): self.logger.info("Mechanic has stopped engine successfully.") self.send(self.start_sender, Success()) class BenchmarkCoordinator: def __init__(self, cfg: types.Config): self.logger = logging.getLogger(__name__) self.cfg = cfg self.race = None self.metrics_store = None self.race_store = None self.cancelled = False self.error = False self.track_revision = None self.current_track = None self.current_challenge = None def setup(self, sources=False): # to load the track we need to know the correct cluster distribution version. Usually, this value should be set # but there are rare cases (external pipeline and user did not specify the distribution version) where we need # to derive it ourselves. For source builds we always assume "main" if not sources and not self.cfg.exists("mechanic", "distribution.version"): hosts = self.cfg.opts("client", "hosts").default client_options = self.cfg.opts("client", "options").default ( distribution_flavor, distribution_version, distribution_build_hash, serverless_operator, ) = client.factory.cluster_distribution_version(hosts, client_options) self.logger.info( "Automatically derived distribution flavor [%s], version [%s], and build hash [%s]", distribution_flavor, distribution_version, distribution_build_hash, ) self.cfg.add(config.Scope.benchmark, "mechanic", "distribution.version", distribution_version) self.cfg.add(config.Scope.benchmark, "mechanic", "distribution.flavor", distribution_flavor) if versions.is_serverless(distribution_flavor): if not self.cfg.exists("driver", "serverless.mode"): self.cfg.add(config.Scope.benchmark, "driver", "serverless.mode", True) if not self.cfg.exists("driver", "serverless.operator"): self.cfg.add(config.Scope.benchmark, "driver", "serverless.operator", serverless_operator) console.info(f"Detected Elasticsearch Serverless mode with operator=[{serverless_operator}].") else: min_es_version = versions.Version.from_string(version.minimum_es_version()) specified_version = versions.Version.from_string(distribution_version) if specified_version < min_es_version: raise exceptions.SystemSetupError( f"Cluster version must be at least [{min_es_version}] but was [{distribution_version}]" ) self.current_track = track.load_track(self.cfg, install_dependencies=True) self.track_revision = self.cfg.opts("track", "repository.revision", mandatory=False) challenge_name = self.cfg.opts("track", "challenge.name") self.current_challenge = self.current_track.find_challenge_or_default(challenge_name) if self.current_challenge is None: raise exceptions.SystemSetupError( "Track [{}] does not provide challenge [{}]. List the available tracks with {} list tracks.".format( self.current_track.name, challenge_name, PROGRAM_NAME ) ) if self.current_challenge.user_info: console.info(self.current_challenge.user_info) for message in self.current_challenge.serverless_info: console.info(message) self.race = metrics.create_race(self.cfg, self.current_track, self.current_challenge, self.track_revision) self.metrics_store = metrics.metrics_store( self.cfg, track=self.race.track_name, challenge=self.race.challenge_name, read_only=False ) self.race_store = metrics.race_store(self.cfg) def on_preparation_complete(self, distribution_flavor, distribution_version, revision): self.race.distribution_flavor = distribution_flavor self.race.distribution_version = distribution_version self.race.revision = revision # store race initially (without any results) so other components can retrieve full metadata self.race_store.store_race(self.race) if self.race.challenge.auto_generated: console.info( "Racing on track [{}] and car {} with version [{}].\n".format( self.race.track_name, self.race.car, self.race.distribution_version ) ) else: console.info( "Racing on track [{}], challenge [{}] and car {} with version [{}].\n".format( self.race.track_name, self.race.challenge_name, self.race.car, self.race.distribution_version ) ) def on_task_finished(self, new_metrics): self.logger.info("Bulk adding request metrics to metrics store.") self.metrics_store.bulk_add(new_metrics) def on_benchmark_complete(self, new_metrics): self.logger.info("Benchmark is complete.") self.logger.info("Bulk adding request metrics to metrics store.") self.metrics_store.bulk_add(new_metrics) self.metrics_store.flush() if not self.cancelled and not self.error: final_results = metrics.calculate_results(self.metrics_store, self.race) self.race.add_results(final_results) self.race_store.store_race(self.race) metrics.results_store(self.cfg).store_results(self.race) reporter.summarize(final_results, self.cfg) else: self.logger.info("Suppressing output of summary report. Cancelled = [%r], Error = [%r].", self.cancelled, self.error) self.metrics_store.close() def race(cfg: types.Config, sources=False, distribution=False, external=False, docker=False): logger = logging.getLogger(__name__) # at this point an actor system has to run and we should only join actor_system = actor.bootstrap_actor_system(try_join=True) benchmark_actor = actor_system.createActor(BenchmarkActor, targetActorRequirements={"coordinator": True}) try: result = actor_system.ask(benchmark_actor, Setup(cfg, sources, distribution, external, docker)) if isinstance(result, Success): logger.info("Benchmark has finished successfully.") # may happen if one of the load generators has detected that the user has cancelled the benchmark. elif isinstance(result, actor.BenchmarkCancelled): logger.info("User has cancelled the benchmark (detected by actor).") elif isinstance(result, actor.BenchmarkFailure): logger.error("A benchmark failure has occurred") raise exceptions.RallyError(result.message, result.cause) else: raise exceptions.RallyError("Got an unexpected result during benchmarking: [%s]." % str(result)) except KeyboardInterrupt: logger.info("User has cancelled the benchmark (detected by race control).") # notify the coordinator so it can properly handle this state. Do it blocking so we don't have a race between this message # and the actor exit request. actor_system.ask(benchmark_actor, actor.BenchmarkCancelled()) raise exceptions.UserInterrupted("User has cancelled the benchmark (detected by race control).") from None finally: logger.info("Telling benchmark actor to exit.") actor_system.tell(benchmark_actor, thespian.actors.ActorExitRequest()) def set_default_hosts(cfg: types.Config, host="127.0.0.1", port=9200): logger = logging.getLogger(__name__) configured_hosts = cfg.opts("client", "hosts") if len(configured_hosts.default) != 0: logger.info("Using configured hosts %s", configured_hosts.default) else: logger.info("Setting default host to [%s:%d]", host, port) default_host_object = opts.TargetHosts(f"{host}:{port}") cfg.add(config.Scope.benchmark, "client", "hosts", default_host_object) # Poor man's curry def from_sources(cfg: types.Config): port = cfg.opts("provisioning", "node.http.port") set_default_hosts(cfg, port=port) return race(cfg, sources=True) def from_distribution(cfg: types.Config): port = cfg.opts("provisioning", "node.http.port") set_default_hosts(cfg, port=port) return race(cfg, distribution=True) def benchmark_only(cfg: types.Config): set_default_hosts(cfg) # We'll use a special car name for external benchmarks. cfg.add(config.Scope.benchmark, "mechanic", "car.names", ["external"]) return race(cfg, external=True) def docker(cfg: types.Config): set_default_hosts(cfg) return race(cfg, docker=True) Pipeline("from-sources", "Builds and provisions Elasticsearch, runs a benchmark and reports results.", from_sources) Pipeline( "from-distribution", "Downloads an Elasticsearch distribution, provisions it, runs a benchmark and reports results.", from_distribution ) Pipeline("benchmark-only", "Assumes an already running Elasticsearch instance, runs a benchmark and reports results", benchmark_only) # Very experimental Docker pipeline. Should only be used with great care and is also not supported on all platforms. Pipeline("docker", "Runs a benchmark against the official Elasticsearch Docker container and reports results", docker, stable=False) def available_pipelines(): return [[pipeline.name, pipeline.description] for pipeline in pipelines.values() if pipeline.stable] def list_pipelines(): console.println("Available pipelines:\n") console.println(tabulate.tabulate(available_pipelines(), headers=["Name", "Description"])) def run(cfg: types.Config): logger = logging.getLogger(__name__) name = cfg.opts("race", "pipeline") race_id = cfg.opts("system", "race.id") console.info(f"Race id is [{race_id}]", logger=logger) if len(name) == 0: # assume from-distribution pipeline if distribution.version has been specified and --pipeline cli arg not set if cfg.exists("mechanic", "distribution.version"): name = "from-distribution" else: name = "from-sources" logger.info("User specified no pipeline. Automatically derived pipeline [%s].", name) cfg.add(config.Scope.applicationOverride, "race", "pipeline", name) else: logger.info("User specified pipeline [%s].", name) if os.environ.get("RALLY_RUNNING_IN_DOCKER", "").upper() == "TRUE": # in this case only benchmarking remote Elasticsearch clusters makes sense if name != "benchmark-only": raise exceptions.SystemSetupError( "Only the [benchmark-only] pipeline is supported by the Rally Docker image.\n" "Add --pipeline=benchmark-only in your Rally arguments and try again.\n" "For more details read the docs for the benchmark-only pipeline in {}\n".format(doc_link("pipelines.html#benchmark-only")) ) try: pipeline = pipelines[name] except KeyError: raise exceptions.SystemSetupError( "Unknown pipeline [%s]. List the available pipelines with %s list pipelines." % (name, PROGRAM_NAME) ) try: pipeline(cfg) except exceptions.RallyError as e: # just pass on our own errors. It should be treated differently on top-level raise e except KeyboardInterrupt: logger.info("User has cancelled the benchmark.") raise exceptions.UserInterrupted("User has cancelled the benchmark (detected by race control).") from None except BaseException: tb = sys.exc_info()[2] raise exceptions.RallyError("This race ended with a fatal crash.").with_traceback(tb)