# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# 	http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import collections
import datetime
import glob
import json
import logging
import math
import os
import pickle
import random
import statistics
import sys
import uuid
import zlib
from enum import Enum, IntEnum

import tabulate

from esrally import client, config, exceptions, paths, time, types, version
from esrally.utils import console, convert, io, pretty, versions


class EsClient:
    """
    Provides a stripped-down client interface that is easier to swap out for testing
    """

    def __init__(self, client, cluster_version=None):
        self._client = client
        self.logger = logging.getLogger(__name__)
        self._cluster_version = cluster_version
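        # HTTP status codes treated as transient and thus retryable: 502/503/504 (gateway / service unavailable) and 429 (too many requests)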
        self.retryable_status_codes = [502, 503, 504, 429]

    def get_template(self, name):
        return self.guarded(self._client.indices.get_index_template, name=name)

    def put_template(self, name, template):
        tmpl = json.loads(template)
        return self.guarded(self._client.indices.put_index_template, name=name, **tmpl)

    def template_exists(self, name):
        return self.guarded(self._client.indices.exists_index_template, name=name)

    def delete_by_query(self, index, body):
        return self.guarded(self._client.delete_by_query, index=index, body=body)

    def delete(self, index, id):
        # ignore 404 status code (NotFoundError) when the document or index does not exist
        return self.guarded(self._client.delete, index=index, id=id, ignore=404)

    def get_index(self, name):
        return self.guarded(self._client.indices.get, index=name)

    def create_index(self, index):
        # ignore 400 status code (BadRequestError) when index already exists
        return self.guarded(self._client.indices.create, index=index, ignore=400)

    def exists(self, index):
        return self.guarded(self._client.indices.exists, index=index)

    def refresh(self, index):
        return self.guarded(self._client.indices.refresh, index=index)

    def bulk_index(self, index, items):
        # pylint: disable=import-outside-toplevel
        import elasticsearch.helpers

        self.guarded(elasticsearch.helpers.bulk, self._client, items, index=index, chunk_size=5000)

    def index(self, index, item, id=None):
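        # wrap the single document in the action format understood by elasticsearch.helpers.bulk (via bulk_index)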
        doc = {"_source": item}
        if id:
            doc["_id"] = id
        self.bulk_index(index, [doc])

    def search(self, index, body):
        return self.guarded(self._client.search, index=index, body=body)

    def guarded(self, target, *args, **kwargs):
        # pylint: disable=import-outside-toplevel
        import elasticsearch
        import elasticsearch.helpers
        from elastic_transport import ApiError, TransportError

        max_execution_count = 10
        execution_count = 0

        while execution_count <= max_execution_count:
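            # exponential backoff with jitter: 1s, 2s, 4s, ... plus a random fraction so concurrent retries do not align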
            time_to_sleep = 2**execution_count + random.random()
            execution_count += 1

            try:
                return target(*args, **kwargs)
            except elasticsearch.exceptions.ConnectionTimeout as e:
                if execution_count <= max_execution_count:
                    self.logger.debug(
                        "Connection timeout [%s] in attempt [%d/%d]. Sleeping for [%f] seconds.",
                        e.message,
                        execution_count,
                        max_execution_count,
                        time_to_sleep,
                    )
                    time.sleep(time_to_sleep)
                else:
                    operation = target.__name__
                    self.logger.exception("Connection timeout while running [%s] (retried %d times).", operation, max_execution_count)
                    node = self._client.transport.node_pool.get()
                    msg = (
                        "A connection timeout occurred while running the operation [%s] against your Elasticsearch metrics store on "
                        "host [%s] at port [%s]." % (operation, node.host, node.port)
                    )
                    raise exceptions.RallyError(msg)
            except elasticsearch.exceptions.ConnectionError as e:
                if execution_count <= max_execution_count:
                    self.logger.debug(
                        "Connection error [%s] in attempt [%d/%d]. Sleeping for [%f] seconds.",
                        e.message,
                        execution_count,
                        max_execution_count,
                        time_to_sleep,
                    )
                    time.sleep(time_to_sleep)
                else:
                    node = self._client.transport.node_pool.get()
                    msg = (
                        "Could not connect to your Elasticsearch metrics store. Please check that it is running on host [%s] at port [%s]"
                        " or fix the configuration in [%s]." % (node.host, node.port, config.ConfigFile().location)
                    )
                    self.logger.exception(msg)
                    # connection errors don't necessarily mean it's during setup
                    raise exceptions.RallyError(msg)
            except elasticsearch.exceptions.AuthenticationException:
                # we know that it is just one host (see EsClientFactory)
                node = self._client.transport.node_pool.get()
                msg = (
                    "The configured user could not authenticate against your Elasticsearch metrics store running on host [%s] at "
                    "port [%s] (wrong password?). Please fix the configuration in [%s]."
                    % (node.host, node.port, config.ConfigFile().location)
                )
                self.logger.exception(msg)
                raise exceptions.SystemSetupError(msg)
            except elasticsearch.exceptions.AuthorizationException:
                node = self._client.transport.node_pool.get()
                msg = (
                    "The configured user does not have enough privileges to run the operation [%s] against your Elasticsearch metrics "
                    "store running on host [%s] at port [%s]. Please adjust your x-pack configuration or specify a user with enough "
                    "privileges in the configuration in [%s]." % (target.__name__, node.host, node.port, config.ConfigFile().location)
                )
                self.logger.exception(msg)
                raise exceptions.SystemSetupError(msg)
            except elasticsearch.helpers.BulkIndexError as e:
                for err in e.errors:
                    err_type = err.get("index", {}).get("error", {}).get("type", None)
                    if err.get("index", {}).get("status", None) not in self.retryable_status_codes:
                        msg = f"Unretryable error encountered when sending metrics to remote metrics store: [{err_type}]"
                        self.logger.exception("%s - Full error(s) [%s]", msg, str(e.errors))
                        raise exceptions.RallyError(msg)

                if execution_count <= max_execution_count:
                    self.logger.debug(
                        "Error in sending metrics to remote metrics store [%s] in attempt [%d/%d]. Sleeping for [%f] seconds.",
                        e,
                        execution_count,
                        max_execution_count,
                        time_to_sleep,
                    )
                    time.sleep(time_to_sleep)
                else:
                    msg = f"Failed to send metrics to remote metrics store: [{e.errors}]"
                    self.logger.exception("%s - Full error(s) [%s]", msg, str(e.errors))
                    raise exceptions.RallyError(msg)
            except ApiError as e:
                if e.status_code in self.retryable_status_codes and execution_count <= max_execution_count:
                    self.logger.debug(
                        "%s (code: %d) in attempt [%d/%d]. Sleeping for [%f] seconds.",
                        e.error,
                        e.status_code,
                        execution_count,
                        max_execution_count,
                        time_to_sleep,
                    )
                    time.sleep(time_to_sleep)
                else:
                    node = self._client.transport.node_pool.get()
                    msg = (
                        "An error [%s] occurred while running the operation [%s] against your Elasticsearch metrics store on host [%s] "
                        "at port [%s]." % (e.error, target.__name__, node.host, node.port)
                    )
                    self.logger.exception(msg)
                    # this does not necessarily mean it's a system setup problem...
                    raise exceptions.RallyError(msg)
            except TransportError as e:
                node = self._client.transport.node_pool.get()

                if e.errors:
                    err = e.errors
                else:
                    err = e
                msg = (
                    "Transport error(s) [%s] occurred while running the operation [%s] against your Elasticsearch metrics store on "
                    "host [%s] at port [%s]." % (err, target.__name__, node.host, node.port)
                )
                self.logger.exception(msg)
                # this does not necessarily mean it's a system setup problem...
                raise exceptions.RallyError(msg)


class EsClientFactory:
    """
    Abstracts how the Elasticsearch client is created. Intended for testing.
    """

    def __init__(self, cfg: types.Config):
        self._config = cfg
        host = self._config.opts("reporting", "datastore.host")
        port = self._config.opts("reporting", "datastore.port")
        hosts = [{"host": host, "port": port}]
        secure = convert.to_bool(self._config.opts("reporting", "datastore.secure"))
        user = self._config.opts("reporting", "datastore.user")
        distribution_version = None
        distribution_flavor = None
        try:
            password = os.environ["RALLY_REPORTING_DATASTORE_PASSWORD"]
        except KeyError:
            try:
                password = self._config.opts("reporting", "datastore.password")
            except exceptions.ConfigError:
                raise exceptions.ConfigError(
                    "No password configured through [reporting] configuration or RALLY_REPORTING_DATASTORE_PASSWORD environment variable."
                ) from None

        verify = self._config.opts("reporting", "datastore.ssl.verification_mode", default_value="full", mandatory=False) != "none"
        ca_path = self._config.opts("reporting", "datastore.ssl.certificate_authorities", default_value=None, mandatory=False)
        self.probe_version = self._config.opts("reporting", "datastore.probe.cluster_version", default_value=True, mandatory=False)

        # Instead of duplicating code, we're just adapting the metrics store specific properties to match the regular client options.
        client_options = {
            "use_ssl": secure,
            "verify_certs": verify,
            "timeout": 120,
        }
        if ca_path:
            client_options["ca_certs"] = ca_path
        if user and password:
            client_options["basic_auth_user"] = user
            client_options["basic_auth_password"] = password

        # TODO #1335: Use version-specific support for metrics stores after 7.8.0.
        if self.probe_version:
            distribution_flavor, distribution_version, _, _ = client.cluster_distribution_version(
                hosts=hosts, client_options=client_options
            )
            self._cluster_version = distribution_version

        factory = client.EsClientFactory(
            hosts=hosts,
            client_options=client_options,
            distribution_version=distribution_version,
            distribution_flavor=distribution_flavor,
        )
        self._client = factory.create()

    def create(self):
        c = EsClient(self._client)
        return c


class IndexTemplateProvider:
    """
    Abstracts how the Rally index template is retrieved. Intended for testing.
    """

    def __init__(self, cfg: types.Config):
        self._config = cfg
        self._number_of_shards = self._config.opts("reporting", "datastore.number_of_shards", default_value=None, mandatory=False)
        self._number_of_replicas = self._config.opts("reporting", "datastore.number_of_replicas", default_value=None, mandatory=False)
        self.script_dir = self._config.opts("node", "rally.root")

    def metrics_template(self):
        return self._read("metrics-template")

    def races_template(self):
        return self._read("races-template")

    def results_template(self):
        return self._read("results-template")

    def annotations_template(self):
        return self._read("annotation-template")

    def _read(self, template_name):
        with open("%s/resources/%s.json" % (self.script_dir, template_name), encoding="utf-8") as f:
            template = json.load(f)
            if self._number_of_shards is not None:
                if int(self._number_of_shards) < 1:
                    raise exceptions.SystemSetupError(
                        f"The setting: datastore.number_of_shards must be >= 1. Please "
                        f"check the configuration in {self._config.config_file.location}"
                    )
                template["template"]["settings"]["index"]["number_of_shards"] = int(self._number_of_shards)
            if self._number_of_replicas is not None:
                template["template"]["settings"]["index"]["number_of_replicas"] = int(self._number_of_replicas)
            return json.dumps(template)


class MetaInfoScope(Enum):
    """
    Defines the scope of meta-information. Meta-information provides more context for a metric, for example the concrete version
    of Elasticsearch that has been benchmarked or environment information like CPU model or OS.
    """

    cluster = 1
    """
    Cluster level meta-information is valid for all nodes in the cluster (e.g. the benchmarked Elasticsearch version)
    """
    node = 3
    """
    Node level meta-information is valid for a single node (e.g. GC times)
    """


def calculate_results(store, race):
    calc = GlobalStatsCalculator(store, race.track, race.challenge)
    return calc()


def calculate_system_results(store, node_name):
    calc = SystemStatsCalculator(store, node_name)
    return calc()


def metrics_store(cfg: types.Config, read_only=True, track=None, challenge=None, car=None, meta_info=None):
    """
    Creates a proper metrics store based on the current configuration.

    :param cfg: Config object.
    :param read_only: Whether to open the metrics store only for reading (Default: True).
    :param track: The track name. Optional.
    :param challenge: The challenge name. Optional.
    :param car: The car name(s). Optional; defaults to the car names from the config.
    :param meta_info: Optional, previously serialized meta-info for the metrics store.
    :return: A metrics store implementation.
    """
    cls = metrics_store_class(cfg)
    store = cls(cfg=cfg, meta_info=meta_info)
    logging.getLogger(__name__).info("Creating %s", str(store))

    race_id = cfg.opts("system", "race.id")
    race_timestamp = cfg.opts("system", "time.start")
    selected_car = cfg.opts("mechanic", "car.names") if car is None else car

    store.open(race_id, race_timestamp, track, challenge, selected_car, create=not read_only)
    return store


def metrics_store_class(cfg: types.Config):
    if cfg.opts("reporting", "datastore.type") == "elasticsearch":
        return EsMetricsStore
    else:
        return InMemoryMetricsStore


class SampleType(IntEnum):
    Warmup = 0
    Normal = 1


class MetricsStore:
    """
    Abstract metrics store
    """

    def __init__(self, cfg: types.Config, clock=time.Clock, meta_info=None):
        """
        Creates a new metrics store.

        :param cfg: The config object. Mandatory.
        :param clock: This parameter is optional and needed for testing.
        :param meta_info: This parameter is optional and intended for creating a metrics store with a previously serialized meta-info.
        """
        self._config = cfg
        self._race_id = None
        self._race_timestamp = None
        self._track = None
        self._track_params = cfg.opts("track", "params", default_value={}, mandatory=False)
        self._challenge = None
        self._car = None
        self._car_name = None
        self._environment_name = cfg.opts("system", "env.name")
        self.opened = False
        if meta_info is None:
            self._meta_info = {}
        else:
            self._meta_info = meta_info
        # ensure mandatory keys are always present
        if MetaInfoScope.cluster not in self._meta_info:
            self._meta_info[MetaInfoScope.cluster] = {}
        if MetaInfoScope.node not in self._meta_info:
            self._meta_info[MetaInfoScope.node] = {}
        self._clock = clock
        self._stop_watch = self._clock.stop_watch()
        self.logger = logging.getLogger(__name__)

    def open(self, race_id=None, race_timestamp=None, track_name=None, challenge_name=None, car_name=None, ctx=None, create=False):
        """
        Opens a metrics store for a specific race, track, challenge and car.

        :param race_id: The race id. This attribute is sufficient to uniquely identify a race.
        :param race_timestamp: The race timestamp as a datetime.
        :param track_name: Track name.
        :param challenge_name: Challenge name.
        :param car_name: Car name.
        :param ctx: A metrics store open context retrieved from another metrics store with ``#open_context``.
        :param create: True if an index should be created (if necessary). This is typically True when attempting to write metrics and
        False when the store is just opened for reading (as we can assume all necessary indices exist at this point).
        """
        if ctx:
            self._race_id = ctx["race-id"]
            self._race_timestamp = ctx["race-timestamp"]
            self._track = ctx["track"]
            self._challenge = ctx["challenge"]
            self._car = ctx["car"]
        else:
            self._race_id = race_id
            self._race_timestamp = time.to_iso8601(race_timestamp)
            self._track = track_name
            self._challenge = challenge_name
            self._car = car_name
        assert self._race_id is not None, "Attempting to open metrics store without a race id"
        assert self._race_timestamp is not None, "Attempting to open metrics store without a race timestamp"

        self._car_name = "+".join(self._car) if isinstance(self._car, list) else self._car

        self.logger.info(
            "Opening metrics store for race timestamp=[%s], track=[%s], challenge=[%s], car=[%s]",
            self._race_timestamp,
            self._track,
            self._challenge,
            self._car,
        )

        user_tags = self._config.opts("race", "user.tags", default_value={}, mandatory=False)
        for k, v in user_tags.items():
            # prefix user tag with "tag_" in order to avoid clashes with our internal meta data
            self.add_meta_info(MetaInfoScope.cluster, None, "tag_%s" % k, v)
        # Don't store it for each metrics record as it's probably sufficient on race level
        # self.add_meta_info(MetaInfoScope.cluster, None, "rally_version", version.version())
        self._stop_watch.start()
        self.opened = True

    def reset_relative_time(self):
        """
        Resets the internal relative-time counter to zero.
        """
        self._stop_watch.start()

    def flush(self, refresh=True):
        """
        Explicitly flushes buffered metrics to the metric store. It is not required to flush before closing the metrics store.
        """
        raise NotImplementedError("abstract method")

    def close(self):
        """
        Closes the metric store. Note that it is mandatory to close the metrics store when it is no longer needed as it only persists
        metrics on close (in order to avoid additional latency during the benchmark).
        """
        self.logger.info("Closing metrics store.")
        self.opened = False
        self.flush()
        self._clear_meta_info()

    def add_meta_info(self, scope, scope_key, key, value):
        """
        Adds new meta information to the metrics store. All metrics entries that are created after calling this method are guaranteed to
        contain the added meta info (provided they are on the same level or a level below, e.g. a cluster level metric will not contain
        node level meta information but all cluster level meta information will be contained in a node level metrics record).

        :param scope: The scope of the meta information. See MetaInfoScope.
        :param scope_key: The key within the scope. For cluster level metrics None is expected, for node level metrics the node name.
        :param key: The key of the meta information.
        :param value: The value of the meta information.
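
        Example (illustrative values; ``store`` is a metrics store instance)::

            # cluster-level meta info does not need a scope key
            store.add_meta_info(MetaInfoScope.cluster, None, "distribution_version", "8.12.0")
            # node-level meta info is keyed by the node name
            store.add_meta_info(MetaInfoScope.node, "rally-node-0", "os_name", "Linux")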
        """
        if scope == MetaInfoScope.cluster:
            self._meta_info[MetaInfoScope.cluster][key] = value
        elif scope == MetaInfoScope.node:
            if scope_key not in self._meta_info[MetaInfoScope.node]:
                self._meta_info[MetaInfoScope.node][scope_key] = {}
            self._meta_info[MetaInfoScope.node][scope_key][key] = value
        else:
            raise exceptions.SystemSetupError("Unknown meta info scope [%s]" % scope)

    def _clear_meta_info(self):
        """
        Clears all internally stored meta-info. This is considered Rally internal API and not intended for normal client consumption.
        """
        self._meta_info = {MetaInfoScope.cluster: {}, MetaInfoScope.node: {}}

    @property
    def open_context(self):
        return {
            "race-id": self._race_id,
            "race-timestamp": self._race_timestamp,
            "track": self._track,
            "challenge": self._challenge,
            "car": self._car,
        }

    def put_value_cluster_level(
        self,
        name,
        value,
        unit=None,
        task=None,
        operation=None,
        operation_type=None,
        sample_type=SampleType.Normal,
        absolute_time=None,
        relative_time=None,
        meta_data=None,
    ):
        """
        Adds a new cluster level value metric.

        :param name: The name of the metric.
        :param value: The metric value. It is expected to be numeric.
        :param unit: The unit of this metric value (e.g. ms, docs/s). Optional. Defaults to None.
        :param task: The task name to which this value applies. Optional. Defaults to None.
        :param operation: The operation name to which this value applies. Optional. Defaults to None.
        :param operation_type: The operation type to which this value applies. Optional. Defaults to None.
        :param sample_type: Whether this is a warmup or a normal measurement sample. Defaults to SampleType.Normal.
        :param absolute_time: The absolute timestamp in seconds since epoch when this metric record is stored. Defaults to None, in which
               case the metrics store derives the timestamp automatically.
        :param relative_time: The relative timestamp in seconds since the start of the benchmark when this metric record is stored.
               Defaults to None, in which case the metrics store derives the timestamp automatically.
        :param meta_data: A dict, containing additional key-value pairs. Defaults to None.
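
        Example (illustrative values)::

            store.put_value_cluster_level(name="indexing_throughput", value=12345, unit="docs/s",
                                          task="index-append", operation_type="bulk")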
        """
        self._put_metric(
            MetaInfoScope.cluster,
            None,
            name,
            value,
            unit,
            task,
            operation,
            operation_type,
            sample_type,
            absolute_time,
            relative_time,
            meta_data,
        )

    def put_value_node_level(
        self,
        node_name,
        name,
        value,
        unit=None,
        task=None,
        operation=None,
        operation_type=None,
        sample_type=SampleType.Normal,
        absolute_time=None,
        relative_time=None,
        meta_data=None,
    ):
        """
        Adds a new node level value metric.

        :param name: The name of the metric.
        :param node_name: The name of the cluster node for which this metric has been determined.
        :param value: The metric value. It is expected to be numeric.
        :param unit: The unit of this metric value (e.g. ms, docs/s). Optional. Defaults to None.
        :param task: The task name to which this value applies. Optional. Defaults to None.
        :param operation: The operation name to which this value applies. Optional. Defaults to None.
        :param operation_type: The operation type to which this value applies. Optional. Defaults to None.
        :param sample_type: Whether this is a warmup or a normal measurement sample. Defaults to SampleType.Normal.
        :param absolute_time: The absolute timestamp in seconds since epoch when this metric record is stored. Defaults to None, in which
               case the metrics store derives the timestamp automatically.
        :param relative_time: The relative timestamp in seconds since the start of the benchmark when this metric record is stored.
               Defaults to None, in which case the metrics store derives the timestamp automatically.
        :param meta_data: A dict, containing additional key-value pairs. Defaults to None.
        """
        self._put_metric(
            MetaInfoScope.node,
            node_name,
            name,
            value,
            unit,
            task,
            operation,
            operation_type,
            sample_type,
            absolute_time,
            relative_time,
            meta_data,
        )

    def _put_metric(
        self,
        level,
        level_key,
        name,
        value,
        unit,
        task,
        operation,
        operation_type,
        sample_type,
        absolute_time=None,
        relative_time=None,
        meta_data=None,
    ):
        if level == MetaInfoScope.cluster:
            meta = self._meta_info[MetaInfoScope.cluster].copy()
        elif level == MetaInfoScope.node:
            meta = self._meta_info[MetaInfoScope.cluster].copy()
            if level_key in self._meta_info[MetaInfoScope.node]:
                meta.update(self._meta_info[MetaInfoScope.node][level_key])
        else:
            raise exceptions.SystemSetupError("Unknown meta info level [%s] for metric [%s]" % (level, name))
        if meta_data:
            meta.update(meta_data)

        if absolute_time is None:
            absolute_time = self._clock.now()
        if relative_time is None:
            relative_time = self._stop_watch.split_time()

        doc = {
            "@timestamp": time.to_epoch_millis(absolute_time),
            "relative-time": convert.seconds_to_ms(relative_time),
            "race-id": self._race_id,
            "race-timestamp": self._race_timestamp,
            "environment": self._environment_name,
            "track": self._track,
            "challenge": self._challenge,
            "car": self._car_name,
            "name": name,
            "value": value,
            "unit": unit,
            "sample-type": sample_type.name.lower(),
            "meta": meta,
        }
        if task:
            doc["task"] = task
        if operation:
            doc["operation"] = operation
        if operation_type:
            doc["operation-type"] = operation_type
        if self._track_params:
            doc["track-params"] = self._track_params
        self._add(doc)

    def put_doc(self, doc, level=None, node_name=None, meta_data=None, absolute_time=None, relative_time=None):
        """
        Adds a new document to the metrics store. It will merge additional properties into the doc such as timestamps or track info.

        :param doc: The raw document as a ``dict``. Ownership is transferred to the metrics store (i.e. don't reuse that object).
        :param level: Whether these are cluster or node-level metrics. May be ``None`` if not applicable.
        :param node_name: The name of the node in case metrics are on node level.
        :param meta_data: A dict, containing additional key-value pairs. Defaults to None.
        :param absolute_time: The absolute timestamp in seconds since epoch when this metric record is stored. Defaults to None, in which
               case the metrics store derives the timestamp automatically.
        :param relative_time: The relative timestamp in seconds since the start of the benchmark when this metric record is stored.
               Defaults to None, in which case the metrics store derives the timestamp automatically.
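
        Example (illustrative values)::

            store.put_doc({"name": "disk_io_write_bytes", "value": 42, "unit": "byte"},
                          level=MetaInfoScope.node, node_name="rally-node-0")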
        """
        if level == MetaInfoScope.cluster:
            meta = self._meta_info[MetaInfoScope.cluster].copy()
        elif level == MetaInfoScope.node:
            meta = self._meta_info[MetaInfoScope.cluster].copy()
            if node_name in self._meta_info[MetaInfoScope.node]:
                meta.update(self._meta_info[MetaInfoScope.node][node_name])
        elif level is None:
            meta = None
        else:
            raise exceptions.SystemSetupError(f"Unknown meta info level [{level}]")

        if meta and meta_data:
            meta.update(meta_data)

        if absolute_time is None:
            absolute_time = self._clock.now()
        if relative_time is None:
            relative_time = self._stop_watch.split_time()

        doc.update(
            {
                "@timestamp": time.to_epoch_millis(absolute_time),
                "relative-time": convert.seconds_to_ms(relative_time),
                "race-id": self._race_id,
                "race-timestamp": self._race_timestamp,
                "environment": self._environment_name,
                "track": self._track,
                "challenge": self._challenge,
                "car": self._car_name,
            }
        )
        if meta:
            doc["meta"] = meta
        if self._track_params:
            doc["track-params"] = self._track_params

        self._add(doc)

    def bulk_add(self, memento):
        """
        Adds raw metrics store documents previously created with #to_externalizable()

        :param memento: The external representation as returned by #to_externalizable().
        """
        if memento:
            self.logger.debug("Restoring in-memory representation of metrics store.")
            for doc in pickle.loads(zlib.decompress(memento)):
                self._add(doc)

    def to_externalizable(self, clear=False):
        raise NotImplementedError("abstract method")

    def _add(self, doc):
        """
        Adds a new document to the metrics store

        :param doc: The new document.
        """
        raise NotImplementedError("abstract method")

    def get_one(
        self, name, sample_type=None, node_name=None, task=None, mapper=lambda doc: doc["value"], sort_key=None, sort_reverse=False
    ):
        """
        Gets one value for the given metric name (even if more than one exists).

        :param name: The metric name to query.
        :param sample_type: The sample type to query. Optional. By default, all samples are considered.
        :param node_name: The name of the node where this metric was gathered. Optional.
        :param task: The task name to query. Optional.
        :param sort_key: The key by which to sort the docs before returning the first value. Optional.
        :param sort_reverse: Whether to reverse the sort order. Optional.
        :return: The corresponding value for the given metric name or None if there is no value.
        """
        raise NotImplementedError("abstract method")

    @staticmethod
    def _first_or_none(values):
        return values[0] if values else None

    def get(self, name, task=None, operation_type=None, sample_type=None, node_name=None):
        """
        Gets all raw values for the given metric name.

        :param name: The metric name to query.
        :param task: The task name to query. Optional.
        :param operation_type: The operation type to query. Optional.
        :param sample_type: The sample type to query. Optional. By default, all samples are considered.
        :param node_name: The name of the node where this metric was gathered. Optional.
        :return: A list of all values for the given metric.
        """
        return self._get(name, task, operation_type, sample_type, node_name, lambda doc: doc["value"])

    def get_raw(self, name, task=None, operation_type=None, sample_type=None, node_name=None, mapper=lambda doc: doc):
        """
        Gets all raw records for the given metric name.

        :param name: The metric name to query.
        :param task: The task name to query. Optional.
        :param operation_type: The operation type to query. Optional.
        :param sample_type: The sample type to query. Optional. By default, all samples are considered.
        :param node_name: The name of the node where this metric was gathered. Optional.
        :param mapper: A record mapper. By default, the complete record is returned.
        :return: A list of all raw records for the given metric.
        """
        return self._get(name, task, operation_type, sample_type, node_name, mapper)

    def get_unit(self, name, task=None, operation_type=None, node_name=None):
        """
        Gets the unit for the given metric name.

        :param name: The metric name to query.
        :param task: The task name to query. Optional.
        :param operation_type: The operation type to query. Optional.
        :param node_name: The name of the node where this metric was gathered. Optional.
        :return: The corresponding unit for the given metric name or None if no metric record is available.
        """
        # does not make too much sense to ask for a sample type here
        return self._first_or_none(self._get(name, task, operation_type, None, node_name, lambda doc: doc["unit"]))

    def _get(self, name, task, operation_type, sample_type, node_name, mapper):
        raise NotImplementedError("abstract method")

    def get_error_rate(self, task, operation_type=None, sample_type=None):
        """
        Gets the error rate for a specific task.

        :param task: The task name to query.
        :param operation_type: The operation type to query. Optional.
        :param sample_type: The sample type to query. Optional. By default, all samples are considered.
        :return: A float between 0.0 and 1.0 (inclusive) representing the error rate.
        """
        raise NotImplementedError("abstract method")

    def get_stats(self, name, task=None, operation_type=None, sample_type=None):
        """
        Gets standard statistics for the given metric.

        :param name: The metric name to query.
        :param task: The task name to query. Optional.
        :param operation_type: The operation type to query. Optional.
        :param sample_type: The sample type to query. Optional. By default, all samples are considered.
        :return: A metric_stats structure.
        """
        raise NotImplementedError("abstract method")

    def get_percentiles(self, name, task=None, operation_type=None, sample_type=None, percentiles=None):
        """
        Retrieves percentile metrics for the given metric.

        :param name: The metric name to query.
        :param task: The task name to query. Optional.
        :param operation_type: The operation type to query. Optional.
        :param sample_type: The sample type to query. Optional. By default, all samples are considered.
        :param percentiles: An optional list of percentiles to show. If None is provided, the 99th, 99.9th and 100th percentiles
        are determined. Ensure that there are enough data points in the metrics store (e.g. it makes no sense to retrieve the 99.9999th
        percentile when there are only 10 values).
        :return: An ordered dictionary of the determined percentile values in ascending order. Key is the percentile, value is the
        determined value at this percentile. If no percentiles could be determined None is returned.
        """
        raise NotImplementedError("abstract method")

    def get_median(self, name, task=None, operation_type=None, sample_type=None):
        """
        Retrieves median value of the given metric.

        :param name: The metric name to query.
        :param task: The task name to query. Optional.
        :param operation_type: The operation type to query. Optional.
        :param sample_type: The sample type to query. Optional. By default, all samples are considered.
        :return: The median value.
        """
        median = "50.0"
        percentiles = self.get_percentiles(name, task, operation_type, sample_type, percentiles=[median])
        return percentiles[median] if percentiles else None

    def get_mean(self, name, task=None, operation_type=None, sample_type=None):
        """
        Retrieves mean of the given metric.

        :param name: The metric name to query.
        :param task: The task name to query. Optional.
        :param operation_type: The operation type to query. Optional.
        :param sample_type: The sample type to query. Optional. By default, all samples are considered.
        :return: The mean.
        """
        stats = self.get_stats(name, task, operation_type, sample_type)
        return stats["avg"] if stats else None


class EsMetricsStore(MetricsStore):
    """
    A metrics store backed by Elasticsearch.
    """

    def __init__(
        self,
        cfg: types.Config,
        client_factory_class=EsClientFactory,
        index_template_provider_class=IndexTemplateProvider,
        clock=time.Clock,
        meta_info=None,
    ):
        """
        Creates a new metrics store.

        :param cfg: The config object. Mandatory.
        :param client_factory_class: This parameter is optional and needed for testing.
        :param index_template_provider_class: This parameter is optional and needed for testing.
        :param clock: This parameter is optional and needed for testing.
        :param meta_info: This parameter is optional and intended for creating a metrics store with a previously serialized meta-info.
        """
        MetricsStore.__init__(self, cfg=cfg, clock=clock, meta_info=meta_info)
        self._index = None
        self._client = client_factory_class(cfg).create()
        self._index_template_provider = index_template_provider_class(cfg)
        self._docs = None

    def open(self, race_id=None, race_timestamp=None, track_name=None, challenge_name=None, car_name=None, ctx=None, create=False):
        self._docs = []
        MetricsStore.open(self, race_id, race_timestamp, track_name, challenge_name, car_name, ctx, create)
        self._index = self.index_name()
        # reduce a bit of noise in the metrics cluster log
        if create:
            self._ensure_index_template()
            if not self._client.exists(index=self._index):
                self._client.create_index(index=self._index)
            else:
                self.logger.info("[%s] already exists.", self._index)
        else:
            # we still need to check for the correct index name - prefer the one with the suffix
            new_name = self._migrated_index_name(self._index)
            if self._client.exists(index=new_name):
                self._index = new_name

        # ensure we can search immediately after opening
        self._client.refresh(index=self._index)

    def _ensure_index_template(self):
        new_template: str = self._get_template()

        old_template: dict | None = None
        if self._client.template_exists("rally-metrics"):
            for t in self._client.get_template("rally-metrics").body.get("index_templates", []):
                old_template = t.get("index_template", {}).get("template", {})
                break

        if old_template is None:
            self.logger.info(
                "Create index template:\n%s",
                pretty.dump(json.loads(new_template).get("template", {}), pretty.Flag.FLAT_DICT),
            )
        else:
            diff = pretty.diff(old_template, json.loads(new_template).get("template", {}), pretty.Flag.FLAT_DICT)
            if diff == "":
                self.logger.debug("Keep existing template (it is identical)")
                return
            if not convert.to_bool(
                self._config.opts(section="reporting", key="datastore.overwrite_existing_templates", default_value=False, mandatory=False)
            ):
                self.logger.debug("Keep existing template (datastore.overwrite_existing_templates = false):\n%s", diff)
                return
            self.logger.warning("Overwrite existing index template (datastore.overwrite_existing_templates = true):\n%s", diff)

        self._client.put_template("rally-metrics", new_template)

    def index_name(self):
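        # e.g. "rally-metrics-2024-03" for a race that started in March 2024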
        ts = time.from_iso8601(self._race_timestamp)
        return "rally-metrics-%04d-%02d" % (ts.year, ts.month)

    def _migrated_index_name(self, original_name):
        return f"{original_name}.new"

    def _get_template(self):
        return self._index_template_provider.metrics_template()

    def flush(self, refresh=True):
        if self._docs:
            sw = time.StopWatch()
            sw.start()
            self._client.bulk_index(index=self._index, items=self._docs)
            sw.stop()
            self.logger.info(
                "Successfully added %d metrics documents for race timestamp=[%s], track=[%s], challenge=[%s], car=[%s] in [%f] seconds.",
                len(self._docs),
                self._race_timestamp,
                self._track,
                self._challenge,
                self._car,
                sw.total_time(),
            )
        self._docs = []
        # ensure we can search immediately after flushing
        if refresh:
            self._client.refresh(index=self._index)

    def _add(self, doc):
        self._docs.append(doc)

    def _get(self, name, task, operation_type, sample_type, node_name, mapper):
        query = {
            "query": self._query_by_name(name, task, operation_type, sample_type, node_name),
            "track_total_hits": True,
            "size": 10000,
        }
        self.logger.debug("Issuing get against index=[%s], query=[%s].", self._index, query)
        result = self._client.search(index=self._index, body=query)
        es_count = result["hits"]["total"]["value"]
        self.logger.debug("Metrics query found [%s] results.", es_count)
        if es_count != len(result["hits"]["hits"]):
            self.logger.warning("Metrics query returned [%d] out of [%s] matching docs.", len(result["hits"]["hits"]), es_count)
        return [mapper(v["_source"]) for v in result["hits"]["hits"]]

    def get_one(
        self, name, sample_type=None, node_name=None, task=None, mapper=lambda doc: doc["value"], sort_key=None, sort_reverse=False
    ):
        order = "desc" if sort_reverse else "asc"
        query = {
            "query": self._query_by_name(name, task, None, sample_type, node_name),
            "size": 1,
        }
        if sort_key:
            query["sort"] = [{sort_key: {"order": order}}]
        self.logger.debug("Issuing get against index=[%s], query=[%s].", self._index, query)
        result = self._client.search(index=self._index, body=query)
        hits = result["hits"]["total"]
        # Elasticsearch 7.0+
        if isinstance(hits, dict):
            hits = hits["value"]
        self.logger.debug("Metrics query produced [%s] results.", hits)
        if hits > 0:
            return mapper(result["hits"]["hits"][0]["_source"])
        else:
            return None

    def get_error_rate(self, task, operation_type=None, sample_type=None):
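        # bucket the matching "service_time" records by the boolean meta.success flag; the "false" bucket drives the error rate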
        query = {
            "query": self._query_by_name("service_time", task, operation_type, sample_type, None),
            "size": 0,
            "aggs": {
                "error_rate": {
                    "terms": {
                        "field": "meta.success",
                    },
                },
            },
        }
        self.logger.debug("Issuing get_error_rate against index=[%s], query=[%s]", self._index, query)
        result = self._client.search(index=self._index, body=query)
        buckets = result["aggregations"]["error_rate"]["buckets"]
        self.logger.debug("Query returned [%d] buckets.", len(buckets))
        count_success = 0
        count_errors = 0
        for bucket in buckets:
            k = bucket["key_as_string"]
            doc_count = int(bucket["doc_count"])
            self.logger.debug("Processing key [%s] with [%d] docs.", k, doc_count)
            if k == "true":
                count_success = doc_count
            elif k == "false":
                count_errors = doc_count
            else:
                self.logger.warning("Unrecognized bucket key [%s] with [%d] docs.", k, doc_count)

        if count_errors == 0:
            return 0.0
        elif count_success == 0:
            return 1.0
        else:
            return count_errors / (count_errors + count_success)

    def get_stats(self, name, task=None, operation_type=None, sample_type=None):
        """
        Gets standard statistics for the given metric name.

        :return: A metric_stats structure. For details please refer to
        https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-stats-aggregation.html
        """
        query = {
            "query": self._query_by_name(name, task, operation_type, sample_type, None),
            "size": 0,
            "aggs": {
                "metric_stats": {
                    "stats": {
                        "field": "value",
                    },
                },
            },
        }
        self.logger.debug("Issuing get_stats against index=[%s], query=[%s]", self._index, query)
        result = self._client.search(index=self._index, body=query)
        return result["aggregations"]["metric_stats"]

    def get_percentiles(self, name, task=None, operation_type=None, sample_type=None, percentiles=None):
        if percentiles is None:
            percentiles = [99, 99.9, 100]
        query = {
            "query": self._query_by_name(name, task, operation_type, sample_type, None),
            "size": 0,
            "aggs": {
                "percentile_stats": {
                    "percentiles": {
                        "field": "value",
                        "percents": percentiles,
                    },
                },
            },
        }
        self.logger.debug("Issuing get_percentiles against index=[%s], query=[%s]", self._index, query)
        result = self._client.search(index=self._index, body=query)
        hits = result["hits"]["total"]
        # Elasticsearch 7.0+
        if isinstance(hits, dict):
            hits = hits["value"]
        self.logger.debug("get_percentiles produced %d hits", hits)
        if hits > 0:
            raw = result["aggregations"]["percentile_stats"]["values"]
            return collections.OrderedDict(sorted(raw.items(), key=lambda t: float(t[0])))
        else:
            return None

    def _query_by_name(self, name, task, operation_type, sample_type, node_name):
        q = {
            "bool": {
                "filter": [
                    {
                        "term": {
                            "race-id": self._race_id,
                        },
                    },
                    {
                        "term": {
                            "name": name,
                        },
                    },
                ],
            },
        }
        if task:
            q["bool"]["filter"].append(
                {
                    "term": {
                        "task": task,
                    },
                },
            )
        if operation_type:
            q["bool"]["filter"].append(
                {
                    "term": {
                        "operation-type": operation_type,
                    },
                },
            )
        if sample_type:
            q["bool"]["filter"].append(
                {
                    "term": {
                        "sample-type": sample_type.name.lower(),
                    },
                },
            )
        if node_name:
            q["bool"]["filter"].append(
                {
                    "term": {
                        "meta.node_name": node_name,
                    },
                },
            )
        return q

    def to_externalizable(self, clear=False):
        # no need for an externalizable representation - stores everything directly
        return None

    def __str__(self):
        return "Elasticsearch metrics store"


class InMemoryMetricsStore(MetricsStore):
    def __init__(self, cfg: types.Config, clock=time.Clock, meta_info=None):
        """

        Creates a new metrics store.

        :param cfg: The config object. Mandatory.
        :param clock: This parameter is optional and needed for testing.
        :param meta_info: This parameter is optional and intended for creating a metrics store with a previously serialized meta-info.
        """
        super().__init__(cfg=cfg, clock=clock, meta_info=meta_info)
        self.docs = []

    def __del__(self):
        """
        Releases the stored in-memory documents when the metrics store instance is deleted.
        """
        del self.docs

    def _add(self, doc):
        self.docs.append(doc)

    def flush(self, refresh=True):
        pass

    def to_externalizable(self, clear=False):
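        # pickle the docs and compress them with zlib; MetricsStore.bulk_add() reverses both steps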
        docs = self.docs
        if clear:
            self.docs = []
        compressed = zlib.compress(pickle.dumps(docs))
        self.logger.debug(
            "Compression changed size of metric store from [%d] bytes to [%d] bytes", sys.getsizeof(docs, -1), sys.getsizeof(compressed, -1)
        )
        return compressed

    def get_percentiles(self, name, task=None, operation_type=None, sample_type=None, percentiles=None):
        if percentiles is None:
            percentiles = [99, 99.9, 100]
        result = collections.OrderedDict()
        values = self.get(name, task, operation_type, sample_type)
        if len(values) > 0:
            sorted_values = sorted(values)
            for percentile in percentiles:
                result[percentile] = self.percentile_value(sorted_values, percentile)
        return result

    @staticmethod
    def percentile_value(sorted_values, percentile):
        """
        Calculates a percentile value for a given list of values and a percentile.

        The implementation is based on http://onlinestatbook.com/2/introduction/percentiles.html

        :param sorted_values: A sorted list of raw values for which a percentile should be calculated.
        :param percentile: A percentile between [0, 100]
        :return: the corresponding percentile value.
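
        Example: ``percentile_value([10, 20, 30, 40], 50)`` computes rank ``1.5`` and
        interpolates between ``20`` and ``30``, returning ``25.0``.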
        """
        rank = float(percentile) / 100.0 * (len(sorted_values) - 1)
        if rank == int(rank):
            return sorted_values[int(rank)]
        else:
            lr = math.floor(rank)
            lr_next = math.ceil(rank)
            fr = rank - lr
            lower_score = sorted_values[lr]
            higher_score = sorted_values[lr_next]
            return lower_score + (higher_score - lower_score) * fr

    def get_error_rate(self, task, operation_type=None, sample_type=None):
        error = 0
        total_count = 0
        for doc in self.docs:
            # we can use any request metrics record (i.e. service time or latency)
            if (
                doc["name"] == "service_time"
                and doc["task"] == task
                and (operation_type is None or doc["operation-type"] == operation_type)
                and (sample_type is None or doc["sample-type"] == sample_type.name.lower())
            ):
                total_count += 1
                if doc["meta"]["success"] is False:
                    error += 1
        if total_count > 0:
            return error / total_count
        else:
            return 0.0

    def get_stats(self, name, task=None, operation_type=None, sample_type=SampleType.Normal):
        values = self.get(name, task, operation_type, sample_type)
        sorted_values = sorted(values)
        if len(sorted_values) > 0:
            return {
                "count": len(sorted_values),
                "min": sorted_values[0],
                "max": sorted_values[-1],
                "avg": statistics.mean(sorted_values),
                "sum": sum(sorted_values),
            }
        else:
            return None

    def _get(self, name, task, operation_type, sample_type, node_name, mapper):
        return [
            mapper(doc)
            for doc in self.docs
            if doc["name"] == name
            and (task is None or doc["task"] == task)
            and (operation_type is None or doc["operation-type"] == operation_type)
            and (sample_type is None or doc["sample-type"] == sample_type.name.lower())
            and (node_name is None or doc.get("meta", {}).get("node_name") == node_name)
        ]

    def get_one(
        self, name, sample_type=None, node_name=None, task=None, mapper=lambda doc: doc["value"], sort_key=None, sort_reverse=False
    ):
        if sort_key:
            docs = sorted(self.docs, key=lambda k: k[sort_key], reverse=sort_reverse)
        else:
            docs = self.docs
        for doc in docs:
            if (
                doc["name"] == name
                and (task is None or doc["task"] == task)
                and (sample_type is None or doc["sample-type"] == sample_type.name.lower())
                and (node_name is None or doc.get("meta", {}).get("node_name") == node_name)
            ):
                return mapper(doc)
        return None

    def __str__(self):
        return "in-memory metrics store"


def race_store(cfg: types.Config):
    """
    Creates a proper race store based on the current configuration.
    :param cfg: Config object. Mandatory.
    :return: A race store implementation.
    """
    logger = logging.getLogger(__name__)
    if cfg.opts("reporting", "datastore.type") == "elasticsearch":
        logger.info("Creating ES race store")
        return CompositeRaceStore(EsRaceStore(cfg), FileRaceStore(cfg))
    else:
        logger.info("Creating file race store")
        return FileRaceStore(cfg)


def results_store(cfg: types.Config):
    """
    Creates a proper results store based on the current configuration.
    :param cfg: Config object. Mandatory.
    :return: A results store implementation.
    """
    logger = logging.getLogger(__name__)
    if cfg.opts("reporting", "datastore.type") == "elasticsearch":
        logger.info("Creating ES results store")
        return EsResultsStore(cfg)
    else:
        logger.info("Creating no-op results store")
        return NoopResultsStore()


def delete_race(cfg: types.Config):
    race_store(cfg).delete_race()


def delete_annotation(cfg: types.Config):
    race_store(cfg).delete_annotation()


def list_annotations(cfg: types.Config):
    race_store(cfg).list_annotations()


def add_annotation(cfg: types.Config):
    race_store(cfg).add_annotation()


def list_races(cfg: types.Config):
    def format_dict(d):
        if d:
            items = sorted(d.items())
            return ", ".join(["%s=%s" % (k, v) for k, v in items])
        else:
            return None

    races = []
    for race in race_store(cfg).list():
        races.append(
            [
                race.race_id,
                time.to_iso8601(race.race_timestamp),
                race.track,
                race.challenge_name,
                race.car_name,
                race.distribution_version,
                race.revision,
                race.rally_version,
                race.track_revision,
                race.team_revision,
                format_dict(race.user_tags),
            ]
        )

    if len(races) > 0:
        console.println("\nRecent races:\n")
        console.println(
            tabulate.tabulate(
                races,
                headers=[
                    "Race ID",
                    "Race Timestamp",
                    "Track",
                    "Challenge",
                    "Car",
                    "ES Version",
                    "Revision",
                    "Rally Version",
                    "Track Revision",
                    "Team Revision",
                    "User Tags",
                ],
            )
        )
    else:
        console.println("")
        console.println("No recent races found.")


def create_race(cfg: types.Config, track, challenge, track_revision=None):
    car = cfg.opts("mechanic", "car.names")
    environment = cfg.opts("system", "env.name")
    race_id = cfg.opts("system", "race.id")
    race_timestamp = cfg.opts("system", "time.start")
    user_tags = cfg.opts("race", "user.tags", default_value={}, mandatory=False)
    pipeline = cfg.opts("race", "pipeline")
    track_params = cfg.opts("track", "params")
    car_params = cfg.opts("mechanic", "car.params")
    plugin_params = cfg.opts("mechanic", "plugin.params")
    rally_version = version.version()
    rally_revision = version.revision()

    return Race(
        rally_version,
        rally_revision,
        environment,
        race_id,
        race_timestamp,
        pipeline,
        user_tags,
        track,
        track_params,
        challenge,
        car,
        car_params,
        plugin_params,
        track_revision,
    )


class Race:
    def __init__(
        self,
        rally_version,
        rally_revision,
        environment_name,
        race_id,
        race_timestamp,
        pipeline,
        user_tags,
        track,
        track_params,
        challenge,
        car,
        car_params,
        plugin_params,
        track_revision=None,
        team_revision=None,
        distribution_version=None,
        distribution_flavor=None,
        revision=None,
        results=None,
        meta_data=None,
    ):
        if results is None:
            results = {}
        # this happens when the race is created initially
        if meta_data is None:
            meta_data = {}
            if track:
                meta_data.update(track.meta_data)
            if challenge:
                meta_data.update(challenge.meta_data)
        self.rally_version = rally_version
        self.rally_revision = rally_revision
        self.environment_name = environment_name
        self.race_id = race_id
        self.race_timestamp = race_timestamp
        self.pipeline = pipeline
        self.user_tags = user_tags
        self.track = track
        self.track_params = track_params
        self.challenge = challenge
        self.car = car
        self.car_params = car_params
        self.plugin_params = plugin_params
        self.track_revision = track_revision
        self.team_revision = team_revision
        self.distribution_version = distribution_version
        self.distribution_flavor = distribution_flavor
        self.revision = revision
        self.results = results
        self.meta_data = meta_data

    @property
    def track_name(self):
        return str(self.track)

    @property
    def challenge_name(self):
        return str(self.challenge) if self.challenge else None

    @property
    def car_name(self):
        return "+".join(self.car) if isinstance(self.car, list) else self.car

    def add_results(self, results):
        self.results = results

    def as_dict(self):
        """
        :return: A dict representation suitable for persisting this race instance as JSON.
        """
        d = {
            "rally-version": self.rally_version,
            "rally-revision": self.rally_revision,
            "environment": self.environment_name,
            "race-id": self.race_id,
            "race-timestamp": time.to_iso8601(self.race_timestamp),
            "pipeline": self.pipeline,
            "user-tags": self.user_tags,
            "track": self.track_name,
            "car": self.car,
            "cluster": {
                "revision": self.revision,
                "distribution-version": self.distribution_version,
                "distribution-flavor": self.distribution_flavor,
                "team-revision": self.team_revision,
            },
        }
        if self.results:
            d["results"] = self.results.as_dict()
        if self.track_revision:
            d["track-revision"] = self.track_revision
        if not self.challenge.auto_generated:
            d["challenge"] = self.challenge_name
        if self.track_params:
            d["track-params"] = self.track_params
        if self.car_params:
            d["car-params"] = self.car_params
        if self.plugin_params:
            d["plugin-params"] = self.plugin_params
        return d

    def to_result_dicts(self):
        """
        :return: a list of dicts, suitable for persisting the results of this race in a format that is Kibana-friendly.
        """
        result_template = {
            "rally-version": self.rally_version,
            "rally-revision": self.rally_revision,
            "environment": self.environment_name,
            "race-id": self.race_id,
            "race-timestamp": time.to_iso8601(self.race_timestamp),
            "distribution-version": self.distribution_version,
            "distribution-flavor": self.distribution_flavor,
            "user-tags": self.user_tags,
            "track": self.track_name,
            "challenge": self.challenge_name,
            "car": self.car_name,
            # allows records to be logically deleted, e.g. for UI purposes when we only want to show the latest result
            "active": True,
        }
        if versions.is_version_identifier(self.distribution_version):
            result_template["distribution-major-version"] = versions.major_version(self.distribution_version)
        if self.team_revision:
            result_template["team-revision"] = self.team_revision
        if self.track_revision:
            result_template["track-revision"] = self.track_revision
        if self.track_params:
            result_template["track-params"] = self.track_params
        if self.car_params:
            result_template["car-params"] = self.car_params
        if self.plugin_params:
            result_template["plugin-params"] = self.plugin_params
        if self.meta_data:
            result_template["meta"] = self.meta_data

        all_results = []

        for item in self.results.as_flat_list():
            result = result_template.copy()
            result.update(item)
            all_results.append(result)

        return all_results

    @classmethod
    def from_dict(cls, d):
        user_tags = d.get("user-tags", {})
        # TODO: cluster is optional for BWC. This can be removed after some grace period.
        cluster = d.get("cluster", {})
        return Race(
            d["rally-version"],
            d.get("rally-revision"),
            d["environment"],
            d["race-id"],
            time.from_iso8601(d["race-timestamp"]),
            d["pipeline"],
            user_tags,
            d["track"],
            d.get("track-params"),
            d.get("challenge"),
            d["car"],
            d.get("car-params"),
            d.get("plugin-params"),
            track_revision=d.get("track-revision"),
            team_revision=cluster.get("team-revision"),
            distribution_version=cluster.get("distribution-version"),
            distribution_flavor=cluster.get("distribution-flavor"),
            revision=cluster.get("revision"),
            results=d.get("results"),
            meta_data=d.get("meta", {}),
        )


class RaceStore:
    def __init__(self, cfg: types.Config):
        self.cfg = cfg
        self.environment_name = cfg.opts("system", "env.name")

    def find_by_race_id(self, race_id):
        raise NotImplementedError("abstract method")

    def list(self):
        raise NotImplementedError("abstract method")

    def delete_race(self):
        raise NotImplementedError("abstract method")

    def delete_annotation(self):
        raise NotImplementedError("abstract method")

    def list_annotations(self):
        raise NotImplementedError("abstract method")

    def add_annotation(self):
        raise NotImplementedError("abstract method")

    def store_race(self, race):
        raise NotImplementedError("abstract method")

    def _max_results(self):
        return int(self.cfg.opts("system", "list.max_results"))

    def _track(self):
        return self.cfg.opts("system", "admin.track", mandatory=False)

    def _benchmark_name(self):
        return self.cfg.opts("system", "list.races.benchmark_name", mandatory=False)

    def _race_timestamp(self):
        return self.cfg.opts("system", "add.race_timestamp")

    def _message(self):
        return self.cfg.opts("system", "add.message")

    def _chart_type(self):
        return self.cfg.opts("system", "add.chart_type", mandatory=False)

    def _chart_name(self):
        return self.cfg.opts("system", "add.chart_name", mandatory=False)

    def _from_date(self):
        return self.cfg.opts("system", "list.from_date", mandatory=False)

    def _to_date(self):
        return self.cfg.opts("system", "list.to_date", mandatory=False)

    def _dry_run(self):
        return self.cfg.opts("system", "admin.dry_run", mandatory=False)

    def _id(self):
        return self.cfg.opts("system", "delete.id")

    def _challenge(self):
        return self.cfg.opts("system", "list.challenge", mandatory=False)


# Does not inherit from RaceStore as it is only a delegator with the same API.
class CompositeRaceStore:
    """
    Internal helper class that stores races both on the file system and in Elasticsearch in case users have configured
    Elasticsearch as a race store.

    It provides the same API as RaceStore. It delegates writes to all underlying stores and read operations only to the
    Elasticsearch race store.
    """

    def __init__(self, es_store, file_store):
        self.es_store = es_store
        self.file_store = file_store

    def find_by_race_id(self, race_id):
        return self.es_store.find_by_race_id(race_id)

    def store_race(self, race):
        self.file_store.store_race(race)
        self.es_store.store_race(race)

    def delete_race(self):
        return self.es_store.delete_race()

    def delete_annotation(self):
        return self.es_store.delete_annotation()

    def list_annotations(self):
        return self.es_store.list_annotations()

    def add_annotation(self):
        return self.es_store.add_annotation()

    def list(self):
        return self.es_store.list()


class FileRaceStore(RaceStore):
    def store_race(self, race):
        doc = race.as_dict()
        race_path = paths.race_root(self.cfg, race_id=race.race_id)
        io.ensure_dir(race_path)
        with open(self._race_file(), mode="w", encoding="utf-8") as f:
            f.write(json.dumps(doc, indent=True, ensure_ascii=False))

    def _race_file(self, race_id=None):
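        # With the default configuration this resolves to something like
        # ~/.rally/benchmarks/races/<race-id>/race.json (illustrative; the race root directory is configurable).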
        return os.path.join(paths.race_root(cfg=self.cfg, race_id=race_id), "race.json")

    def delete_race(self):
        raise NotImplementedError("Not supported for in-memory datastore.")

    def delete_annotation(self):
        raise NotImplementedError("Not supported for in-memory datastore.")

    def list_annotations(self):
        raise NotImplementedError("Not supported for in-memory datastore.")

    def add_annotation(self):
        raise NotImplementedError("Not supported for in-memory datastore.")

    def list(self):
        results = glob.glob(self._race_file(race_id="*"))
        all_races = self._to_races(results)
        return all_races[: self._max_results()]

    def find_by_race_id(self, race_id):
        race_file = self._race_file(race_id=race_id)
        if io.exists(race_file):
            races = self._to_races([race_file])
            if races:
                return races[0]
        raise exceptions.NotFound(f"No race with race id [{race_id}]")

    def _to_races(self, results):
        races = []
        track = self._track()
        name = self._benchmark_name()
        pattern = "%Y%m%d"
        from_date = self._from_date()
        to_date = self._to_date()
        for result in results:
            # noinspection PyBroadException
            try:
                with open(result, encoding="utf-8") as f:
                    races.append(Race.from_dict(json.loads(f.read())))
            except BaseException:
                logging.getLogger(__name__).exception("Could not load race file [%s] (incompatible format?). Skipping...", result)

        if track:
            races = filter(lambda r: r.track == track, races)
        if name:
            filtered_on_name = filter(lambda r: r.user_tags.get("name") == name, races)
            filtered_on_benchmark_name = filter(lambda r: r.user_tags.get("benchmark-name") == name, races)
            races = list(filtered_on_name) + list(filtered_on_benchmark_name)
        if from_date:
            races = filter(lambda r: r.race_timestamp.date() >= datetime.datetime.strptime(from_date, pattern).date(), races)
        if to_date:
            races = filter(lambda r: r.race_timestamp.date() <= datetime.datetime.strptime(to_date, pattern).date(), races)

        return sorted(races, key=lambda r: r.race_timestamp, reverse=True)


class EsRaceStore(RaceStore):
    INDEX_PREFIX = "rally-races-"

    def __init__(self, cfg: types.Config, client_factory_class=EsClientFactory, index_template_provider_class=IndexTemplateProvider):
        """
        Creates a new race store.

        :param cfg: The config object. Mandatory.
        :param client_factory_class: This parameter is optional and needed for testing.
        :param index_template_provider_class: This parameter is optional and needed for testing.
        """
        super().__init__(cfg)
        self.client = client_factory_class(cfg).create()
        self.index_template_provider = index_template_provider_class(cfg)

    def store_race(self, race):
        doc = race.as_dict()
        # always update the mapping to the latest version
        self.client.put_template("rally-races", self.index_template_provider.races_template())
        self.client.index(index=self.index_name(race), item=doc, id=race.race_id)

    def index_name(self, race):
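        # Races are partitioned into monthly indices, e.g. (illustrative) a race started on
        # 2024-03-15 ends up in "rally-races-2024-03".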
        race_timestamp = race.race_timestamp
        return f"{EsRaceStore.INDEX_PREFIX}{race_timestamp:%Y-%m}"

    def add_annotation(self):
        def _at_midnight(race_timestamp):
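            # e.g. (illustrative) "20240301T154500Z" -> "20240301T000000Z"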
            TIMESTAMP_FMT = "%Y%m%dT%H%M%SZ"
            date = datetime.datetime.strptime(race_timestamp, TIMESTAMP_FMT)
            date = date.replace(hour=0, minute=0, second=0, tzinfo=datetime.timezone.utc)
            return date.strftime(TIMESTAMP_FMT)

        environment = self.environment_name
        # To line up annotations with chart data points, use midnight of day N as this is
        # what the charts use too.
        race_timestamp = _at_midnight(self._race_timestamp())
        track = self._track()
        chart_type = self._chart_type()
        chart_name = self._chart_name()
        message = self._message()
        annotation_id = str(uuid.uuid4())
        dry_run = self._dry_run()

        if dry_run:
            console.println(
                f"Would add annotation with message [{message}] for environment=[{environment}], race timestamp=[{race_timestamp}], "
                f"track=[{track}], chart type=[{chart_type}], chart name=[{chart_name}]"
            )
        else:
            if not self.client.exists(index="rally-annotations"):
                # create or overwrite template on index creation
                self.client.put_template("rally-annotations", self.index_template_provider.annotations_template())
                self.client.create_index(index="rally-annotations")
            self.client.index(
                index="rally-annotations",
                id=annotation_id,
                item={
                    "environment": environment,
                    "race-timestamp": race_timestamp,
                    "track": track,
                    "chart": chart_type,
                    "chart-name": chart_name,
                    "message": message,
                },
            )
            console.println(f"Successfully added annotation [{annotation_id}].")

    def list_annotations(self):
        environment = self.environment_name
        track = self._track()
        from_date = self._from_date()
        to_date = self._to_date()
        query = {
            "query": {
                "bool": {
                    "filter": [
                        {"term": {"environment": environment}},
                        {"range": {"race-timestamp": {"gte": from_date, "lte": to_date, "format": "basic_date"}}},
                    ]
                }
            }
        }
        if track:
            query["query"]["bool"]["filter"].append({"term": {"track": track}})

        query["sort"] = [{"race-timestamp": "desc"}, {"track": "asc"}, {"chart": "asc"}]
        query["size"] = self._max_results()

        result = self.client.search(index="rally-annotations", body=query)
        annotations = []
        hits = result["hits"]["total"]
        # Elasticsearch 7.0+ wraps the total hit count in an object
        if isinstance(hits, dict):
            hits = hits["value"]
        if hits == 0:
            console.println(f"No annotations found in environment [{environment}].")
        else:
            for hit in result["hits"]["hits"]:
                src = hit["_source"]
                annotations.append(
                    [
                        hit["_id"],
                        src["race-timestamp"],
                        src.get("track", ""),
                        src.get("chart", ""),
                        src.get("chart-name", ""),
                        src["message"],
                    ]
                )

            if annotations:
                console.println("\nAnnotations:\n")
                console.println(
                    tabulate.tabulate(
                        annotations,
                        headers=["Annotation Id", "Timestamp", "Track", "Chart Type", "Chart Name", "Message"],
                    )
                )

    def delete_annotation(self):
        annotations = self._id().split(",")
        environment = self.environment_name
        if self._dry_run():
            if len(annotations) == 1:
                console.println(f"Would delete annotation with id [{annotations[0]}] in environment [{environment}].")
            else:
                console.println(f"Would delete {len(annotations)} annotations: {annotations} in environment [{environment}].")
        else:
            for annotation_id in annotations:
                result = self.client.delete(index="rally-annotations", id=annotation_id)
                if result["result"] == "deleted":
                    console.println(f"Successfully deleted [{annotation_id}].")
                else:
                    console.println(f"Did not find [{annotation_id}] in environment [{environment}].")

    def delete_race(self):
        races = self._id().split(",")
        environment = self.environment_name
        if self._dry_run():
            if len(races) == 1:
                console.println(f"Would delete race with id {races[0]} in environment {environment}.")
            else:
                console.println(f"Would delete {len(races)} races: {races} in environment {environment}.")
        else:
            for race_id in races:
                selector = {"query": {"bool": {"filter": [{"term": {"environment": environment}}, {"term": {"race-id": race_id}}]}}}
                self.client.delete_by_query(index="rally-races-*", body=selector)
                self.client.delete_by_query(index="rally-metrics-*", body=selector)
                result = self.client.delete_by_query(index="rally-results-*", body=selector)
                if result["deleted"] > 0:
                    console.println(f"Successfully deleted [{race_id}] in environment [{environment}].")
                else:
                    console.println(f"Did not find [{race_id}] in environment [{environment}].")

    def list(self):
        track = self._track()
        name = self._benchmark_name()
        from_date = self._from_date()
        to_date = self._to_date()
        challenge = self._challenge()

        filters = [
            {
                "term": {
                    "environment": self.environment_name,
                },
            },
            {"range": {"race-timestamp": {"gte": from_date, "lte": to_date, "format": "basic_date"}}},
        ]

        query = {
            "query": {
                "bool": {
                    "filter": filters,
                },
            },
            "size": self._max_results(),
            "sort": [
                {
                    "race-timestamp": {
                        "order": "desc",
                    },
                },
            ],
        }
        if track:
            query["query"]["bool"]["filter"].append({"term": {"track": track}})
        if name:
            query["query"]["bool"]["filter"].append(
                {"bool": {"should": [{"term": {"user-tags.benchmark-name": name}}, {"term": {"user-tags.name": name}}]}}
            )
        if challenge:
            query["query"]["bool"]["filter"].append({"bool": {"should": [{"term": {"challenge": challenge}}]}})

        result = self.client.search(index="%s*" % EsRaceStore.INDEX_PREFIX, body=query)
        hits = result["hits"]["total"]
        # Elasticsearch 7.0+
        if isinstance(hits, dict):
            hits = hits["value"]
        if hits > 0:
            return [Race.from_dict(v["_source"]) for v in result["hits"]["hits"]]
        else:
            return []

    def find_by_race_id(self, race_id):
        query = {
            "query": {
                "bool": {
                    "filter": [
                        {
                            "term": {
                                "race-id": race_id,
                            },
                        },
                    ],
                },
            },
        }
        result = self.client.search(index="%s*" % EsRaceStore.INDEX_PREFIX, body=query)
        hits = result["hits"]["total"]
        # Elasticsearch 7.0+
        if isinstance(hits, dict):
            hits = hits["value"]
        if hits == 1:
            return Race.from_dict(result["hits"]["hits"][0]["_source"])
        elif hits > 1:
            raise exceptions.RallyAssertionError(f"Expected exactly one race to match race id [{race_id}] but there were [{hits}] matches.")
        else:
            raise exceptions.NotFound(f"No race with race id [{race_id}]")


class EsResultsStore:
    """
    Stores the results of a race in a format that is better suited for reporting with Kibana.
    """

    INDEX_PREFIX = "rally-results-"

    def __init__(self, cfg: types.Config, client_factory_class=EsClientFactory, index_template_provider_class=IndexTemplateProvider):
        """
        Creates a new results store.

        :param cfg: The config object. Mandatory.
        :param client_factory_class: This parameter is optional and needed for testing.
        :param index_template_provider_class: This parameter is optional and needed for testing.
        """
        self.cfg = cfg
        self.client = client_factory_class(cfg).create()
        self.index_template_provider = index_template_provider_class(cfg)

    def store_results(self, race):
        # always update the mapping to the latest version
        self.client.put_template("rally-results", self.index_template_provider.results_template())
        self.client.bulk_index(index=self.index_name(race), items=race.to_result_dicts())

    def index_name(self, race):
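        # Results are partitioned into monthly indices, e.g. (illustrative) "rally-results-2024-03".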
        race_timestamp = race.race_timestamp
        return f"{EsResultsStore.INDEX_PREFIX}{race_timestamp:%Y-%m}"


class NoopResultsStore:
    """
    Does not store any results separately as these are stored as part of the race on the file system.
    """

    def store_results(self, race):
        pass


# helper function for encoding and decoding float keys so that the Elasticsearch metrics store can save them.
def encode_float_key(k):
    # ensure that the key is indeed a float to unify the representation (e.g. 50 should be represented as "50_0")
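    # For example (illustrative): encode_float_key(50) -> "50_0", encode_float_key(99.9) -> "99_9".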
    return str(float(k)).replace(".", "_")


def percentiles_for_sample_size(sample_size):
    # if needed we can come up with something smarter but it'll do for now
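    # Illustrative mapping derived from the branches below: 1 sample -> [100], 5 -> [50, 100],
    # 50 -> [50, 90, 100], 500 -> [50, 90, 99, 100], 50000 -> [50, 90, 99, 99.9, 99.99, 100].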
    if sample_size < 1:
        raise AssertionError("Percentiles require at least one sample")

    if sample_size == 1:
        return [100]
    elif 1 < sample_size < 10:
        return [50, 100]
    elif 10 <= sample_size < 100:
        return [50, 90, 100]
    elif 100 <= sample_size < 1000:
        return [50, 90, 99, 100]
    elif 1000 <= sample_size < 10000:
        return [50, 90, 99, 99.9, 100]
    else:
        return [50, 90, 99, 99.9, 99.99, 100]


class GlobalStatsCalculator:
    def __init__(self, store, track, challenge):
        self.store = store
        self.logger = logging.getLogger(__name__)
        self.track = track
        self.challenge = challenge

    def __call__(self):
        result = GlobalStats()

        for tasks in self.challenge.schedule:
            for task in tasks:
                t = task.name
                op_type = task.operation.type
                error_rate = self.error_rate(t, op_type)
                duration = self.duration(t)
                if task.operation.include_in_reporting or error_rate > 0:
                    self.logger.debug("Gathering request metrics for [%s].", t)
                    result.add_op_metrics(
                        t,
                        task.operation.name,
                        self.summary_stats("throughput", t, op_type),
                        self.single_latency(t, op_type),
                        self.single_latency(t, op_type, metric_name="service_time"),
                        self.single_latency(t, op_type, metric_name="processing_time"),
                        error_rate,
                        duration,
                        self.merge(self.track.meta_data, self.challenge.meta_data, task.operation.meta_data, task.meta_data),
                    )
        self.logger.debug("Gathering indexing metrics.")
        result.total_time = self.sum("indexing_total_time")
        result.total_time_per_shard = self.shard_stats("indexing_total_time")
        result.indexing_throttle_time = self.sum("indexing_throttle_time")
        result.indexing_throttle_time_per_shard = self.shard_stats("indexing_throttle_time")
        result.merge_time = self.sum("merges_total_time")
        result.merge_time_per_shard = self.shard_stats("merges_total_time")
        result.merge_count = self.sum("merges_total_count")
        result.refresh_time = self.sum("refresh_total_time")
        result.refresh_time_per_shard = self.shard_stats("refresh_total_time")
        result.refresh_count = self.sum("refresh_total_count")
        result.flush_time = self.sum("flush_total_time")
        result.flush_time_per_shard = self.shard_stats("flush_total_time")
        result.flush_count = self.sum("flush_total_count")
        result.merge_throttle_time = self.sum("merges_total_throttled_time")
        result.merge_throttle_time_per_shard = self.shard_stats("merges_total_throttled_time")

        self.logger.debug("Gathering ML max processing times.")
        result.ml_processing_time = self.ml_processing_time_stats()

        self.logger.debug("Gathering garbage collection metrics.")
        result.young_gc_time = self.sum("node_total_young_gen_gc_time")
        result.young_gc_count = self.sum("node_total_young_gen_gc_count")
        result.old_gc_time = self.sum("node_total_old_gen_gc_time")
        result.old_gc_count = self.sum("node_total_old_gen_gc_count")
        result.zgc_cycles_gc_time = self.sum("node_total_zgc_cycles_gc_time")
        result.zgc_cycles_gc_count = self.sum("node_total_zgc_cycles_gc_count")
        result.zgc_pauses_gc_time = self.sum("node_total_zgc_pauses_gc_time")
        result.zgc_pauses_gc_count = self.sum("node_total_zgc_pauses_gc_count")

        self.logger.debug("Gathering segment memory metrics.")
        result.memory_segments = self.median("segments_memory_in_bytes")
        result.memory_doc_values = self.median("segments_doc_values_memory_in_bytes")
        result.memory_terms = self.median("segments_terms_memory_in_bytes")
        result.memory_norms = self.median("segments_norms_memory_in_bytes")
        result.memory_points = self.median("segments_points_memory_in_bytes")
        result.memory_stored_fields = self.median("segments_stored_fields_memory_in_bytes")
        result.dataset_size = self.sum("dataset_size_in_bytes")
        result.store_size = self.sum("store_size_in_bytes")
        result.translog_size = self.sum("translog_size_in_bytes")

        # convert to int; fractional segment counts make no sense
        median_segment_count = self.median("segments_count")
        result.segment_count = int(median_segment_count) if median_segment_count is not None else median_segment_count

        self.logger.debug("Gathering transform processing times.")
        result.total_transform_processing_times = self.total_transform_metric("total_transform_processing_time")
        result.total_transform_index_times = self.total_transform_metric("total_transform_index_time")
        result.total_transform_search_times = self.total_transform_metric("total_transform_search_time")
        result.total_transform_throughput = self.total_transform_metric("total_transform_throughput")

        self.logger.debug("Gathering Ingest Pipeline metrics.")
        result.ingest_pipeline_cluster_count = self.sum("ingest_pipeline_cluster_count")
        result.ingest_pipeline_cluster_time = self.sum("ingest_pipeline_cluster_time")
        result.ingest_pipeline_cluster_failed = self.sum("ingest_pipeline_cluster_failed")

        self.logger.debug("Gathering disk usage metrics.")
        result.disk_usage_total = self.disk_usage("disk_usage_total")
        result.disk_usage_inverted_index = self.disk_usage("disk_usage_inverted_index")
        result.disk_usage_stored_fields = self.disk_usage("disk_usage_stored_fields")
        result.disk_usage_doc_values = self.disk_usage("disk_usage_doc_values")
        result.disk_usage_points = self.disk_usage("disk_usage_points")
        result.disk_usage_norms = self.disk_usage("disk_usage_norms")
        result.disk_usage_term_vectors = self.disk_usage("disk_usage_term_vectors")

        return result

    def merge(self, *args):
        # Similar to dict(collections.ChainMap(*args)), except that we skip `None` arguments and later arguments take precedence.
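        # For example (illustrative): merge({"a": 1}, None, {"a": 2, "b": 3}) -> {"a": 2, "b": 3}.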
        result = {}
        for arg in args:
            if arg is not None:
                result.update(arg)
        return result

    def sum(self, metric_name):
        values = self.store.get(metric_name)
        if values:
            return sum(values)
        else:
            return None

    def one(self, metric_name):
        return self.store.get_one(metric_name)

    def summary_stats(self, metric_name, task_name, operation_type):
        mean = self.store.get_mean(metric_name, task=task_name, operation_type=operation_type, sample_type=SampleType.Normal)
        median = self.store.get_median(metric_name, task=task_name, operation_type=operation_type, sample_type=SampleType.Normal)
        unit = self.store.get_unit(metric_name, task=task_name, operation_type=operation_type)
        stats = self.store.get_stats(metric_name, task=task_name, operation_type=operation_type, sample_type=SampleType.Normal)
        if mean and median and stats:
            return {
                "min": stats["min"],
                "mean": mean,
                "median": median,
                "max": stats["max"],
                "unit": unit,
            }
        else:
            return {
                "min": None,
                "mean": None,
                "median": None,
                "max": None,
                "unit": unit,
            }

    def shard_stats(self, metric_name):
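        # Each raw metric document carries a "per-shard" list of values; flatten these lists before aggregating.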
        values = self.store.get_raw(metric_name, mapper=lambda doc: doc["per-shard"])
        unit = self.store.get_unit(metric_name)
        if values:
            flat_values = [w for v in values for w in v]
            return {
                "min": min(flat_values),
                "median": statistics.median(flat_values),
                "max": max(flat_values),
                "unit": unit,
            }
        else:
            return {}

    def ml_processing_time_stats(self):
        values = self.store.get_raw("ml_processing_time")
        result = []
        if values:
            for v in values:
                result.append(
                    {"job": v["job"], "min": v["min"], "mean": v["mean"], "median": v["median"], "max": v["max"], "unit": v["unit"]}
                )
        return result

    def total_transform_metric(self, metric_name):
        values = self.store.get_raw(metric_name)
        result = []
        if values:
            for v in values:
                transform_id = v.get("meta", {}).get("transform_id")
                if transform_id is not None:
                    result.append({"id": transform_id, "mean": v["value"], "unit": v["unit"]})
        return result

    def disk_usage(self, metric_name):
        values = self.store.get_raw(metric_name)
        result = []
        if values:
            for v in values:
                meta = v.get("meta", {})
                index = meta.get("index")
                field = meta.get("field")
                if index is not None and field is not None:
                    result.append({"index": index, "field": field, "value": v["value"], "unit": v["unit"]})
        return result

    def error_rate(self, task_name, operation_type):
        return self.store.get_error_rate(task=task_name, operation_type=operation_type, sample_type=SampleType.Normal)

    def duration(self, task_name):
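        # Approximates the task duration by the largest "relative-time" recorded for any service_time sample of this task.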
        return self.store.get_one(
            "service_time", task=task_name, mapper=lambda doc: doc["relative-time"], sort_key="relative-time", sort_reverse=True
        )

    def median(self, metric_name, task_name=None, operation_type=None, sample_type=None):
        return self.store.get_median(metric_name, task=task_name, operation_type=operation_type, sample_type=sample_type)

    def single_latency(self, task, operation_type, metric_name="latency"):
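        # Returns an ordered dict keyed by encoded percentile (e.g. "99_9") plus "mean" and "unit",
        # or an empty dict if no samples have been collected for this task.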
        sample_type = SampleType.Normal
        stats = self.store.get_stats(metric_name, task=task, operation_type=operation_type, sample_type=sample_type)
        sample_size = stats["count"] if stats else 0
        if sample_size > 0:
            percentiles = self.store.get_percentiles(
                metric_name,
                task=task,
                operation_type=operation_type,
                sample_type=sample_type,
                percentiles=percentiles_for_sample_size(sample_size),
            )
            mean = self.store.get_mean(metric_name, task=task, operation_type=operation_type, sample_type=sample_type)
            unit = self.store.get_unit(metric_name, task=task, operation_type=operation_type)
            stats = collections.OrderedDict()
            for k, v in percentiles.items():
                # safely encode so we don't have any dots in field names
                stats[encode_float_key(k)] = v
            stats["mean"] = mean
            stats["unit"] = unit
            return stats
        else:
            return {}


class GlobalStats:
    def __init__(self, d=None):
        self.op_metrics = self.v(d, "op_metrics", default=[])
        self.total_time = self.v(d, "total_time")
        self.total_time_per_shard = self.v(d, "total_time_per_shard", default={})
        self.indexing_throttle_time = self.v(d, "indexing_throttle_time")
        self.indexing_throttle_time_per_shard = self.v(d, "indexing_throttle_time_per_shard", default={})
        self.merge_time = self.v(d, "merge_time")
        self.merge_time_per_shard = self.v(d, "merge_time_per_shard", default={})
        self.merge_count = self.v(d, "merge_count")
        self.refresh_time = self.v(d, "refresh_time")
        self.refresh_time_per_shard = self.v(d, "refresh_time_per_shard", default={})
        self.refresh_count = self.v(d, "refresh_count")
        self.flush_time = self.v(d, "flush_time")
        self.flush_time_per_shard = self.v(d, "flush_time_per_shard", default={})
        self.flush_count = self.v(d, "flush_count")
        self.merge_throttle_time = self.v(d, "merge_throttle_time")
        self.merge_throttle_time_per_shard = self.v(d, "merge_throttle_time_per_shard", default={})
        self.ml_processing_time = self.v(d, "ml_processing_time", default=[])

        self.young_gc_time = self.v(d, "young_gc_time")
        self.young_gc_count = self.v(d, "young_gc_count")
        self.old_gc_time = self.v(d, "old_gc_time")
        self.old_gc_count = self.v(d, "old_gc_count")
        self.zgc_cycles_gc_time = self.v(d, "zgc_cycles_gc_time")
        self.zgc_cycles_gc_count = self.v(d, "zgc_cycles_gc_count")
        self.zgc_pauses_gc_time = self.v(d, "zgc_pauses_gc_time")
        self.zgc_pauses_gc_count = self.v(d, "zgc_pauses_gc_count")

        self.memory_segments = self.v(d, "memory_segments")
        self.memory_doc_values = self.v(d, "memory_doc_values")
        self.memory_terms = self.v(d, "memory_terms")
        self.memory_norms = self.v(d, "memory_norms")
        self.memory_points = self.v(d, "memory_points")
        self.memory_stored_fields = self.v(d, "memory_stored_fields")
        self.dataset_size = self.v(d, "dataset_size")
        self.store_size = self.v(d, "store_size")
        self.translog_size = self.v(d, "translog_size")
        self.segment_count = self.v(d, "segment_count")

        self.total_transform_search_times = self.v(d, "total_transform_search_times")
        self.total_transform_index_times = self.v(d, "total_transform_index_times")
        self.total_transform_processing_times = self.v(d, "total_transform_processing_times")
        self.total_transform_throughput = self.v(d, "total_transform_throughput")

        self.ingest_pipeline_cluster_count = self.v(d, "ingest_pipeline_cluster_count")
        self.ingest_pipeline_cluster_time = self.v(d, "ingest_pipeline_cluster_time")
        self.ingest_pipeline_cluster_failed = self.v(d, "ingest_pipeline_cluster_failed")

        self.disk_usage_total = self.v(d, "disk_usage_total")
        self.disk_usage_inverted_index = self.v(d, "disk_usage_inverted_index")
        self.disk_usage_stored_fields = self.v(d, "disk_usage_stored_fields")
        self.disk_usage_doc_values = self.v(d, "disk_usage_doc_values")
        self.disk_usage_points = self.v(d, "disk_usage_points")
        self.disk_usage_norms = self.v(d, "disk_usage_norms")
        self.disk_usage_term_vectors = self.v(d, "disk_usage_term_vectors")

    def as_dict(self):
        return self.__dict__

    def as_flat_list(self):
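        # Flattens the nested stats into one result document per metric, e.g. (illustrative):
        # {"task": "index-append", "operation": "index-append", "name": "throughput", "value": {...}} for task metrics
        # and {"name": "segment_count", "value": {"single": 42}} for scalar cluster-wide metrics.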
        def op_metrics(op_item, key, single_value=False):
            doc = {"task": op_item["task"], "operation": op_item["operation"], "name": key}
            if single_value:
                doc["value"] = {"single": op_item[key]}
            else:
                doc["value"] = op_item[key]
            if "meta" in op_item:
                doc["meta"] = op_item["meta"]
            return doc

        all_results = []
        for metric, value in self.as_dict().items():
            if metric == "op_metrics":
                for item in value:
                    if "throughput" in item:
                        all_results.append(op_metrics(item, "throughput"))
                    if "latency" in item:
                        all_results.append(op_metrics(item, "latency"))
                    if "service_time" in item:
                        all_results.append(op_metrics(item, "service_time"))
                    if "processing_time" in item:
                        all_results.append(op_metrics(item, "processing_time"))
                    if "error_rate" in item:
                        all_results.append(op_metrics(item, "error_rate", single_value=True))
                    if "duration" in item:
                        all_results.append(op_metrics(item, "duration", single_value=True))
            elif metric == "ml_processing_time":
                for item in value:
                    all_results.append(
                        {
                            "job": item["job"],
                            "name": "ml_processing_time",
                            "value": {"min": item["min"], "mean": item["mean"], "median": item["median"], "max": item["max"]},
                        }
                    )
            elif metric.startswith("total_transform_") and value is not None:
                for item in value:
                    all_results.append({"id": item["id"], "name": metric, "value": {"single": item["mean"]}})
            elif metric.startswith("disk_usage_") and value is not None:
                for item in value:
                    all_results.append({"index": item["index"], "field": item["field"], "name": metric, "value": {"single": item["value"]}})
            elif metric.endswith("_time_per_shard"):
                if value:
                    all_results.append({"name": metric, "value": value})
            elif value is not None:
                result = {"name": metric, "value": {"single": value}}
                all_results.append(result)
        # Sorting is only necessary to have a stable order for tests. As we have just a small number of metrics, the overhead is negligible.
        return sorted(all_results, key=lambda m: m["name"])

    def v(self, d, k, default=None):
        return d.get(k, default) if d else default

    def add_op_metrics(self, task, operation, throughput, latency, service_time, processing_time, error_rate, duration, meta):
        doc = {
            "task": task,
            "operation": operation,
            "throughput": throughput,
            "latency": latency,
            "service_time": service_time,
            "processing_time": processing_time,
            "error_rate": error_rate,
            "duration": duration,
        }
        if meta:
            doc["meta"] = meta
        self.op_metrics.append(doc)

    def tasks(self):
        # ensure we can read race.json files written before Rally 0.8.0
        return [v.get("task", v["operation"]) for v in self.op_metrics]

    def metrics(self, task):
        # ensure we can read race.json files written before Rally 0.8.0
        for r in self.op_metrics:
            if r.get("task", r["operation"]) == task:
                return r
        return None


class SystemStatsCalculator:
    def __init__(self, store, node_name):
        self.store = store
        self.logger = logging.getLogger(__name__)
        self.node_name = node_name

    def __call__(self):
        result = SystemStats()
        self.logger.debug("Calculating system metrics for [%s]", self.node_name)
        self.logger.debug("Gathering disk metrics.")
        self.add(result, "final_index_size_bytes", "index_size")
        self.add(result, "disk_io_write_bytes", "bytes_written")
        self.logger.debug("Gathering node startup time metrics.")
        self.add(result, "node_startup_time", "startup_time")
        return result

    def add(self, result, raw_metric_key, summary_metric_key):
        metric_value = self.store.get_one(raw_metric_key, node_name=self.node_name)
        metric_unit = self.store.get_unit(raw_metric_key, node_name=self.node_name)
        if metric_value:
            self.logger.debug("Adding record for [%s] with value [%s].", raw_metric_key, str(metric_value))
            result.add_node_metrics(self.node_name, summary_metric_key, metric_value, metric_unit)
        else:
            self.logger.debug("Skipping incomplete [%s] record.", raw_metric_key)


class SystemStats:
    def __init__(self, d=None):
        self.node_metrics = self.v(d, "node_metrics", default=[])

    def v(self, d, k, default=None):
        return d.get(k, default) if d else default

    def add_node_metrics(self, node, name, value, unit):
        metric = {"node": node, "name": name, "value": value}
        if unit:
            metric["unit"] = unit
        self.node_metrics.append(metric)

    def as_flat_list(self):
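        # e.g. (illustrative): {"node": "rally-node-0", "name": "startup_time", "value": {"single": 3.4}}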
        all_results = []
        for v in self.node_metrics:
            all_results.append({"node": v["node"], "name": v["name"], "value": {"single": v["value"]}})
        # Sort for a stable order in tests.
        return sorted(all_results, key=lambda m: m["name"])
