# esrally/mechanic/mechanic.py
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
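"""
The mechanic is responsible for the benchmark candidate: it builds or downloads Elasticsearch,
provisions and launches nodes (either from a tar-based install or via Docker), stops them again and
persists node-level system metrics.
"""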
import contextlib
import json
import logging
import os
import pickle
import sys
import traceback
from collections import defaultdict
from typing import Optional
import thespian.actors
from esrally import PROGRAM_NAME, actor, config, exceptions, metrics, paths, types
from esrally.mechanic import launcher, provisioner, supplier, team
from esrally.utils import console, net
METRIC_FLUSH_INTERVAL_SECONDS = 30
def build(cfg: types.Config):
car, plugins = load_team(cfg, external=False)
s = supplier.create(cfg, sources=True, distribution=False, car=car, plugins=plugins)
binaries = s()
console.println(json.dumps(binaries, indent=2), force=True)
def download(cfg: types.Config):
car, plugins = load_team(cfg, external=False)
s = supplier.create(cfg, sources=False, distribution=True, car=car, plugins=plugins)
binaries = s()
console.println(json.dumps(binaries, indent=2), force=True)
def install(cfg: types.Config):
root_path = paths.install_root(cfg)
car, plugins = load_team(cfg, external=False)
    # install from a distribution if a non-empty distribution-version has been provided, otherwise build from sources
distribution = bool(cfg.opts("mechanic", "distribution.version", mandatory=False))
sources = not distribution
build_type = cfg.opts("mechanic", "build.type")
ip = cfg.opts("mechanic", "network.host")
http_port = int(cfg.opts("mechanic", "network.http.port"))
node_name = cfg.opts("mechanic", "node.name")
master_nodes = cfg.opts("mechanic", "master.nodes")
seed_hosts = cfg.opts("mechanic", "seed.hosts")
if build_type == "tar":
binary_supplier = supplier.create(cfg, sources, distribution, car, plugins)
p = provisioner.local(
cfg=cfg,
car=car,
plugins=plugins,
ip=ip,
http_port=http_port,
all_node_ips=seed_hosts,
all_node_names=master_nodes,
target_root=root_path,
node_name=node_name,
)
node_config = p.prepare(binary=binary_supplier())
elif build_type == "docker":
if len(plugins) > 0:
raise exceptions.SystemSetupError(
'You cannot specify any plugins for Docker clusters. Please remove "--elasticsearch-plugins" and try again.'
)
p = provisioner.docker(cfg=cfg, car=car, ip=ip, http_port=http_port, target_root=root_path, node_name=node_name)
# there is no binary for Docker that can be downloaded / built upfront
node_config = p.prepare(binary=None)
else:
raise exceptions.SystemSetupError(f"Unknown build type [{build_type}]")
provisioner.save_node_configuration(root_path, node_config)
console.println(json.dumps({"installation-id": cfg.opts("system", "install.id")}, indent=2), force=True)
def start(cfg: types.Config):
root_path = paths.install_root(cfg)
race_id = cfg.opts("system", "race.id")
# avoid double-launching - we expect that the node file is absent
with contextlib.suppress(FileNotFoundError):
_load_node_file(root_path)
install_id = cfg.opts("system", "install.id")
raise exceptions.SystemSetupError(
"A node with this installation id is already running. Please stop it first "
"with {} stop --installation-id={}".format(PROGRAM_NAME, install_id)
)
node_config = provisioner.load_node_configuration(root_path)
if node_config.build_type == "tar":
node_launcher = launcher.ProcessLauncher(cfg)
elif node_config.build_type == "docker":
node_launcher = launcher.DockerLauncher(cfg)
else:
raise exceptions.SystemSetupError(f"Unknown build type [{node_config.build_type}]")
nodes = node_launcher.start([node_config])
_store_node_file(root_path, (nodes, race_id))
def stop(cfg: types.Config):
root_path = paths.install_root(cfg)
node_config = provisioner.load_node_configuration(root_path)
if node_config.build_type == "tar":
node_launcher = launcher.ProcessLauncher(cfg)
elif node_config.build_type == "docker":
node_launcher = launcher.DockerLauncher(cfg)
else:
raise exceptions.SystemSetupError(f"Unknown build type [{node_config.build_type}]")
nodes, race_id = _load_node_file(root_path)
cls = metrics.metrics_store_class(cfg)
metrics_store = cls(cfg)
race_store = metrics.race_store(cfg)
try:
current_race = race_store.find_by_race_id(race_id)
metrics_store.open(
race_id=current_race.race_id,
race_timestamp=current_race.race_timestamp,
track_name=current_race.track_name,
challenge_name=current_race.challenge_name,
)
except exceptions.NotFound:
logging.getLogger(__name__).info("Could not find race [%s] and will thus not persist system metrics.", race_id)
# Don't persist system metrics if we can't retrieve the race as we cannot derive the required meta-data.
current_race = None
metrics_store = None
node_launcher.stop(nodes, metrics_store)
_delete_node_file(root_path)
if current_race:
metrics_store.flush(refresh=True)
for node in nodes:
results = metrics.calculate_system_results(metrics_store, node.node_name)
current_race.add_results(results)
metrics.results_store(cfg).store_results(current_race)
metrics_store.close()
provisioner.cleanup(
preserve=cfg.opts("mechanic", "preserve.install"), install_dir=node_config.binary_path, data_paths=node_config.data_paths
)
def _load_node_file(root_path):
with open(os.path.join(root_path, "node"), "rb") as f:
return pickle.load(f)
def _store_node_file(root_path, data):
with open(os.path.join(root_path, "node"), "wb") as f:
pickle.dump(data, f)
def _delete_node_file(root_path):
os.remove(os.path.join(root_path, "node"))
##############################
# Public Messages
##############################
class StartEngine:
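    """
    Sent by race control to the mechanic actor to provision and start the benchmark candidate, or to
    attach to an externally provisioned cluster.
    """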
def __init__(self, cfg: types.Config, open_metrics_context, sources, distribution, external, docker, ip=None, port=None, node_id=None):
self.cfg = cfg
self.open_metrics_context = open_metrics_context
self.sources = sources
self.distribution = distribution
self.external = external
self.docker = docker
self.ip = ip
self.port = port
self.node_id = node_id
def for_nodes(self, all_node_ips=None, all_node_ids=None, ip=None, port=None, node_ids=None):
"""
Creates a StartNodes instance for a concrete IP, port and their associated node_ids.
:param all_node_ips: The IPs of all nodes in the cluster (including the current one).
        :param all_node_ids: The numeric ids of all nodes in the cluster (including the current one).
:param ip: The IP to set.
:param port: The port number to set.
        :param node_ids: A list of node ids to set.
:return: A corresponding ``StartNodes`` message with the specified IP, port number and node ids.
"""
return StartNodes(
self.cfg,
self.open_metrics_context,
self.sources,
self.distribution,
self.external,
self.docker,
all_node_ips,
all_node_ids,
ip,
port,
node_ids,
)
class EngineStarted:
def __init__(self, team_revision):
self.team_revision = team_revision
class StopEngine:
pass
class EngineStopped:
pass
class ResetRelativeTime:
def __init__(self, reset_in_seconds):
self.reset_in_seconds = reset_in_seconds
##############################
# Mechanic internal messages
##############################
class StartNodes:
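    """
    Sent (via the Dispatcher) to a NodeMechanicActor to start all nodes that are assigned to a single
    target host.
    """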
def __init__(
self,
cfg: types.Config,
open_metrics_context,
sources,
distribution,
external,
docker,
all_node_ips,
all_node_ids,
ip,
port,
node_ids,
):
self.cfg = cfg
self.open_metrics_context = open_metrics_context
self.sources = sources
self.distribution = distribution
self.external = external
self.docker = docker
self.all_node_ips = all_node_ips
self.all_node_ids = all_node_ids
self.ip = ip
self.port = port
self.node_ids = node_ids
class NodesStarted:
pass
class StopNodes:
pass
class NodesStopped:
pass
def to_ip_port(hosts):
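    """
    Converts a list of host dicts (e.g. ``{"host": "localhost", "port": 39200}``) into a list of
    (ip, port) tuples, resolving host names to IP addresses. The port defaults to 9200 and any
    additional keys raise a ``SystemSetupError``.

    Example (assuming "localhost" resolves to 127.0.0.1)::

        to_ip_port([{"host": "localhost", "port": 39200}])
        # -> [("127.0.0.1", 39200)]
    """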
ip_port_pairs = []
for host in hosts:
host = host.copy()
host_or_ip = host.pop("host")
port = host.pop("port", 9200)
if host:
raise exceptions.SystemSetupError(
"When specifying nodes to be managed by Rally you can only supply "
"hostname:port pairs (e.g. 'localhost:9200'), any additional options cannot "
"be supported."
)
ip = net.resolve(host_or_ip)
ip_port_pairs.append((ip, port))
return ip_port_pairs
def extract_all_node_ips(ip_port_pairs):
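    """Returns the set of distinct IP addresses contained in the given (ip, port) pairs."""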
all_node_ips = set()
for ip, _ in ip_port_pairs:
all_node_ips.add(ip)
return all_node_ips
def extract_all_node_ids(all_nodes_by_host):
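    """Returns the set of all node ids across all hosts."""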
all_node_ids = set()
for node_ids_per_host in all_nodes_by_host.values():
all_node_ids.update(node_ids_per_host)
return all_node_ids
def nodes_by_host(ip_port_pairs):
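    """
    Assigns globally unique, consecutive node ids (starting at 0) to the given (ip, port) pairs and
    groups them by host.

    Example::

        nodes_by_host([("127.0.0.1", 9200), ("127.0.0.1", 9200), ("10.0.0.2", 9200)])
        # -> {("127.0.0.1", 9200): [0, 1], ("10.0.0.2", 9200): [2]}
    """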
nodes = {}
node_id = 0
for ip_port in ip_port_pairs:
if ip_port not in nodes:
nodes[ip_port] = []
nodes[ip_port].append(node_id)
node_id += 1
return nodes
class MechanicActor(actor.RallyActor):
    """
    This actor coordinates all associated mechanics on remote hosts (which do the actual work).
    """

    WAKEUP_RESET_RELATIVE_TIME = "relative_time"
def __init__(self):
super().__init__()
self.cfg: Optional[types.Config] = None
self.race_control = None
self.cluster_launcher = None
self.cluster = None
self.car = None
self.team_revision = None
self.externally_provisioned = False
def receiveUnrecognizedMessage(self, msg, sender):
self.logger.debug("MechanicActor#receiveMessage unrecognized(msg = [%s] sender = [%s])", str(type(msg)), str(sender))
def receiveMsg_ChildActorExited(self, msg, sender):
if self.is_current_status_expected(["cluster_stopping", "cluster_stopped"]):
self.logger.info("Child actor exited while engine is stopping or stopped: [%s]", msg)
return
failmsg = "Child actor exited with [%s] while in status [%s]." % (msg, self.status)
self.logger.error(failmsg)
self.send(self.race_control, actor.BenchmarkFailure(failmsg))
def receiveMsg_PoisonMessage(self, msg, sender):
self.logger.info("MechanicActor#receiveMessage poison(msg = [%s] sender = [%s])", str(msg.poisonMessage), str(sender))
# something went wrong with a child actor (or another actor with which we have communicated)
if isinstance(msg.poisonMessage, StartEngine):
failmsg = "Could not start benchmark candidate. Are Rally daemons on all targeted machines running?"
else:
failmsg = msg.details
self.logger.error(failmsg)
self.send(self.race_control, actor.BenchmarkFailure(failmsg))
@actor.no_retry("mechanic") # pylint: disable=no-value-for-parameter
def receiveMsg_StartEngine(self, msg, sender):
self.logger.info("Received signal from race control to start engine.")
self.race_control = sender
self.cfg = msg.cfg
assert self.cfg is not None
self.car, _ = load_team(self.cfg, msg.external)
# TODO: This is implicitly set by #load_team() - can we gather this elsewhere?
self.team_revision = self.cfg.opts("mechanic", "repository.revision")
        # In our startup procedure we first create all mechanics. Only if this succeeds will we continue.
hosts = self.cfg.opts("client", "hosts").default
if len(hosts) == 0:
raise exceptions.LaunchError("No target hosts are configured.")
self.externally_provisioned = msg.external
if self.externally_provisioned:
self.logger.info("Cluster will not be provisioned by Rally.")
self.status = "nodes_started"
self.received_responses = []
self.on_all_nodes_started()
self.status = "cluster_started"
else:
console.info("Preparing for race ...", flush=True)
self.logger.info("Cluster consisting of %s will be provisioned by Rally.", hosts)
msg.hosts = hosts
# Initialize the children array to have the right size to
# ensure waiting for all responses
self.children = [None] * len(nodes_by_host(to_ip_port(hosts)))
self.send(self.createActor(Dispatcher), msg)
self.status = "starting"
self.received_responses = []
@actor.no_retry("mechanic") # pylint: disable=no-value-for-parameter
def receiveMsg_NodesStarted(self, msg, sender):
# Initially the addresses of the children are not
# known and there is just a None placeholder in the
# array. As addresses become known, fill them in.
if sender not in self.children:
            # keep the list length constant: insert the new sender at the front and drop a (None) placeholder from the end
self.children.insert(0, sender)
self.children.pop()
self.transition_when_all_children_responded(sender, msg, "starting", "cluster_started", self.on_all_nodes_started)
@actor.no_retry("mechanic") # pylint: disable=no-value-for-parameter
def receiveMsg_ResetRelativeTime(self, msg, sender):
if msg.reset_in_seconds > 0:
self.wakeupAfter(msg.reset_in_seconds, payload=MechanicActor.WAKEUP_RESET_RELATIVE_TIME)
else:
self.reset_relative_time()
def receiveMsg_WakeupMessage(self, msg, sender):
if msg.payload == MechanicActor.WAKEUP_RESET_RELATIVE_TIME:
self.reset_relative_time()
else:
raise exceptions.RallyAssertionError(f"Unknown wakeup reason [{msg.payload}]")
def receiveMsg_BenchmarkFailure(self, msg, sender):
self.send(self.race_control, msg)
@actor.no_retry("mechanic") # pylint: disable=no-value-for-parameter
def receiveMsg_StopEngine(self, msg, sender):
        # We might have experienced a launch error or the user may have cancelled the benchmark. Hence we need to allow
        # stopping the cluster from various states and don't check for a specific one here.
if self.externally_provisioned:
self.on_all_nodes_stopped()
else:
self.send_to_children_and_transition(sender, StopNodes(), [], "cluster_stopping")
@actor.no_retry("mechanic") # pylint: disable=no-value-for-parameter
def receiveMsg_NodesStopped(self, msg, sender):
self.transition_when_all_children_responded(sender, msg, "cluster_stopping", "cluster_stopped", self.on_all_nodes_stopped)
def on_all_nodes_started(self):
self.send(self.race_control, EngineStarted(self.team_revision))
def reset_relative_time(self):
for m in self.children:
self.send(m, ResetRelativeTime(0))
def on_all_nodes_stopped(self):
self.send(self.race_control, EngineStopped())
# clear all state as the mechanic might get reused later
for m in self.children:
self.send(m, thespian.actors.ActorExitRequest())
self.children = []
# do not self-terminate, let the parent actor handle this
@thespian.actors.requireCapability("coordinator")
class Dispatcher(actor.RallyActor):
"""This Actor receives a copy of the startmsg (with the computed hosts
attached) and creates a NodeMechanicActor on each targeted
remote host. It uses Thespian SystemRegistration to get
notification of when remote nodes are available. As a special
case, if an IP address is localhost, the NodeMechanicActor is
immediately created locally. Once All NodeMechanicActors are
started, it will send them all their startup message, with a
reply-to back to the actor that made the request of the
Dispatcher.
"""
def __init__(self):
super().__init__()
self.start_sender = None
self.pending = None
self.remotes = None
@actor.no_retry("mechanic dispatcher") # pylint: disable=no-value-for-parameter
def receiveMsg_StartEngine(self, startmsg, sender):
self.start_sender = sender
self.pending = []
self.remotes = defaultdict(list)
all_ips_and_ports = to_ip_port(startmsg.hosts)
all_node_ips = extract_all_node_ips(all_ips_and_ports)
all_nodes_by_host = nodes_by_host(all_ips_and_ports)
all_node_ids = extract_all_node_ids(all_nodes_by_host)
for (ip, port), node in all_nodes_by_host.items():
submsg = startmsg.for_nodes(all_node_ips, all_node_ids, ip, port, node)
submsg.reply_to = sender
if ip == "127.0.0.1":
m = self.createActor(NodeMechanicActor, targetActorRequirements={"coordinator": True})
self.pending.append((m, submsg))
else:
self.remotes[ip].append(submsg)
if self.remotes:
# Now register with the ActorSystem to be told about all
# remote nodes (via the ActorSystemConventionUpdate below).
self.notifyOnSystemRegistrationChanges(True)
else:
self.send_all_pending()
# Could also initiate a wakeup message to fail this if not all
# remotes come online within the expected amount of time... TBD
def receiveMsg_ActorSystemConventionUpdate(self, convmsg, sender):
if not convmsg.remoteAdded:
self.logger.warning("Remote Rally node [%s] exited during NodeMechanicActor startup process.", convmsg.remoteAdminAddress)
            # start_sender is an actor address, not a callable, so notify it via the actor system
            self.send(
                self.start_sender,
                actor.BenchmarkFailure("Remote Rally node [%s] has been shut down prematurely." % convmsg.remoteAdminAddress),
            )
else:
remote_ip = convmsg.remoteCapabilities.get("ip", None)
self.logger.info("Remote Rally node [%s] has started.", remote_ip)
for eachmsg in self.remotes[remote_ip]:
self.pending.append((self.createActor(NodeMechanicActor, targetActorRequirements={"ip": remote_ip}), eachmsg))
if remote_ip in self.remotes:
del self.remotes[remote_ip]
if not self.remotes:
# Notifications are no longer needed
self.notifyOnSystemRegistrationChanges(False)
self.send_all_pending()
def send_all_pending(self):
# Invoked when all remotes have checked in and self.pending is
# the list of remote NodeMechanic actors and messages to send.
for each in self.pending:
self.send(*each)
self.pending = []
def receiveMsg_BenchmarkFailure(self, msg, sender):
self.send(self.start_sender, msg)
def receiveMsg_PoisonMessage(self, msg, sender):
self.send(self.start_sender, actor.BenchmarkFailure(msg.details))
def receiveUnrecognizedMessage(self, msg, sender):
self.logger.info("mechanic.Dispatcher#receiveMessage unrecognized(msg = [%s] sender = [%s])", str(type(msg)), str(sender))
class NodeMechanicActor(actor.RallyActor):
"""
One instance of this actor is run on each target host and coordinates the actual work of starting / stopping all nodes that should run
on this host.
"""
def __init__(self):
super().__init__()
self.mechanic = None
self.host = None
def receiveMsg_StartNodes(self, msg, sender):
try:
self.host = msg.ip
if msg.external:
self.logger.info("Connecting to externally provisioned nodes on [%s].", msg.ip)
else:
self.logger.info("Starting node(s) %s on [%s].", msg.node_ids, msg.ip)
# Load node-specific configuration
cfg = config.auto_load_local_config(
msg.cfg,
additional_sections=[
# only copy the relevant bits
"track",
"mechanic",
"client",
"telemetry",
# allow metrics store to extract race meta-data
"race",
"source",
],
)
# set root path (normally done by the main entry point)
cfg.add(config.Scope.application, "node", "rally.root", paths.rally_root())
if not msg.external:
cfg.add(config.Scope.benchmark, "provisioning", "node.ids", msg.node_ids)
cls = metrics.metrics_store_class(cfg)
metrics_store = cls(cfg)
metrics_store.open(ctx=msg.open_metrics_context)
# avoid follow-up errors in case we receive an unexpected ActorExitRequest due to an early failure in a parent actor.
self.mechanic = create(
cfg,
metrics_store,
msg.ip,
msg.port,
msg.all_node_ips,
msg.all_node_ids,
msg.sources,
msg.distribution,
msg.external,
msg.docker,
)
self.mechanic.start_engine()
self.wakeupAfter(METRIC_FLUSH_INTERVAL_SECONDS)
self.send(getattr(msg, "reply_to", sender), NodesStarted())
except Exception:
self.logger.exception("Cannot process message [%s]", msg)
# avoid "can't pickle traceback objects"
_, ex_value, _ = sys.exc_info()
self.send(getattr(msg, "reply_to", sender), actor.BenchmarkFailure(ex_value, traceback.format_exc()))
def receiveMsg_PoisonMessage(self, msg, sender):
if sender != self.myAddress:
self.send(sender, actor.BenchmarkFailure(msg.details))
def receiveMsg_BenchmarkFailure(self, msg, sender):
self.send(getattr(msg, "reply_to", sender), msg)
def receiveUnrecognizedMessage(self, msg, sender):
        # At the moment, all message handling is implemented in a blocking fashion. This is not ideal but simple to get started
        # with. Besides, the caller needs to block anyway. The only reason we implement mechanics as actors is to be able to
        # distribute them across machines.
# noinspection PyBroadException
try:
self.logger.debug("NodeMechanicActor#receiveMessage(msg = [%s] sender = [%s])", str(type(msg)), str(sender))
if isinstance(msg, ResetRelativeTime) and self.mechanic:
self.mechanic.reset_relative_time()
elif isinstance(msg, thespian.actors.WakeupMessage) and self.mechanic:
self.mechanic.flush_metrics()
self.wakeupAfter(METRIC_FLUSH_INTERVAL_SECONDS)
elif isinstance(msg, StopNodes):
self.mechanic.stop_engine()
self.send(sender, NodesStopped())
self.mechanic = None
elif isinstance(msg, thespian.actors.ActorExitRequest):
if self.mechanic:
self.mechanic.stop_engine()
self.mechanic = None
except BaseException as e:
self.logger.exception("Cannot process message [%s]", msg)
self.send(getattr(msg, "reply_to", sender), actor.BenchmarkFailure("Error on host %s" % str(self.host), e))
#####################################################
# Internal API (only used by the actor and for tests)
#####################################################
def load_team(cfg: types.Config, external):
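    """
    Loads the car and plugin definitions from the configured team repository.

    :return: A tuple of (car, plugins). For externally provisioned clusters this is ``(None, [])``.
    """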
# externally provisioned clusters do not support cars / plugins
if external:
car = None
plugins = []
else:
team_path = team.team_path(cfg)
car = team.load_car(team_path, cfg.opts("mechanic", "car.names"), cfg.opts("mechanic", "car.params"))
plugins = team.load_plugins(
team_path, cfg.opts("mechanic", "car.plugins", mandatory=False), cfg.opts("mechanic", "plugin.params", mandatory=False)
)
return car, plugins
def create(
cfg: types.Config,
metrics_store,
node_ip,
node_http_port,
all_node_ips,
all_node_ids,
sources=False,
distribution=False,
external=False,
docker=False,
):
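    """
    Creates a ``Mechanic`` with a matching supplier, provisioners (one per node id) and launcher: a
    source build or distribution is run via the process launcher, a Docker-based cluster (which must
    not use plugins) via the Docker launcher. Externally provisioned clusters are rejected because
    they are not managed by the mechanic.
    """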
race_root_path = paths.race_root(cfg)
node_ids = cfg.opts("provisioning", "node.ids", mandatory=False)
node_name_prefix = cfg.opts("provisioning", "node.name.prefix")
car, plugins = load_team(cfg, external)
if sources or distribution:
s = supplier.create(cfg, sources, distribution, car, plugins)
p = []
all_node_names = ["%s-%s" % (node_name_prefix, n) for n in all_node_ids]
for node_id in node_ids:
node_name = "%s-%s" % (node_name_prefix, node_id)
p.append(provisioner.local(cfg, car, plugins, node_ip, node_http_port, all_node_ips, all_node_names, race_root_path, node_name))
l = launcher.ProcessLauncher(cfg)
elif external:
raise exceptions.RallyAssertionError("Externally provisioned clusters should not need to be managed by Rally's mechanic")
elif docker:
if len(plugins) > 0:
raise exceptions.SystemSetupError(
'You cannot specify any plugins for Docker clusters. Please remove "--elasticsearch-plugins" and try again.'
)
def s():
return None
p = []
for node_id in node_ids:
node_name = "%s-%s" % (node_name_prefix, node_id)
p.append(provisioner.docker(cfg, car, node_ip, node_http_port, race_root_path, node_name))
l = launcher.DockerLauncher(cfg)
else:
# It is a programmer error (and not a user error) if this function is called with wrong parameters
raise RuntimeError("One of sources, distribution, docker or external must be True")
return Mechanic(cfg, metrics_store, s, p, l)
class Mechanic:
"""
    Mechanic is responsible for the benchmark candidate, i.e. for all candidate-related activities before and after
    running the benchmark: supplying binaries, provisioning, starting and stopping nodes, and cleaning up afterwards.
"""
def __init__(self, cfg: types.Config, metrics_store, supply, provisioners, launcher):
self.cfg = cfg
self.preserve_install = cfg.opts("mechanic", "preserve.install")
self.metrics_store = metrics_store
self.supply = supply
self.provisioners = provisioners
self.launcher = launcher
self.nodes = []
self.node_configs = []
self.logger = logging.getLogger(__name__)
def start_engine(self):
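        """
        Supplies the binaries, prepares one node configuration per provisioner and starts all nodes.

        :return: The started nodes.
        """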
binaries = self.supply()
self.node_configs = []
for p in self.provisioners:
self.node_configs.append(p.prepare(binaries))
self.nodes = self.launcher.start(self.node_configs)
return self.nodes
def reset_relative_time(self):
self.logger.info("Resetting relative time of system metrics store.")
self.metrics_store.reset_relative_time()
def flush_metrics(self, refresh=False):
self.logger.debug("Flushing system metrics.")
self.metrics_store.flush(refresh=refresh)
def stop_engine(self):
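        """
        Stops all started nodes, flushes the metrics store and persists system metrics if the current
        race can be found, then closes the metrics store and cleans up the provisioned directories.
        """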
self.logger.info("Stopping nodes %s.", self.nodes)
self.launcher.stop(self.nodes, self.metrics_store)
self.flush_metrics(refresh=True)
try:
current_race = self._current_race()
for node in self.nodes:
self._add_results(current_race, node)
except exceptions.NotFound as e:
self.logger.warning("Cannot store system metrics: %s.", str(e))
self.metrics_store.close()
self.nodes = []
for node_config in self.node_configs:
provisioner.cleanup(preserve=self.preserve_install, install_dir=node_config.binary_path, data_paths=node_config.data_paths)
self.node_configs = []
def _current_race(self):
race_id = self.cfg.opts("system", "race.id")
return metrics.race_store(self.cfg).find_by_race_id(race_id)
def _add_results(self, current_race, node):
results = metrics.calculate_system_results(self.metrics_store, node.node_name)
current_race.add_results(results)
metrics.results_store(self.cfg).store_results(current_race)