esrally/telemetry.py (1,793 lines of code) (raw):
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import collections
import fnmatch
import logging
import os
import threading
import tabulate
from esrally import exceptions, metrics, time
from esrally.metrics import MetaInfoScope
from esrally.utils import console, io, opts, process, serverless, sysstats, versions
from esrally.utils.versions import Version
def list_telemetry():
console.println("Available telemetry devices:\n")
devices = [
[device.command, device.human_name, device.help]
for device in [
JitCompiler,
Gc,
FlightRecorder,
Heapdump,
NodeStats,
RecoveryStats,
CcrStats,
SegmentStats,
TransformStats,
SearchableSnapshotsStats,
ShardStats,
DataStreamStats,
IngestPipelineStats,
DiskUsageStats,
GeoIpStats,
]
]
console.println(tabulate.tabulate(devices, ["Command", "Name", "Description"]))
console.println("\nKeep in mind that each telemetry device may incur a runtime overhead which can skew results.")
class Telemetry:
def __init__(self, enabled_devices=None, devices=None, serverless_mode=False, serverless_operator=False):
if devices is None:
devices = []
if enabled_devices is None:
enabled_devices = []
self.enabled_devices = enabled_devices
self.devices = devices
self.serverless_mode = serverless_mode
self.serverless_operator = serverless_operator
def instrument_candidate_java_opts(self):
        java_opts = []
        for device in self.devices:
            if self._enabled(device):
                additional_opts = device.instrument_java_opts()
                # merge all options into a single flat list (duplicate keys are passed through to the JVM as-is)
                java_opts.extend(additional_opts)
        return java_opts
def on_pre_node_start(self, node_name):
for device in self.devices:
if self._enabled(device):
device.on_pre_node_start(node_name)
def attach_to_node(self, node):
for device in self.devices:
if self._enabled(device):
device.attach_to_node(node)
def detach_from_node(self, node, running):
for device in self.devices:
if self._enabled(device):
device.detach_from_node(node, running)
def on_benchmark_start(self):
for device in self.devices:
if self._enabled(device):
if self.serverless_mode and not self._available_on_serverless(device):
# Only inform about exclusion if the user explicitly asked for this device
if getattr(device, "command", None) in self.enabled_devices:
console.info(f"Excluding telemetry device [{device.command}] as it is unavailable on serverless.")
continue
device.on_benchmark_start()
def on_benchmark_stop(self):
for device in self.devices:
if self._enabled(device):
if self.serverless_mode and not self._available_on_serverless(device):
# Not informing the user the second time, see on_benchmark_start()
continue
device.on_benchmark_stop()
def store_system_metrics(self, node, metrics_store):
for device in self.devices:
if self._enabled(device):
device.store_system_metrics(node, metrics_store)
def _enabled(self, device):
return device.internal or device.command in self.enabled_devices
def _available_on_serverless(self, device):
if self.serverless_operator:
return device.serverless_status >= serverless.Status.Internal
else:
return device.serverless_status == serverless.Status.Public
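# Illustrative sketch only (the wiring below is an assumption, not part of this
# module): a driver typically constructs the facade with all known devices plus
# the user-selected command names, then runs the lifecycle hooks in order:
#
#     telemetry = Telemetry(
#         enabled_devices=["node-stats"],
#         devices=[StartupTime(), NodeStats(params, clients, metrics_store)],
#     )
#     telemetry.on_benchmark_start()
#     # ... benchmark runs ...
#     telemetry.on_benchmark_stop()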
########################################################################################
#
# Telemetry devices
#
########################################################################################
class TelemetryDevice:
def __init__(self):
self.logger = logging.getLogger(__name__)
def instrument_java_opts(self):
return {}
def on_pre_node_start(self, node_name):
pass
def attach_to_node(self, node):
pass
def detach_from_node(self, node, running):
pass
def on_benchmark_start(self):
pass
def on_benchmark_stop(self):
pass
def store_system_metrics(self, node, metrics_store):
pass
def __getstate__(self):
state = self.__dict__.copy()
del state["logger"]
return state
def __setstate__(self, state):
self.__dict__.update(state)
self.logger = logging.getLogger(__name__)
class InternalTelemetryDevice(TelemetryDevice):
internal = True
class Sampler:
"""This class contains the actual logic of SamplerThread for unit test purposes."""
def __init__(self, recorder, *, sleep=time.sleep):
self.stop = False
self.recorder = recorder
self.sleep = sleep
def finish(self):
self.stop = True
def run(self):
# noinspection PyBroadException
try:
sleep_left = self.recorder.sample_interval
while True:
if sleep_left <= 0:
self.recorder.record()
sleep_left = self.recorder.sample_interval
if self.stop:
break
# check for self.stop at least every second
sleep_seconds = min(sleep_left, 1)
self.sleep(sleep_seconds)
sleep_left -= sleep_seconds
except BaseException:
logging.getLogger(__name__).exception("Could not determine %s", self.recorder)
class SamplerThread(Sampler, threading.Thread):
def __init__(self, recorder):
threading.Thread.__init__(self)
Sampler.__init__(self, recorder)
def finish(self):
super().finish()
self.join()
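# Minimal recorder sketch (hypothetical): any object exposing `sample_interval`
# and `record()` satisfies the contract that Sampler/SamplerThread expect:
#
#     class HeartbeatRecorder:
#         sample_interval = 2  # seconds between record() calls
#
#         def record(self):
#             logging.getLogger(__name__).info("heartbeat")
#
#     sampler = SamplerThread(HeartbeatRecorder())
#     sampler.daemon = True
#     sampler.start()   # records every 2 seconds, checking the stop flag at least once per second
#     sampler.finish()  # sets the stop flag and joins the thread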
class FlightRecorder(TelemetryDevice):
internal = False
command = "jfr"
human_name = "Flight Recorder"
help = "Enables Java Flight Recorder (requires an Oracle JDK or OpenJDK 11+)"
def __init__(self, telemetry_params, log_root, java_major_version):
super().__init__()
self.telemetry_params = telemetry_params
self.log_root = log_root
self.java_major_version = java_major_version
def instrument_java_opts(self):
io.ensure_dir(self.log_root)
log_file = os.path.join(self.log_root, "profile.jfr")
# JFR was integrated into OpenJDK 11 and is not a commercial feature anymore.
if self.java_major_version < 11:
console.println("\n***************************************************************************\n")
console.println("[WARNING] Java flight recorder is a commercial feature of the Oracle JDK.\n")
console.println("You are using Java flight recorder which requires that you comply with\nthe licensing terms stated in:\n")
console.println(console.format.link("http://www.oracle.com/technetwork/java/javase/terms/license/index.html"))
console.println("\nBy using this feature you confirm that you comply with these license terms.\n")
console.println('Otherwise, please abort and rerun Rally without the "jfr" telemetry device.')
console.println("\n***************************************************************************\n")
time.sleep(3)
console.info("%s: Writing flight recording to [%s]" % (self.human_name, log_file), logger=self.logger)
java_opts = self.java_opts(log_file)
self.logger.info("jfr: Adding JVM arguments: [%s].", java_opts)
return java_opts
def java_opts(self, log_file):
recording_template = self.telemetry_params.get("recording-template")
delay = self.telemetry_params.get("jfr-delay")
duration = self.telemetry_params.get("jfr-duration")
java_opts = ["-XX:+UnlockDiagnosticVMOptions", "-XX:+DebugNonSafepoints"]
jfr_cmd = ""
if self.java_major_version < 11:
java_opts.append("-XX:+UnlockCommercialFeatures")
if self.java_major_version < 9:
java_opts.append("-XX:+FlightRecorder")
java_opts.append(f"-XX:FlightRecorderOptions=disk=true,maxage=0s,maxsize=0,dumponexit=true,dumponexitpath={log_file}")
jfr_cmd = "-XX:StartFlightRecording=defaultrecording=true"
else:
jfr_cmd += f"-XX:StartFlightRecording=maxsize=0,maxage=0s,disk=true,dumponexit=true,filename={log_file}"
if delay:
self.logger.info("jfr: Using delay [%s].", delay)
jfr_cmd += f",delay={delay}"
if duration:
self.logger.info("jfr: Using duration [%s].", duration)
jfr_cmd += f",duration={duration}"
if recording_template:
self.logger.info("jfr: Using recording template [%s].", recording_template)
jfr_cmd += f",settings={recording_template}"
else:
self.logger.info("jfr: Using default recording template.")
java_opts.append(jfr_cmd)
return java_opts
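# For reference, on JDK 11+ with telemetry params jfr-delay=10s and
# recording-template=profile, java_opts() yields flags along these lines
# (the log path is illustrative):
#
#     -XX:+UnlockDiagnosticVMOptions
#     -XX:+DebugNonSafepoints
#     -XX:StartFlightRecording=maxsize=0,maxage=0s,disk=true,dumponexit=true,filename=/logs/profile.jfr,delay=10s,settings=profile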
class JitCompiler(TelemetryDevice):
internal = False
command = "jit"
human_name = "JIT Compiler Profiler"
help = "Enables JIT compiler logs."
def __init__(self, log_root, java_major_version):
super().__init__()
self.log_root = log_root
self.java_major_version = java_major_version
def instrument_java_opts(self):
io.ensure_dir(self.log_root)
log_file = os.path.join(self.log_root, "jit.log")
console.info("%s: Writing JIT compiler log to [%s]" % (self.human_name, log_file), logger=self.logger)
if self.java_major_version < 9:
return [
"-XX:+UnlockDiagnosticVMOptions",
"-XX:+TraceClassLoading",
"-XX:+LogCompilation",
f"-XX:LogFile={log_file}",
"-XX:+PrintAssembly",
]
else:
return [
"-XX:+UnlockDiagnosticVMOptions",
"-Xlog:class+load=info",
"-XX:+LogCompilation",
f"-XX:LogFile={log_file}",
"-XX:+PrintAssembly",
]
class Gc(TelemetryDevice):
internal = False
command = "gc"
human_name = "GC log"
help = "Enables GC logs."
def __init__(self, telemetry_params, log_root, java_major_version):
super().__init__()
self.telemetry_params = telemetry_params
self.log_root = log_root
self.java_major_version = java_major_version
def instrument_java_opts(self):
io.ensure_dir(self.log_root)
log_file = os.path.join(self.log_root, "gc.log")
console.info("%s: Writing GC log to [%s]" % (self.human_name, log_file), logger=self.logger)
return self.java_opts(log_file)
def java_opts(self, log_file):
if self.java_major_version < 9:
return [
f"-Xloggc:{log_file}",
"-XX:+PrintGCDetails",
"-XX:+PrintGCDateStamps",
"-XX:+PrintGCTimeStamps",
"-XX:+PrintGCApplicationStoppedTime",
"-XX:+PrintGCApplicationConcurrentTime",
"-XX:+PrintTenuringDistribution",
]
else:
log_config = self.telemetry_params.get("gc-log-config", "gc*=info,safepoint=info,age*=trace")
# see https://docs.oracle.com/javase/9/tools/java.htm#JSWOR-GUID-BE93ABDC-999C-4CB5-A88B-1994AAAC74D5
return [f"-Xlog:{log_config}:file={log_file}:utctime,uptimemillis,level,tags:filecount=0"]
class Heapdump(TelemetryDevice):
internal = False
command = "heapdump"
human_name = "Heap Dump"
help = "Captures a heap dump."
def __init__(self, log_root):
super().__init__()
self.log_root = log_root
def detach_from_node(self, node, running):
if running:
io.ensure_dir(self.log_root)
heap_dump_file = os.path.join(self.log_root, f"heap_at_exit_{node.pid}.hprof")
console.info(f"{self.human_name}: Writing heap dump to [{heap_dump_file}]", logger=self.logger)
cmd = f"jmap -dump:format=b,file={heap_dump_file} {node.pid}"
if process.run_subprocess_with_logging(cmd):
self.logger.warning("Could not write heap dump to [%s]", heap_dump_file)
class SegmentStats(TelemetryDevice):
internal = False
serverless_status = serverless.Status.Internal
command = "segment-stats"
human_name = "Segment Stats"
help = "Determines segment stats at the end of the benchmark."
def __init__(self, log_root, client):
super().__init__()
self.log_root = log_root
self.client = client
def on_benchmark_stop(self):
# noinspection PyBroadException
try:
segment_stats = self.client.cat.segments(index="_all", v=True)
stats_file = os.path.join(self.log_root, "segment_stats.log")
console.info(f"{self.human_name}: Writing segment stats to [{stats_file}]", logger=self.logger)
with open(stats_file, "w") as f:
f.write(segment_stats)
except BaseException:
self.logger.exception("Could not retrieve segment stats.")
class CcrStats(TelemetryDevice):
internal = False
serverless_status = serverless.Status.Blocked
command = "ccr-stats"
human_name = "CCR Stats"
help = "Regularly samples Cross Cluster Replication (CCR) related stats"
"""
Gathers CCR stats on a cluster level
"""
def __init__(self, telemetry_params, clients, metrics_store):
"""
:param telemetry_params: The configuration object for telemetry_params.
May optionally specify:
``ccr-stats-indices``: JSON string specifying the indices per cluster to publish statistics from.
                                 Not all clusters need to be specified, but any name used must be present in target.hosts.
                                 Example:
                                 {"ccr-stats-indices": {"cluster_a": ["follower"], "default": ["leader"]}}
``ccr-stats-sample-interval``: positive integer controlling the sampling interval. Default: 1 second.
:param clients: A dict of clients to all clusters.
:param metrics_store: The configured metrics store we write to.
"""
super().__init__()
self.telemetry_params = telemetry_params
self.clients = clients
self.sample_interval = telemetry_params.get("ccr-stats-sample-interval", 1)
if self.sample_interval <= 0:
raise exceptions.SystemSetupError(
f"The telemetry parameter 'ccr-stats-sample-interval' must be greater than zero but was {self.sample_interval}."
)
self.specified_cluster_names = self.clients.keys()
self.indices_per_cluster = self.telemetry_params.get("ccr-stats-indices", False)
if self.indices_per_cluster:
for cluster_name in self.indices_per_cluster.keys():
if cluster_name not in clients:
raise exceptions.SystemSetupError(
"The telemetry parameter 'ccr-stats-indices' must be a JSON Object with keys matching "
"the cluster names [{}] specified in --target-hosts "
"but it had [{}].".format(",".join(sorted(clients.keys())), cluster_name)
)
self.specified_cluster_names = self.indices_per_cluster.keys()
self.metrics_store = metrics_store
self.samplers = []
def on_benchmark_start(self):
for cluster_name in self.specified_cluster_names:
recorder = CcrStatsRecorder(
cluster_name,
self.clients[cluster_name],
self.metrics_store,
self.sample_interval,
self.indices_per_cluster[cluster_name] if self.indices_per_cluster else None,
)
sampler = SamplerThread(recorder)
self.samplers.append(sampler)
sampler.daemon = True
# we don't require starting recorders precisely at the same time
sampler.start()
def on_benchmark_stop(self):
if self.samplers:
for sampler in self.samplers:
sampler.finish()
class CcrStatsRecorder:
"""
Collects and pushes CCR stats for the specified cluster to the metric store.
"""
def __init__(self, cluster_name, client, metrics_store, sample_interval, indices=None):
"""
:param cluster_name: The cluster_name that the client connects to, as specified in target.hosts.
:param client: The Elasticsearch client for this cluster.
:param metrics_store: The configured metrics store we write to.
:param sample_interval: integer controlling the interval, in seconds, between collecting samples.
:param indices: optional list of indices to filter results from.
"""
self.cluster_name = cluster_name
self.client = client
self.metrics_store = metrics_store
self.sample_interval = sample_interval
self.indices = indices
self.logger = logging.getLogger(__name__)
def __str__(self):
return "ccr stats"
def record(self):
"""
Collect CCR stats for indexes (optionally) specified in telemetry parameters and push to metrics store.
"""
# ES returns all stats values in bytes or ms via "human: false"
# pylint: disable=import-outside-toplevel
import elasticsearch
try:
ccr_stats_api_endpoint = "/_ccr/stats"
filter_path = "follow_stats"
stats = self.client.perform_request(
method="GET", path=ccr_stats_api_endpoint, params={"human": "false", "filter_path": filter_path}
)
except elasticsearch.TransportError:
msg = "A transport error occurred while collecting CCR stats from the endpoint [{}?filter_path={}] on cluster [{}]".format(
ccr_stats_api_endpoint, filter_path, self.cluster_name
)
self.logger.exception(msg)
raise exceptions.RallyError(msg)
if filter_path in stats and "indices" in stats[filter_path]:
for indices in stats[filter_path]["indices"]:
try:
if self.indices and indices["index"] not in self.indices:
# Skip metrics for indices not part of user supplied whitelist (ccr-stats-indices) in telemetry params.
continue
self.record_stats_per_index(indices["index"], indices["shards"])
except KeyError:
self.logger.warning(
"The 'indices' key in %s does not contain an 'index' or 'shards' key "
"Maybe the output format of the %s endpoint has changed. Skipping.",
ccr_stats_api_endpoint,
ccr_stats_api_endpoint,
)
def record_stats_per_index(self, name, stats):
"""
:param name: The index name.
:param stats: A dict with returned CCR stats for the index.
"""
for shard_stats in stats:
if "shard_id" in shard_stats:
doc = {"name": "ccr-stats", "shard": shard_stats}
shard_metadata = {"cluster": self.cluster_name, "index": name}
self.metrics_store.put_doc(doc, level=MetaInfoScope.cluster, meta_data=shard_metadata)
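# Illustrative invocation (values invented): restrict CCR sampling to the
# "follower" index on cluster_a and sample every 5 seconds by passing a JSON
# file to --telemetry-params:
#
#     --telemetry="ccr-stats" --telemetry-params="./telemetry-params.json"
#
#     # telemetry-params.json
#     {
#         "ccr-stats-indices": {"cluster_a": ["follower"]},
#         "ccr-stats-sample-interval": 5
#     }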
class RecoveryStats(TelemetryDevice):
internal = False
serverless_status = serverless.Status.Internal
command = "recovery-stats"
human_name = "Recovery Stats"
help = "Regularly samples shard recovery stats"
"""
Gathers recovery stats on a cluster level
"""
def __init__(self, telemetry_params, clients, metrics_store):
"""
:param telemetry_params: The configuration object for telemetry_params.
May optionally specify:
``recovery-stats-indices``: JSON structure specifying the index pattern per cluster to publish stats from.
                                 Not all clusters need to be specified, but any name used must be present in target.hosts. Alternatively,
                                 the index pattern can be specified as a string in case only one cluster is involved.
Example:
{"recovery-stats-indices": {"cluster_a": ["follower"],"default": ["leader"]}
``recovery-stats-sample-interval``: positive integer controlling the sampling interval. Default: 1 second.
:param clients: A dict of clients to all clusters.
:param metrics_store: The configured metrics store we write to.
"""
super().__init__()
self.telemetry_params = telemetry_params
self.clients = clients
self.sample_interval = telemetry_params.get("recovery-stats-sample-interval", 1)
if self.sample_interval <= 0:
raise exceptions.SystemSetupError(
"The telemetry parameter 'recovery-stats-sample-interval' must be greater than zero but was {}.".format(
self.sample_interval
)
)
self.specified_cluster_names = self.clients.keys()
indices_per_cluster = self.telemetry_params.get("recovery-stats-indices", False)
# allow the user to specify either an index pattern as string or as a JSON object
if isinstance(indices_per_cluster, str):
self.indices_per_cluster = {opts.TargetHosts.DEFAULT: indices_per_cluster}
else:
self.indices_per_cluster = indices_per_cluster
if self.indices_per_cluster:
for cluster_name in self.indices_per_cluster.keys():
if cluster_name not in clients:
raise exceptions.SystemSetupError(
"The telemetry parameter 'recovery-stats-indices' must be a JSON Object with keys matching "
"the cluster names [{}] specified in --target-hosts "
"but it had [{}].".format(",".join(sorted(clients.keys())), cluster_name)
)
self.specified_cluster_names = self.indices_per_cluster.keys()
self.metrics_store = metrics_store
self.samplers = []
def on_benchmark_start(self):
for cluster_name in self.specified_cluster_names:
recorder = RecoveryStatsRecorder(
cluster_name,
self.clients[cluster_name],
self.metrics_store,
self.sample_interval,
self.indices_per_cluster[cluster_name] if self.indices_per_cluster else "",
)
sampler = SamplerThread(recorder)
self.samplers.append(sampler)
sampler.daemon = True
# we don't require starting recorders precisely at the same time
sampler.start()
def on_benchmark_stop(self):
if self.samplers:
for sampler in self.samplers:
sampler.finish()
class RecoveryStatsRecorder:
"""
Collects and pushes recovery stats for the specified cluster to the metric store.
"""
def __init__(self, cluster_name, client, metrics_store, sample_interval, indices=None):
"""
:param cluster_name: The cluster_name that the client connects to, as specified in target.hosts.
:param client: The Elasticsearch client for this cluster.
:param metrics_store: The configured metrics store we write to.
:param sample_interval: integer controlling the interval, in seconds, between collecting samples.
:param indices: optional list of indices to filter results from.
"""
self.cluster_name = cluster_name
self.client = client
self.metrics_store = metrics_store
self.sample_interval = sample_interval
self.indices = indices
self.logger = logging.getLogger(__name__)
def __str__(self):
return "recovery stats"
def record(self):
"""
Collect recovery stats for indexes (optionally) specified in telemetry parameters and push to metrics store.
"""
# pylint: disable=import-outside-toplevel
import elasticsearch
try:
stats = self.client.indices.recovery(index=self.indices, active_only=True, detailed=False)
except elasticsearch.TransportError:
msg = f"A transport error occurred while collecting recovery stats on cluster [{self.cluster_name}]"
self.logger.exception(msg)
raise exceptions.RallyError(msg)
for idx, idx_stats in stats.items():
for shard in idx_stats["shards"]:
doc = {"name": "recovery-stats", "shard": shard}
shard_metadata = {"cluster": self.cluster_name, "index": idx, "shard": shard["id"]}
self.metrics_store.put_doc(doc, level=MetaInfoScope.cluster, meta_data=shard_metadata)
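# Illustrative invocation (values invented): since recovery-stats-indices also
# accepts a plain string pattern, a single-cluster setup can simply use:
#
#     --telemetry="recovery-stats"
#     --telemetry-params="recovery-stats-indices:rally-*,recovery-stats-sample-interval:2"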
class ShardStats(TelemetryDevice):
"""
Collects and pushes shard stats for the specified cluster to the metric store.
"""
internal = False
serverless_status = serverless.Status.Internal
command = "shard-stats"
human_name = "Shard Stats"
help = "Regularly samples nodes stats at shard level"
def __init__(self, telemetry_params, clients, metrics_store):
"""
:param telemetry_params: The configuration object for telemetry_params.
May optionally specify:
``shard-stats-sample-interval``: positive integer controlling the sampling interval. Default: 60 seconds.
:param clients: A dict of clients to all clusters.
:param metrics_store: The configured metrics store we write to.
"""
super().__init__()
self.telemetry_params = telemetry_params
self.clients = clients
self.specified_cluster_names = self.clients.keys()
self.sample_interval = telemetry_params.get("shard-stats-sample-interval", 60)
if self.sample_interval <= 0:
raise exceptions.SystemSetupError(
f"The telemetry parameter 'shard-stats-sample-interval' must be greater than zero but was {self.sample_interval}."
)
self.metrics_store = metrics_store
self.samplers = []
def on_benchmark_start(self):
for cluster_name in self.specified_cluster_names:
recorder = ShardStatsRecorder(cluster_name, self.clients[cluster_name], self.metrics_store, self.sample_interval)
sampler = SamplerThread(recorder)
self.samplers.append(sampler)
sampler.daemon = True
# we don't require starting recorders precisely at the same time
sampler.start()
def on_benchmark_stop(self):
if self.samplers:
for sampler in self.samplers:
sampler.finish()
class ShardStatsRecorder:
"""
Collects and pushes shard stats for the specified cluster to the metric store.
"""
def __init__(self, cluster_name, client, metrics_store, sample_interval):
"""
:param cluster_name: The cluster_name that the client connects to, as specified in target.hosts.
:param client: The Elasticsearch client for this cluster.
:param metrics_store: The configured metrics store we write to.
:param sample_interval: integer controlling the interval, in seconds, between collecting samples.
"""
self.cluster_name = cluster_name
self.client = client
self.metrics_store = metrics_store
self.sample_interval = sample_interval
self.logger = logging.getLogger(__name__)
def __str__(self):
return "shard stats"
def record(self):
"""
Collect node-stats?level=shards and push to metrics store.
"""
# pylint: disable=import-outside-toplevel
import elasticsearch
try:
sample = self.client.nodes.stats(metric="_all", level="shards")
except elasticsearch.TransportError:
msg = f"A transport error occurred while collecting shard stats on cluster [{self.cluster_name}]"
self.logger.exception(msg)
raise exceptions.RallyError(msg)
shard_metadata = {"cluster": self.cluster_name}
for node_stats in sample["nodes"].values():
node_name = node_stats["name"]
            shard_stats = node_stats["indices"].get("shards", {})
for index_name, stats in shard_stats.items():
for curr_shard in stats:
for shard_id, curr_stats in curr_shard.items():
doc = {
"name": "shard-stats",
"shard-id": shard_id,
"index": index_name,
"primary": curr_stats.get("routing", {}).get("primary"),
"docs": curr_stats.get("docs", {}).get("count", -1),
"store": curr_stats.get("store", {}).get("size_in_bytes", -1),
"segments-count": curr_stats.get("segments", {}).get("count", -1),
"node": node_name,
}
self.metrics_store.put_doc(doc, level=MetaInfoScope.cluster, meta_data=shard_metadata)
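# Shape of one document emitted per shard by ShardStatsRecorder.record()
# (all field values invented):
#
#     {
#         "name": "shard-stats",
#         "shard-id": "0",
#         "index": "geonames",
#         "primary": True,
#         "docs": 11396503,
#         "store": 2097152,
#         "segments-count": 8,
#         "node": "rally-node-0",
#     }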
class NodeStats(TelemetryDevice):
"""
Gathers different node stats.
"""
internal = False
serverless_status = serverless.Status.Internal
command = "node-stats"
human_name = "Node Stats"
help = "Regularly samples node stats"
warning = """You have enabled the node-stats telemetry device with Elasticsearch < 7.2.0. Requests to the
_nodes/stats Elasticsearch endpoint trigger additional refreshes and WILL SKEW results.
"""
def __init__(self, telemetry_params, clients, metrics_store):
super().__init__()
self.telemetry_params = telemetry_params
self.clients = clients
self.specified_cluster_names = self.clients.keys()
self.metrics_store = metrics_store
self.samplers = []
def on_benchmark_start(self):
default_client = self.clients["default"]
es_info = default_client.info()
es_version = es_info["version"].get("number", "7.2.0")
if Version.from_string(es_version) < Version(major=7, minor=2, patch=0):
console.warn(NodeStats.warning, logger=self.logger)
for cluster_name in self.specified_cluster_names:
recorder = NodeStatsRecorder(self.telemetry_params, cluster_name, self.clients[cluster_name], self.metrics_store)
sampler = SamplerThread(recorder)
self.samplers.append(sampler)
sampler.daemon = True
# we don't require starting recorders precisely at the same time
sampler.start()
def on_benchmark_stop(self):
if self.samplers:
for sampler in self.samplers:
sampler.finish()
class NodeStatsRecorder:
def __init__(self, telemetry_params, cluster_name, client, metrics_store):
self.logger = logging.getLogger(__name__)
self.logger.info("node stats recorder")
self.sample_interval = telemetry_params.get("node-stats-sample-interval", 1)
if self.sample_interval <= 0:
raise exceptions.SystemSetupError(
f"The telemetry parameter 'node-stats-sample-interval' must be greater than zero but was {self.sample_interval}."
)
self.include_indices = telemetry_params.get("node-stats-include-indices", False)
self.include_indices_metrics = telemetry_params.get("node-stats-include-indices-metrics", False)
if self.include_indices_metrics:
if isinstance(self.include_indices_metrics, str):
self.include_indices_metrics_list = opts.csv_to_list(self.include_indices_metrics)
elif isinstance(self.include_indices_metrics, list):
self.include_indices_metrics_list = self.include_indices_metrics
else:
# we don't validate the allowable metrics as they may change across ES versions
raise exceptions.SystemSetupError(
"The telemetry parameter 'node-stats-include-indices-metrics' must be a comma-separated string"
" or a list but was {}".format(type(self.include_indices_metrics))
)
self.logger.debug("Including indices metrics: %s", self.include_indices_metrics_list)
else:
self.include_indices_metrics_list = [
"docs",
"store",
"indexing",
"search",
"merges",
"refresh",
"flush",
"query_cache",
"fielddata",
"segments",
"translog",
"request_cache",
]
self.include_thread_pools = telemetry_params.get("node-stats-include-thread-pools", True)
self.include_buffer_pools = telemetry_params.get("node-stats-include-buffer-pools", True)
self.include_breakers = telemetry_params.get("node-stats-include-breakers", True)
self.include_network = telemetry_params.get("node-stats-include-network", True)
self.include_process = telemetry_params.get("node-stats-include-process", True)
self.include_mem_stats = telemetry_params.get("node-stats-include-mem", True)
self.include_cgroup_stats = telemetry_params.get("node-stats-include-cgroup", True)
self.include_gc_stats = telemetry_params.get("node-stats-include-gc", True)
self.include_indexing_pressure = telemetry_params.get("node-stats-include-indexing-pressure", True)
self.include_fs_stats = telemetry_params.get("node-stats-include-fs", True)
self.client = client
self.metrics_store = metrics_store
self.cluster_name = cluster_name
def __str__(self):
return "node stats"
def record(self):
current_sample = self.sample()
for node_stats in current_sample:
node_name = node_stats["name"]
roles = node_stats["roles"]
metrics_store_meta_data = {"cluster": self.cluster_name, "node_name": node_name, "roles": roles}
collected_node_stats = collections.OrderedDict()
collected_node_stats["name"] = "node-stats"
if self.include_indices or self.include_indices_metrics:
collected_node_stats.update(self.indices_stats(node_name, node_stats, include=self.include_indices_metrics_list))
if self.include_thread_pools:
collected_node_stats.update(self.thread_pool_stats(node_name, node_stats))
if self.include_breakers:
collected_node_stats.update(self.circuit_breaker_stats(node_name, node_stats))
if self.include_buffer_pools:
collected_node_stats.update(self.jvm_buffer_pool_stats(node_name, node_stats))
if self.include_mem_stats:
collected_node_stats.update(self.jvm_mem_stats(node_name, node_stats))
collected_node_stats.update(self.os_mem_stats(node_name, node_stats))
if self.include_cgroup_stats:
collected_node_stats.update(self.os_cgroup_stats(node_name, node_stats))
if self.include_gc_stats:
collected_node_stats.update(self.jvm_gc_stats(node_name, node_stats))
if self.include_network:
collected_node_stats.update(self.network_stats(node_name, node_stats))
if self.include_process:
collected_node_stats.update(self.process_stats(node_name, node_stats))
if self.include_indexing_pressure:
collected_node_stats.update(self.indexing_pressure(node_name, node_stats))
if self.include_fs_stats:
collected_node_stats.update(self.fs_stats(node_name, node_stats))
self.metrics_store.put_doc(
dict(collected_node_stats), level=MetaInfoScope.node, node_name=node_name, meta_data=metrics_store_meta_data
)
def indices_stats(self, node_name, node_stats, include):
idx_stats = node_stats["indices"]
ordered_results = collections.OrderedDict()
for section in include:
if section in idx_stats:
ordered_results.update(flatten_stats_fields(prefix="indices_" + section, stats=idx_stats[section]))
return ordered_results
def thread_pool_stats(self, node_name, node_stats):
return flatten_stats_fields(prefix="thread_pool", stats=node_stats["thread_pool"])
def circuit_breaker_stats(self, node_name, node_stats):
return flatten_stats_fields(prefix="breakers", stats=node_stats["breakers"])
def jvm_buffer_pool_stats(self, node_name, node_stats):
return flatten_stats_fields(prefix="jvm_buffer_pools", stats=node_stats["jvm"]["buffer_pools"])
def jvm_mem_stats(self, node_name, node_stats):
return flatten_stats_fields(prefix="jvm_mem", stats=node_stats["jvm"]["mem"])
def os_mem_stats(self, node_name, node_stats):
return flatten_stats_fields(prefix="os_mem", stats=node_stats["os"]["mem"])
def os_cgroup_stats(self, node_name, node_stats):
cgroup_stats = {}
try:
cgroup_stats = flatten_stats_fields(prefix="os_cgroup", stats=node_stats["os"]["cgroup"])
except KeyError:
self.logger.debug("Node cgroup stats requested with none present.")
return cgroup_stats
def jvm_gc_stats(self, node_name, node_stats):
return flatten_stats_fields(prefix="jvm_gc", stats=node_stats["jvm"]["gc"])
def network_stats(self, node_name, node_stats):
return flatten_stats_fields(prefix="transport", stats=node_stats.get("transport"))
def process_stats(self, node_name, node_stats):
return flatten_stats_fields(prefix="process_cpu", stats=node_stats["process"]["cpu"])
def indexing_pressure(self, node_name, node_stats):
return flatten_stats_fields(prefix="indexing_pressure", stats=node_stats["indexing_pressure"])
def fs_stats(self, node_name, node_stats):
return flatten_stats_fields(prefix="fs", stats=node_stats.get("fs"))
def sample(self):
# pylint: disable=import-outside-toplevel
import elasticsearch
try:
stats = self.client.nodes.stats(metric="_all")
except elasticsearch.TransportError:
logging.getLogger(__name__).exception("Could not retrieve node stats.")
return {}
return stats["nodes"].values()
class TransformStats(TelemetryDevice):
internal = False
serverless_status = serverless.Status.Public
command = "transform-stats"
human_name = "Transform Stats"
help = "Regularly samples transform stats"
"""
Gathers Transform stats
"""
def __init__(self, telemetry_params, clients, metrics_store):
"""
:param telemetry_params: The configuration object for telemetry_params.
:param clients: A dict of clients to all clusters.
:param metrics_store: The configured metrics store we write to.
"""
super().__init__()
self.telemetry_params = telemetry_params
self.clients = clients
self.sample_interval = telemetry_params.get("transform-stats-sample-interval", 1)
if self.sample_interval <= 0:
raise exceptions.SystemSetupError(
f"The telemetry parameter 'transform-stats-sample-interval' must be greater than zero but was [{self.sample_interval}]."
)
self.specified_cluster_names = self.clients.keys()
self.transforms_per_cluster = self.telemetry_params.get("transform-stats-transforms", False)
if self.transforms_per_cluster:
for cluster_name in self.transforms_per_cluster.keys():
if cluster_name not in clients:
raise exceptions.SystemSetupError(
f"The telemetry parameter 'transform-stats-transforms' must be a JSON Object with keys "
f"matching the cluster names [{','.join(sorted(clients.keys()))}] specified in --target-hosts "
f"but it had [{cluster_name}]."
)
self.specified_cluster_names = self.transforms_per_cluster.keys()
self.metrics_store = metrics_store
self.samplers = []
def on_benchmark_start(self):
for cluster_name in self.specified_cluster_names:
recorder = TransformStatsRecorder(
cluster_name,
self.clients[cluster_name],
self.metrics_store,
self.sample_interval,
self.transforms_per_cluster[cluster_name] if self.transforms_per_cluster else None,
)
sampler = SamplerThread(recorder)
self.samplers.append(sampler)
sampler.daemon = True
# we don't require starting recorders precisely at the same time
sampler.start()
def on_benchmark_stop(self):
if self.samplers:
for sampler in self.samplers:
sampler.finish()
# record the final stats
sampler.recorder.record_final()
class TransformStatsRecorder:
"""
Collects and pushes Transform stats for the specified cluster to the metric store.
"""
def __init__(self, cluster_name, client, metrics_store, sample_interval, transforms=None):
"""
:param cluster_name: The cluster_name that the client connects to, as specified in target.hosts.
:param client: The Elasticsearch client for this cluster.
:param metrics_store: The configured metrics store we write to.
:param sample_interval: integer controlling the interval, in seconds, between collecting samples.
:param transforms: optional list of transforms to filter results from.
"""
self.cluster_name = cluster_name
self.client = client
self.metrics_store = metrics_store
self.sample_interval = sample_interval
self.transforms = transforms
self.logger = logging.getLogger(__name__)
self.logger.info("transform stats recorder")
def __str__(self):
return "transform stats"
def record(self):
"""
Collect Transform stats for transforms (optionally) specified in telemetry parameters and push to metrics store.
"""
self._record()
def record_final(self):
"""
Collect final Transform stats for transforms (optionally) specified in telemetry parameters and push to metrics store.
"""
self._record("total_")
def _record(self, prefix=""):
# ES returns all stats values in bytes or ms via "human: false"
# pylint: disable=import-outside-toplevel
import elasticsearch
try:
stats = self.client.transform.get_transform_stats(transform_id="_all")
except elasticsearch.TransportError:
msg = f"A transport error occurred while collecting transform stats on cluster [{self.cluster_name}]"
self.logger.exception(msg)
raise exceptions.RallyError(msg)
for transform in stats["transforms"]:
try:
if self.transforms and transform["id"] not in self.transforms:
# Skip metrics for transform not part of user supplied whitelist (transform-stats-transforms)
# in telemetry params.
continue
self.record_stats_per_transform(transform["id"], transform["stats"], prefix)
except KeyError:
self.logger.warning(
"The 'transform' key does not contain a 'transform' or 'stats' key Maybe the output format has changed. Skipping."
)
def record_stats_per_transform(self, transform_id, stats, prefix=""):
"""
:param transform_id: The transform id.
:param stats: A dict with returned transform stats for the transform.
:param prefix: A prefix for the counters/values, e.g. for total runtimes
"""
meta_data = {"transform_id": transform_id}
self.metrics_store.put_value_cluster_level(
prefix + "transform_pages_processed", stats.get("pages_processed", 0), meta_data=meta_data
)
self.metrics_store.put_value_cluster_level(
prefix + "transform_documents_processed", stats.get("documents_processed", 0), meta_data=meta_data
)
self.metrics_store.put_value_cluster_level(
prefix + "transform_documents_indexed", stats.get("documents_indexed", 0), meta_data=meta_data
)
self.metrics_store.put_value_cluster_level(prefix + "transform_index_total", stats.get("index_total", 0), meta_data=meta_data)
self.metrics_store.put_value_cluster_level(prefix + "transform_index_failures", stats.get("index_failures", 0), meta_data=meta_data)
self.metrics_store.put_value_cluster_level(prefix + "transform_search_total", stats.get("search_total", 0), meta_data=meta_data)
self.metrics_store.put_value_cluster_level(
prefix + "transform_search_failures", stats.get("search_failures", 0), meta_data=meta_data
)
self.metrics_store.put_value_cluster_level(
prefix + "transform_processing_total", stats.get("processing_total", 0), meta_data=meta_data
)
self.metrics_store.put_value_cluster_level(
prefix + "transform_search_time", stats.get("search_time_in_ms", 0), "ms", meta_data=meta_data
)
self.metrics_store.put_value_cluster_level(
prefix + "transform_index_time", stats.get("index_time_in_ms", 0), "ms", meta_data=meta_data
)
self.metrics_store.put_value_cluster_level(
prefix + "transform_processing_time", stats.get("processing_time_in_ms", 0), "ms", meta_data=meta_data
)
documents_processed = stats.get("documents_processed", 0)
processing_time = stats.get("search_time_in_ms", 0)
processing_time += stats.get("processing_time_in_ms", 0)
processing_time += stats.get("index_time_in_ms", 0)
if processing_time > 0:
throughput = documents_processed / processing_time * 1000
self.metrics_store.put_value_cluster_level(prefix + "transform_throughput", throughput, "docs/s", meta_data=meta_data)
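# Worked example for the throughput computation above (numbers invented):
# documents_processed = 50_000 and a combined search + processing + index time
# of 2_500 ms give 50_000 / 2_500 * 1000 = 20_000 docs/s.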
class SearchableSnapshotsStats(TelemetryDevice):
internal = False
serverless_status = serverless.Status.Blocked
command = "searchable-snapshots-stats"
human_name = "Searchable Snapshots Stats"
help = "Regularly samples searchable snapshots stats"
"""
Gathers searchable snapshots stats on a cluster level
"""
def __init__(self, telemetry_params, clients, metrics_store):
"""
:param telemetry_params: The configuration object for telemetry_params.
May optionally specify:
                                 ``searchable-snapshots-stats-indices``: str with index/index-pattern or list of indices or index-patterns
that stats should be collected from.
Specifying this will implicitly use the level=indices parameter in the API call, as opposed to
the level=cluster (default).
                                 Not all clusters need to be specified, but any name used must be present in target.hosts.
Alternatively, the index or index pattern can be specified as a string in case only one cluster is involved.
Examples:
--telemetry-params="searchable-snapshots-stats-indices:elasticlogs-2020-01-01"
--telemetry-params="searchable-snapshots-stats-indices:elasticlogs*"
--telemetry-params=./telemetry-params.json
where telemetry-params.json is:
{
"searchable-snapshots-stats-indices": {
"default": ["leader-elasticlogs-*"],
"follower": ["follower-elasticlogs-*"]
}
}
``searchable-snapshots-stats-sample-interval``: positive integer controlling the sampling interval.
Default: 1 second.
:param clients: A dict of clients to all clusters.
:param metrics_store: The configured metrics store we write to.
"""
super().__init__()
self.telemetry_params = telemetry_params
self.clients = clients
self.sample_interval = telemetry_params.get("searchable-snapshots-stats-sample-interval", 1)
if self.sample_interval <= 0:
raise exceptions.SystemSetupError(
f"The telemetry parameter 'searchable-snapshots-stats-sample-interval' must be greater than zero "
f"but was {self.sample_interval}."
)
self.specified_cluster_names = self.clients.keys()
indices_per_cluster = self.telemetry_params.get("searchable-snapshots-stats-indices", None)
# allow the user to specify either an index pattern as string or as a JSON object
if isinstance(indices_per_cluster, str):
self.indices_per_cluster = {opts.TargetHosts.DEFAULT: [indices_per_cluster]}
else:
self.indices_per_cluster = indices_per_cluster
if self.indices_per_cluster:
for cluster_name in self.indices_per_cluster.keys():
if cluster_name not in clients:
raise exceptions.SystemSetupError(
f"The telemetry parameter 'searchable-snapshots-stats-indices' must be a JSON Object "
f"with keys matching the cluster names [{','.join(sorted(clients.keys()))}] specified in "
f"--target-hosts but it had [{cluster_name}]."
)
self.specified_cluster_names = self.indices_per_cluster.keys()
self.metrics_store = metrics_store
self.samplers = []
def on_benchmark_start(self):
for cluster_name in self.specified_cluster_names:
recorder = SearchableSnapshotsStatsRecorder(
cluster_name,
self.clients[cluster_name],
self.metrics_store,
self.sample_interval,
self.indices_per_cluster[cluster_name] if self.indices_per_cluster else None,
)
sampler = SamplerThread(recorder)
self.samplers.append(sampler)
sampler.daemon = True
# we don't require starting recorders precisely at the same time
sampler.start()
def on_benchmark_stop(self):
if self.samplers:
for sampler in self.samplers:
sampler.finish()
class SearchableSnapshotsStatsRecorder:
"""
Collects and pushes searchable snapshots stats for the specified cluster to the metric store.
"""
def __init__(self, cluster_name, client, metrics_store, sample_interval, indices=None):
"""
:param cluster_name: The cluster_name that the client connects to, as specified in target.hosts.
:param client: The Elasticsearch client for this cluster.
:param metrics_store: The configured metrics store we write to.
:param sample_interval: integer controlling the interval, in seconds, between collecting samples.
:param indices: optional list of indices to filter results from.
"""
self.cluster_name = cluster_name
self.client = client
self.metrics_store = metrics_store
self.sample_interval = sample_interval
self.indices = indices
self.logger = logging.getLogger(__name__)
def __str__(self):
return "searchable snapshots stats"
def record(self):
"""
Collect searchable snapshots stats for indexes (optionally) specified in telemetry parameters
and push to metrics store.
"""
# pylint: disable=import-outside-toplevel
import elasticsearch
try:
stats_api_endpoint = "/_searchable_snapshots/stats"
level = "indices" if self.indices else "cluster"
# we don't use the existing client support (searchable_snapshots.stats())
# as the API is deliberately undocumented and might change:
# https://www.elastic.co/guide/en/elasticsearch/reference/current/searchable-snapshots-api-stats.html
stats = self.client.perform_request(method="GET", path=stats_api_endpoint, params={"level": level})
except elasticsearch.NotFoundError as e:
if "No searchable snapshots indices found" in e.info.get("error").get("reason"):
self.logger.info(
"Unable to find valid indices while collecting searchable snapshots stats on cluster [%s]", self.cluster_name
)
# allow collection, indices might be mounted later on
return
except elasticsearch.TransportError:
raise exceptions.RallyError(
f"A transport error occurred while collecting searchable snapshots stats on cluster [{self.cluster_name}]"
) from None
total_stats = stats.get("total", [])
for lucene_file_stats in total_stats:
self._push_stats(level="cluster", stats=lucene_file_stats)
if self.indices:
for idx, idx_stats in stats.get("indices", {}).items():
if not self._match_list_or_pattern(idx):
continue
for lucene_file_stats in idx_stats.get("total", []):
self._push_stats(level="index", stats=lucene_file_stats, index=idx)
def _push_stats(self, level, stats, index=None):
doc = {
"name": "searchable-snapshots-stats",
# be lenient as the API is still WiP
"lucene_file_type": stats.get("file_ext"),
"stats": stats,
}
if index:
doc["index"] = index
meta_data = {"cluster": self.cluster_name, "level": level}
self.metrics_store.put_doc(doc, level=MetaInfoScope.cluster, meta_data=meta_data)
# TODO Consider moving under the utils package for broader/future use?
    # at the moment it's only useful here as this stats API is undocumented and we don't want to use
# a specific elasticsearch-py stats method that could support index filtering in a standard way
def _match_list_or_pattern(self, idx):
"""
Match idx from self.indices
:param idx: String that may include shell style wildcards (https://docs.python.org/3/library/fnmatch.html)
:return: Boolean if idx matches anything from self.indices
"""
for index_param in self.indices:
if fnmatch.fnmatch(idx, index_param):
return True
return False
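# _match_list_or_pattern() uses shell-style wildcard matching, e.g.
# (illustrative):
#
#     fnmatch.fnmatch("elasticlogs-2020-01-01", "elasticlogs*")  # True
#     fnmatch.fnmatch("metricbeat-2020-01-01", "elasticlogs*")   # False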
class DataStreamStats(TelemetryDevice):
"""
Collects and pushes data stream stats for the specified cluster to the metric store.
"""
internal = False
serverless_status = serverless.Status.Public
command = "data-stream-stats"
human_name = "Data Stream Stats"
help = "Regularly samples data stream stats"
def __init__(self, telemetry_params, clients, metrics_store):
"""
:param telemetry_params: The configuration object for telemetry_params.
May optionally specify:
``data-stream-stats-sample-interval``: An integer controlling the interval, in seconds,
between collecting samples. Default: 10s.
:param clients: A dict of clients to all clusters.
:param metrics_store: The configured metrics store we write to.
"""
super().__init__()
self.telemetry_params = telemetry_params
self.clients = clients
self.specified_cluster_names = self.clients.keys()
self.sample_interval = telemetry_params.get("data-stream-stats-sample-interval", 10)
if self.sample_interval <= 0:
raise exceptions.SystemSetupError(
f"The telemetry parameter 'data-stream-stats-sample-interval' must be greater than zero " f"but was {self.sample_interval}."
)
self.metrics_store = metrics_store
self.samplers = []
def on_benchmark_start(self):
for cluster_name in self.specified_cluster_names:
recorder = DataStreamStatsRecorder(cluster_name, self.clients[cluster_name], self.metrics_store, self.sample_interval)
es_info = self.clients[cluster_name].info()
distribution_version = es_info["version"].get("number", "7.9.0")
distribution_flavor = es_info["version"].get("build_flavor", "oss")
if Version.from_string(distribution_version) < Version(major=7, minor=9, patch=0):
raise exceptions.SystemSetupError(
"The data-stream-stats telemetry device can only be used with clusters from version 7.9 onwards"
)
if distribution_flavor == "oss":
raise exceptions.SystemSetupError(
"The data-stream-stats telemetry device cannot be used with an OSS distribution of Elasticsearch"
)
sampler = SamplerThread(recorder)
self.samplers.append(sampler)
sampler.daemon = True
# we don't require starting recorders precisely at the same time
sampler.start()
def on_benchmark_stop(self):
if self.samplers:
for sampler in self.samplers:
sampler.finish()
class DataStreamStatsRecorder:
"""
Collects and pushes data stream stats for the specified cluster to the metric store.
"""
def __init__(self, cluster_name, client, metrics_store, sample_interval):
"""
:param cluster_name: The cluster_name that the client connects to, as specified in target.hosts.
:param client: The Elasticsearch client for this cluster.
:param metrics_store: The configured metrics store we write to.
:param sample_interval: An integer controlling the interval, in seconds, between collecting samples.
"""
self.cluster_name = cluster_name
self.client = client
self.metrics_store = metrics_store
self.sample_interval = sample_interval
self.logger = logging.getLogger(__name__)
def __str__(self):
return "data stream stats"
def record(self):
"""
Collect _data_stream/stats and push to metrics store.
"""
# pylint: disable=import-outside-toplevel
import elasticsearch
try:
sample = self.client.indices.data_streams_stats(name="")
except elasticsearch.TransportError:
msg = f"A transport error occurred while collecting data stream stats on cluster [{self.cluster_name}]"
self.logger.exception(msg)
raise exceptions.RallyError(msg)
data_stream_metadata = {"cluster": self.cluster_name}
doc = {
"data_stream": "_all",
"name": "data-stream-stats",
"shards": {
"total": sample["_shards"]["total"],
"successful_shards": sample["_shards"]["successful"],
"failed_shards": sample["_shards"]["failed"],
},
"data_stream_count": sample["data_stream_count"],
"backing_indices": sample["backing_indices"],
"total_store_size_bytes": sample["total_store_size_bytes"],
}
self.metrics_store.put_doc(doc, level=MetaInfoScope.cluster, meta_data=data_stream_metadata)
for ds in sample["data_streams"]:
doc = {
"name": "data-stream-stats",
"data_stream": ds["data_stream"],
"backing_indices": ds["backing_indices"],
"store_size_bytes": ds["store_size_bytes"],
"maximum_timestamp": ds["maximum_timestamp"],
}
self.metrics_store.put_doc(doc, level=MetaInfoScope.cluster, meta_data=data_stream_metadata)
class IngestPipelineStats(InternalTelemetryDevice):
command = "ingest-pipeline-stats"
serverless_status = serverless.Status.Internal
human_name = "Ingest Pipeline Stats"
help = "Reports Ingest Pipeline stats at the end of the benchmark."
def __init__(self, clients, metrics_store):
super().__init__()
self.logger = logging.getLogger(__name__)
self.clients = clients
self.metrics_store = metrics_store
self.start_stats = {}
self.specified_cluster_names = self.clients.keys()
self.ingest_pipeline_cluster_count = 0
self.ingest_pipeline_cluster_time = 0
self.ingest_pipeline_cluster_failed = 0
def on_benchmark_start(self):
self.logger.info("Gathering Ingest Pipeline stats at benchmark start")
self.start_stats = self.get_ingest_pipeline_stats()
def on_benchmark_stop(self):
self.logger.info("Gathering Ingest Pipeline stats at benchmark end")
end_stats = self.get_ingest_pipeline_stats()
for cluster_name, node in end_stats.items():
if cluster_name not in self.start_stats:
self.logger.warning(
"Cannot determine Ingest Pipeline stats for %s (cluster stats weren't collected at the start of the benchmark).",
cluster_name,
)
continue
for node_name, summaries in node.items():
if node_name not in self.start_stats[cluster_name]:
self.logger.warning(
"Cannot determine Ingest Pipeline stats for %s (not in the cluster at the start of the benchmark).", node_name
)
continue
for summary_name, stats in summaries.items():
if summary_name == "total":
# The top level "total" contains stats for the node as a whole,
# each node will have exactly one top level "total" key
self._record_node_level_pipeline_stats(stats, cluster_name, node_name)
elif summary_name == "pipelines":
for pipeline_name, pipeline in stats.items():
self._record_pipeline_level_processor_stats(pipeline, pipeline_name, cluster_name, node_name)
self._record_cluster_level_pipeline_stats(cluster_name)
def _record_cluster_level_pipeline_stats(self, cluster_name):
metadata = {"cluster_name": cluster_name}
self.metrics_store.put_value_cluster_level("ingest_pipeline_cluster_count", self.ingest_pipeline_cluster_count, meta_data=metadata)
self.metrics_store.put_value_cluster_level(
"ingest_pipeline_cluster_time", self.ingest_pipeline_cluster_time, "ms", meta_data=metadata
)
self.metrics_store.put_value_cluster_level(
"ingest_pipeline_cluster_failed", self.ingest_pipeline_cluster_failed, meta_data=metadata
)
def _record_node_level_pipeline_stats(self, stats, cluster_name, node_name):
# Node level statistics are calculated per-benchmark execution. Stats are collected at the beginning, and end of
# each benchmark
metadata = {"cluster_name": cluster_name}
ingest_pipeline_node_count = stats.get("count", 0) - self.start_stats[cluster_name][node_name]["total"].get("count", 0)
ingest_pipeline_node_time = stats.get("time_in_millis", 0) - self.start_stats[cluster_name][node_name]["total"].get(
"time_in_millis", 0
)
ingest_pipeline_node_failed = stats.get("failed", 0) - self.start_stats[cluster_name][node_name]["total"].get("failed", 0)
self.ingest_pipeline_cluster_count += ingest_pipeline_node_count
self.ingest_pipeline_cluster_time += ingest_pipeline_node_time
self.ingest_pipeline_cluster_failed += ingest_pipeline_node_failed
self.metrics_store.put_value_node_level(node_name, "ingest_pipeline_node_count", ingest_pipeline_node_count, meta_data=metadata)
self.metrics_store.put_value_node_level(node_name, "ingest_pipeline_node_time", ingest_pipeline_node_time, "ms", meta_data=metadata)
self.metrics_store.put_value_node_level(node_name, "ingest_pipeline_node_failed", ingest_pipeline_node_failed, meta_data=metadata)
def _record_pipeline_level_processor_stats(self, pipeline, pipeline_name, cluster_name, node_name):
for processor_name, processor_stats in pipeline.items():
start_stats_processors = self.start_stats[cluster_name][node_name]["pipelines"].get(pipeline_name, {})
start_stats_processors.setdefault(processor_name, {})
# We have an individual processor obj, which contains the stats for each individual processor
if processor_name != "total":
metadata = {
"processor_name": processor_name,
"type": processor_stats.get("type", None),
"pipeline_name": pipeline_name,
"cluster_name": cluster_name,
}
start_count = start_stats_processors[processor_name].get("stats", {}).get("count", 0)
end_count = processor_stats.get("stats", {}).get("count", 0)
ingest_pipeline_processor_count = end_count - start_count
self.metrics_store.put_value_node_level(
node_name, "ingest_pipeline_processor_count", ingest_pipeline_processor_count, meta_data=metadata
)
start_time = start_stats_processors[processor_name].get("stats", {}).get("time_in_millis", 0)
end_time = processor_stats.get("stats", {}).get("time_in_millis", 0)
ingest_pipeline_processor_time = end_time - start_time
self.metrics_store.put_value_node_level(
node_name, "ingest_pipeline_processor_time", ingest_pipeline_processor_time, unit="ms", meta_data=metadata
)
start_failed = start_stats_processors[processor_name].get("stats", {}).get("failed", 0)
end_failed = processor_stats.get("stats", {}).get("failed", 0)
ingest_pipeline_processor_failed = end_failed - start_failed
self.metrics_store.put_value_node_level(
node_name, "ingest_pipeline_processor_failed", ingest_pipeline_processor_failed, meta_data=metadata
)
# We have a top level pipeline stats obj, which contains the total time spent preprocessing documents in
# the ingest pipeline.
elif processor_name == "total":
metadata = {"pipeline_name": pipeline_name, "cluster_name": cluster_name}
start_count = start_stats_processors[processor_name].get("count", 0)
end_count = processor_stats.get("count", 0)
ingest_pipeline_pipeline_count = end_count - start_count
self.metrics_store.put_value_node_level(
node_name, "ingest_pipeline_pipeline_count", ingest_pipeline_pipeline_count, meta_data=metadata
)
start_time = start_stats_processors[processor_name].get("time_in_millis", 0)
end_time = processor_stats.get("time_in_millis", 0)
ingest_pipeline_pipeline_time = end_time - start_time
self.metrics_store.put_value_node_level(
node_name, "ingest_pipeline_pipeline_time", ingest_pipeline_pipeline_time, unit="ms", meta_data=metadata
)
start_failed = start_stats_processors[processor_name].get("failed", 0)
end_failed = processor_stats.get("failed", 0)
ingest_pipeline_pipeline_failed = end_failed - start_failed
self.metrics_store.put_value_node_level(
node_name, "ingest_pipeline_pipeline_failed", ingest_pipeline_pipeline_failed, meta_data=metadata
)
def get_ingest_pipeline_stats(self):
# pylint: disable=import-outside-toplevel
import elasticsearch
summaries = {}
for cluster_name in self.specified_cluster_names:
try:
ingest_stats = self.clients[cluster_name].nodes.stats(metric="ingest")
except elasticsearch.TransportError:
msg = f"A transport error occurred while collecting Ingest Pipeline stats on cluster [{cluster_name}]"
self.logger.exception(msg)
raise exceptions.RallyError(msg)
summaries[ingest_stats["cluster_name"]] = self._parse_ingest_pipelines(ingest_stats)
return summaries
def _parse_ingest_pipelines(self, ingest_stats):
parsed_stats = {}
for node in ingest_stats["nodes"].values():
parsed_stats[node["name"]] = {}
parsed_stats[node["name"]]["total"] = node["ingest"]["total"]
parsed_stats[node["name"]]["pipelines"] = {}
for pipeline_name, pipeline_stats in node["ingest"]["pipelines"].items():
parsed_stats[node["name"]]["pipelines"][pipeline_name] = {}
parsed_stats[node["name"]]["pipelines"][pipeline_name]["total"] = {
key: pipeline_stats[key] for key in pipeline_stats if key != "processors"
}
# There may be multiple processors of the same name/type in a single pipeline, so let's just
# label them 1-N
suffix = 1
for processor in pipeline_stats["processors"]:
for processor_name, processor_stats in processor.items():
processor_name = f"{str(processor_name)}_{suffix}"
suffix += 1
parsed_stats[node["name"]]["pipelines"][pipeline_name][processor_name] = processor_stats
return parsed_stats
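# For orientation, a minimal sketch of the structure produced by _parse_ingest_pipelines
# (node, pipeline and processor names as well as all values are illustrative, not real API output):
#
# {
#     "node-0": {
#         "total": {"count": 120, "time_in_millis": 15, "current": 0, "failed": 0},
#         "pipelines": {
#             "my-pipeline": {
#                 "total": {"count": 120, "time_in_millis": 15, "current": 0, "failed": 0},
#                 "set_1": {"type": "set", "stats": {"count": 120, "time_in_millis": 3, "current": 0, "failed": 0}},
#             },
#         },
#     },
# }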
class StartupTime(InternalTelemetryDevice):
def __init__(self, stopwatch=time.StopWatch):
super().__init__()
self.timer = stopwatch()
def on_pre_node_start(self, node_name):
self.timer.start()
def attach_to_node(self, node):
self.timer.stop()
def store_system_metrics(self, node, metrics_store):
metrics_store.put_value_node_level(node.node_name, "node_startup_time", self.timer.total_time(), "s")
class DiskIo(InternalTelemetryDevice):
"""
Gathers disk I/O stats.
"""
def __init__(self, node_count_on_host):
super().__init__()
self.node_count_on_host = node_count_on_host
self.read_bytes = None
self.write_bytes = None
def attach_to_node(self, node):
es_process = sysstats.setup_process_stats(node.pid)
process_start = sysstats.process_io_counters(es_process)
if process_start:
self.read_bytes = process_start.read_bytes
self.write_bytes = process_start.write_bytes
self.logger.info("Using more accurate process-based I/O counters.")
else:
# noinspection PyBroadException
try:
disk_start = sysstats.disk_io_counters()
self.read_bytes = disk_start.read_bytes
self.write_bytes = disk_start.write_bytes
self.logger.warning(
"Process I/O counters are not supported on this platform. Falling back to less accurate disk I/O counters."
)
except BaseException:
self.logger.exception("Could not determine I/O stats at benchmark start.")
def detach_from_node(self, node, running):
if running:
# Be aware that the semantics of write counts etc. differ between disk and process statistics.
# Thus we are conservative and only report I/O bytes.
# noinspection PyBroadException
try:
es_process = sysstats.setup_process_stats(node.pid)
process_end = sysstats.process_io_counters(es_process)
# we have process-based disk counters, no need to worry how many nodes are on this host
if process_end:
self.read_bytes = process_end.read_bytes - self.read_bytes
self.write_bytes = process_end.write_bytes - self.write_bytes
else:
disk_end = sysstats.disk_io_counters()
if self.node_count_on_host > 1:
self.logger.info(
"There are [%d] nodes on this host and Rally fell back to disk I/O counters. "
"Attributing [1/%d] of total I/O to [%s].",
self.node_count_on_host,
self.node_count_on_host,
node.node_name,
)
self.read_bytes = (disk_end.read_bytes - self.read_bytes) // self.node_count_on_host
self.write_bytes = (disk_end.write_bytes - self.write_bytes) // self.node_count_on_host
# Catching RuntimeError is not sufficient: psutil might raise AccessDenied (derived from Exception)
except BaseException:
self.logger.exception("Could not determine I/O stats at benchmark end.")
# reset all counters so we don't attempt to write inconsistent numbers to the metrics store later on
self.read_bytes = None
self.write_bytes = None
def store_system_metrics(self, node, metrics_store):
if self.write_bytes is not None:
metrics_store.put_value_node_level(node.node_name, "disk_io_write_bytes", self.write_bytes, "byte")
if self.read_bytes is not None:
metrics_store.put_value_node_level(node.node_name, "disk_io_read_bytes", self.read_bytes, "byte")
def store_node_attribute_metadata(metrics_store, nodes_info):
# push up all node level attributes to cluster level iff the values are identical for all nodes
pseudo_cluster_attributes = {}
for node in nodes_info:
if "attributes" in node:
for k, v in node["attributes"].items():
attribute_key = "attribute_%s" % str(k)
metrics_store.add_meta_info(metrics.MetaInfoScope.node, node["name"], attribute_key, v)
if attribute_key not in pseudo_cluster_attributes:
pseudo_cluster_attributes[attribute_key] = set()
pseudo_cluster_attributes[attribute_key].add(v)
for k, v in pseudo_cluster_attributes.items():
if len(v) == 1:
metrics_store.add_meta_info(metrics.MetaInfoScope.cluster, None, k, next(iter(v)))
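# A sketch of the promotion rule above (attribute names are illustrative): given two nodes with
# attributes {"zone": "us-east-1a", "tier": "hot"} and {"zone": "us-east-1b", "tier": "hot"},
# each node gets its own "attribute_zone" and "attribute_tier" meta-info, but only
# "attribute_tier", whose value is identical on all nodes, is also promoted to cluster level.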
def store_plugin_metadata(metrics_store, nodes_info):
# push up all plugins to cluster level iff all nodes have the same ones
all_nodes_plugins = []
all_same = False
for node in nodes_info:
plugins = [p["name"] for p in extract_value(node, ["plugins"], fallback=[]) if "name" in p]
if not all_nodes_plugins:
all_nodes_plugins = plugins.copy()
all_same = True
else:
# order does not matter so we do a set comparison
all_same = all_same and set(all_nodes_plugins) == set(plugins)
if plugins:
metrics_store.add_meta_info(metrics.MetaInfoScope.node, node["name"], "plugins", plugins)
if all_same and all_nodes_plugins:
metrics_store.add_meta_info(metrics.MetaInfoScope.cluster, None, "plugins", all_nodes_plugins)
def extract_value(node, path, fallback="unknown"):
value = node
try:
for k in path:
value = value[k]
except KeyError:
value = fallback
return value
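# Example: extract_value({"os": {"name": "Linux"}}, ["os", "name"]) returns "Linux", while
# extract_value({"os": {}}, ["os", "name"]) returns the fallback, "unknown" by default.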
def flatten_stats_fields(prefix=None, stats=None):
"""
Flatten the provided nested dict into a single level, joining nested keys with ``_`` and optionally applying a prefix.
:param prefix: The prefix for all flattened keys. Defaults to None.
:param stats: Dict with values to be flattened. Defaults to None, which yields an empty result.
:return: The flattened dictionary, with keys separated by ``_`` and prefixed with ``prefix``.
"""
def iterate():
for section_name, section_value in stats.items():
if isinstance(section_value, dict):
new_prefix = f"{prefix}_{section_name}"
# https://www.python.org/dev/peps/pep-0380/
yield from flatten_stats_fields(prefix=new_prefix, stats=section_value).items()
# Avoid duplication for metric fields that have unit embedded in value as they are also recorded elsewhere
# example: `breakers_parent_limit_size_in_bytes` vs `breakers_parent_limit_size`
elif isinstance(section_value, (int, float)) and not isinstance(section_value, bool):
yield "{}{}".format(prefix + "_" if prefix else "", section_name), section_value
if stats:
return dict(iterate())
else:
return {}
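# A minimal usage sketch (the input is illustrative, not a real stats payload):
#
# flatten_stats_fields(prefix="object_store", stats={"request_counts": {"GetObject": 21}, "enabled": True})
# # -> {"object_store_request_counts_GetObject": 21}
#
# Booleans and non-numeric leaves are intentionally skipped by the isinstance check above.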
class ClusterEnvironmentInfo(InternalTelemetryDevice):
"""
Gathers static environment information on a cluster level (e.g. version numbers).
"""
serverless_status = serverless.Status.Public
def __init__(self, client, metrics_store, revision_override):
super().__init__()
self.metrics_store = metrics_store
self.client = client
self.revision_override = revision_override
def on_benchmark_start(self):
# noinspection PyBroadException
try:
client_info = self.client.info()
except BaseException:
self.logger.exception("Could not retrieve cluster version info")
return
distribution_flavor = client_info["version"].get("build_flavor", "oss")
# serverless returns a dummy build hash which gets overridden when running with operator privileges
revision = client_info["version"].get("build_hash", distribution_flavor)
if self.revision_override:
revision = self.revision_override
# if version number is not available default to build flavor
distribution_version = client_info["version"].get("number", distribution_flavor)
# overwrite static serverless version number
if versions.is_serverless(distribution_flavor):
distribution_version = "serverless"
self.metrics_store.add_meta_info(metrics.MetaInfoScope.cluster, None, "source_revision", revision)
self.metrics_store.add_meta_info(metrics.MetaInfoScope.cluster, None, "distribution_version", distribution_version)
self.metrics_store.add_meta_info(metrics.MetaInfoScope.cluster, None, "distribution_flavor", distribution_flavor)
def add_metadata_for_node(metrics_store, node_name, host_name):
"""
Gathers static environment information like OS or CPU details for Rally-provisioned nodes.
"""
metrics_store.add_meta_info(metrics.MetaInfoScope.node, node_name, "os_name", sysstats.os_name())
metrics_store.add_meta_info(metrics.MetaInfoScope.node, node_name, "os_version", sysstats.os_version())
metrics_store.add_meta_info(metrics.MetaInfoScope.node, node_name, "cpu_logical_cores", sysstats.logical_cpu_cores())
metrics_store.add_meta_info(metrics.MetaInfoScope.node, node_name, "cpu_physical_cores", sysstats.physical_cpu_cores())
metrics_store.add_meta_info(metrics.MetaInfoScope.node, node_name, "cpu_model", sysstats.cpu_model())
metrics_store.add_meta_info(metrics.MetaInfoScope.node, node_name, "node_name", node_name)
metrics_store.add_meta_info(metrics.MetaInfoScope.node, node_name, "host_name", host_name)
class ExternalEnvironmentInfo(InternalTelemetryDevice):
"""
Gathers static environment information for externally provisioned clusters.
"""
serverless_status = serverless.Status.Internal
def __init__(self, client, metrics_store):
super().__init__()
self.metrics_store = metrics_store
self.client = client
# noinspection PyBroadException
def on_benchmark_start(self):
try:
nodes_stats = self.client.nodes.stats(metric="_all")["nodes"].values()
except BaseException:
self.logger.exception("Could not retrieve nodes stats")
nodes_stats = []
try:
nodes_info = self.client.nodes.info(node_id="_all")["nodes"].values()
except BaseException:
self.logger.exception("Could not retrieve nodes info")
nodes_info = []
for node in nodes_stats:
node_name = node["name"]
host = node.get("host", "unknown")
self.metrics_store.add_meta_info(metrics.MetaInfoScope.node, node_name, "node_name", node_name)
self.metrics_store.add_meta_info(metrics.MetaInfoScope.node, node_name, "host_name", host)
for node in nodes_info:
node_name = node["name"]
self.store_node_info(node_name, "os_name", node, ["os", "name"])
self.store_node_info(node_name, "os_version", node, ["os", "version"])
self.store_node_info(node_name, "cpu_logical_cores", node, ["os", "available_processors"])
self.store_node_info(node_name, "jvm_vendor", node, ["jvm", "vm_vendor"])
self.store_node_info(node_name, "jvm_version", node, ["jvm", "version"])
store_plugin_metadata(self.metrics_store, nodes_info)
store_node_attribute_metadata(self.metrics_store, nodes_info)
def store_node_info(self, node_name, metric_key, node, path):
self.metrics_store.add_meta_info(metrics.MetaInfoScope.node, node_name, metric_key, extract_value(node, path))
class JvmStatsSummary(InternalTelemetryDevice):
"""
Gathers a summary of various JVM statistics during the whole race.
"""
serverless_status = serverless.Status.Internal
def __init__(self, client, metrics_store):
super().__init__()
self.metrics_store = metrics_store
self.client = client
self.jvm_stats_per_node = {}
def on_benchmark_start(self):
self.logger.info("JvmStatsSummary on benchmark start")
self.jvm_stats_per_node = self.jvm_stats()
def on_benchmark_stop(self):
jvm_stats_at_end = self.jvm_stats()
total_collection_time = collections.defaultdict(int)
total_collection_count = collections.defaultdict(int)
for node_name, jvm_stats_end in jvm_stats_at_end.items():
if node_name in self.jvm_stats_per_node:
jvm_stats_start = self.jvm_stats_per_node[node_name]
collector_stats_start = jvm_stats_start["collectors"]
collector_stats_end = jvm_stats_end["collectors"]
for collector_name in collector_stats_start:
gc_time_diff = max(collector_stats_end[collector_name]["gc_time"] - collector_stats_start[collector_name]["gc_time"], 0)
gc_count_diff = max(
collector_stats_end[collector_name]["gc_count"] - collector_stats_start[collector_name]["gc_count"], 0
)
total_collection_time[collector_name] += gc_time_diff
total_collection_count[collector_name] += gc_count_diff
self.metrics_store.put_value_node_level(node_name, f"node_{collector_name}_gc_time", gc_time_diff, "ms")
self.metrics_store.put_value_node_level(node_name, f"node_{collector_name}_gc_count", gc_count_diff)
all_pool_stats = {"name": "jvm_memory_pool_stats"}
for pool_name, pool_stats in jvm_stats_end["pools"].items():
all_pool_stats[pool_name] = {"peak_usage": pool_stats["peak"], "unit": "byte"}
self.metrics_store.put_doc(all_pool_stats, level=MetaInfoScope.node, node_name=node_name)
else:
self.logger.warning("Cannot determine JVM stats for [%s] (not in the cluster at the start of the benchmark).", node_name)
for collector_name, value in total_collection_time.items():
self.metrics_store.put_value_cluster_level(f"node_total_{collector_name}_gc_time", value, "ms")
for collector_name, value in total_collection_count.items():
self.metrics_store.put_value_cluster_level(f"node_total_{collector_name}_gc_count", value)
self.jvm_stats_per_node = None
def jvm_stats(self):
self.logger.debug("Gathering JVM stats")
jvm_stats = {}
# pylint: disable=import-outside-toplevel
import elasticsearch
try:
stats = self.client.nodes.stats(metric="jvm")
except elasticsearch.TransportError:
self.logger.exception("Could not retrieve GC times.")
return jvm_stats
nodes = stats["nodes"]
for node in nodes.values():
node_name = node["name"]
gc = node["jvm"]["gc"]["collectors"]
jvm_stats[node_name] = {
"pools": {},
"collectors": {},
}
for collector_name, collector_stats in gc.items():
collection_time = collector_stats.get("collection_time_in_millis", 0)
collection_count = collector_stats.get("collection_count", 0)
if collector_name in ("young", "old"):
metric_prefix = f"{collector_name}_gen"
else:
metric_prefix = collector_name.lower().replace(" ", "_")
jvm_stats[node_name]["collectors"][metric_prefix] = {
"gc_time": collection_time,
"gc_count": collection_count,
}
pool_usage = node["jvm"]["mem"]["pools"]
for pool_name, pool_stats in pool_usage.items():
jvm_stats[node_name]["pools"][pool_name] = {"peak": pool_stats["peak_used_in_bytes"]}
return jvm_stats
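# Shape of the returned dict, sketched from the extraction above (names and numbers are illustrative):
#
# {
#     "node-0": {
#         "pools": {"young": {"peak": 435159040}},
#         "collectors": {"young_gen": {"gc_time": 1432, "gc_count": 68}},
#     },
# }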
class IndexStats(InternalTelemetryDevice):
"""
Gathers statistics via the Elasticsearch index stats API.
"""
serverless_status = serverless.Status.Internal
def __init__(self, client, metrics_store):
super().__init__()
self.client = client
self.metrics_store = metrics_store
self.first_time = True
def on_benchmark_start(self):
# We only determine this value at the start of the benchmark. It is mainly useful for the
# "benchmark-only" pipeline, where we have no control over the cluster and the user might not
# have restarted it, so we can at least warn them.
# Add a small threshold to the warning to allow for indexing of internal indices.
threshold = 2000
if self.first_time:
for t in self.index_times(self.index_stats(), per_shard_stats=False):
n = t["name"]
v = t["value"]
if t["value"] > threshold:
console.warn(
"%s is %d ms indicating that the cluster is not in a defined clean state. Recorded index time "
"metrics may be misleading." % (n, v),
logger=self.logger,
)
self.first_time = False
def on_benchmark_stop(self):
self.logger.info("Gathering index stats for all primaries on benchmark stop.")
index_stats = self.index_stats()
if "_all" not in index_stats or "primaries" not in index_stats["_all"]:
return
p = index_stats["_all"]["primaries"]
# Note: this records a plain count (no unit), unlike the byte-valued metrics below.
self.add_metrics(self.extract_value(p, ["segments", "count"]), "segments_count")
self.add_metrics(self.extract_value(p, ["segments", "memory_in_bytes"]), "segments_memory_in_bytes", "byte")
for t in self.index_times(index_stats):
self.metrics_store.put_doc(doc=t, level=metrics.MetaInfoScope.cluster)
for ct in self.index_counts(index_stats):
self.metrics_store.put_doc(doc=ct, level=metrics.MetaInfoScope.cluster)
self.add_metrics(self.extract_value(p, ["segments", "doc_values_memory_in_bytes"]), "segments_doc_values_memory_in_bytes", "byte")
self.add_metrics(
self.extract_value(p, ["segments", "stored_fields_memory_in_bytes"]), "segments_stored_fields_memory_in_bytes", "byte"
)
self.add_metrics(self.extract_value(p, ["segments", "terms_memory_in_bytes"]), "segments_terms_memory_in_bytes", "byte")
self.add_metrics(self.extract_value(p, ["segments", "norms_memory_in_bytes"]), "segments_norms_memory_in_bytes", "byte")
self.add_metrics(self.extract_value(p, ["segments", "points_memory_in_bytes"]), "segments_points_memory_in_bytes", "byte")
self.add_metrics(
self.extract_value(index_stats, ["_all", "total", "store", "total_data_set_size_in_bytes"]), "dataset_size_in_bytes", "byte"
)
self.add_metrics(self.extract_value(index_stats, ["_all", "total", "store", "size_in_bytes"]), "store_size_in_bytes", "byte")
self.add_metrics(self.extract_value(index_stats, ["_all", "total", "translog", "size_in_bytes"]), "translog_size_in_bytes", "byte")
def index_stats(self):
# noinspection PyBroadException
try:
return self.client.indices.stats(metric="_all", level="shards")
except BaseException:
self.logger.exception("Could not retrieve index stats.")
return {}
def index_times(self, stats, per_shard_stats=True):
times = []
self.index_time(times, stats, "merges_total_time", ["merges", "total_time_in_millis"], per_shard_stats)
self.index_time(times, stats, "merges_total_throttled_time", ["merges", "total_throttled_time_in_millis"], per_shard_stats)
self.index_time(times, stats, "indexing_total_time", ["indexing", "index_time_in_millis"], per_shard_stats)
self.index_time(times, stats, "indexing_throttle_time", ["indexing", "throttle_time_in_millis"], per_shard_stats)
self.index_time(times, stats, "refresh_total_time", ["refresh", "total_time_in_millis"], per_shard_stats)
self.index_time(times, stats, "flush_total_time", ["flush", "total_time_in_millis"], per_shard_stats)
return times
def index_time(self, values, stats, name, path, per_shard_stats):
primary_total_stats = self.extract_value(stats, ["_all", "primaries"], default_value={})
value = self.extract_value(primary_total_stats, path)
if value is not None:
doc = {
"name": name,
"value": value,
"unit": "ms",
}
if per_shard_stats:
doc["per-shard"] = self.primary_shard_stats(stats, path)
values.append(doc)
def index_counts(self, stats):
counts = []
self.index_count(counts, stats, "merges_total_count", ["merges", "total"])
self.index_count(counts, stats, "refresh_total_count", ["refresh", "total"])
self.index_count(counts, stats, "flush_total_count", ["flush", "total"])
return counts
def index_count(self, values, stats, name, path):
primary_total_stats = self.extract_value(stats, ["_all", "primaries"], default_value={})
value = self.extract_value(primary_total_stats, path)
if value is not None:
doc = {"name": name, "value": value}
values.append(doc)
def primary_shard_stats(self, stats, path):
shard_stats = []
try:
for shards in stats["indices"].values():
for shard in shards["shards"].values():
for shard_metrics in shard:
if shard_metrics["routing"]["primary"]:
shard_stats.append(self.extract_value(shard_metrics, path, default_value=0))
except KeyError:
self.logger.warning("Could not determine primary shard stats at path [%s].", ",".join(path))
return shard_stats
def add_metrics(self, value, metric_key, unit=None):
if value is not None:
if unit:
self.metrics_store.put_value_cluster_level(metric_key, value, unit)
else:
self.metrics_store.put_value_cluster_level(metric_key, value)
def extract_value(self, primaries, path, default_value=None):
value = primaries
try:
for k in path:
value = value[k]
return value
except KeyError:
self.logger.warning("Could not determine value at path [%s]. Returning default value [%s]", ",".join(path), str(default_value))
return default_value
class MlBucketProcessingTime(InternalTelemetryDevice):
serverless_status = serverless.Status.Public
def __init__(self, client, metrics_store):
super().__init__()
self.client = client
self.metrics_store = metrics_store
def on_benchmark_stop(self):
# pylint: disable=import-outside-toplevel
import elasticsearch
try:
results = self.client.search(
index=".ml-anomalies-*",
body={
"size": 0,
"query": {
"bool": {
"must": [
{
"term": {"result_type": "bucket"},
},
],
},
},
"aggs": {
"jobs": {
"terms": {
"field": "job_id",
},
"aggs": {
"min_pt": {
"min": {"field": "processing_time_ms"},
},
"max_pt": {
"max": {"field": "processing_time_ms"},
},
"mean_pt": {
"avg": {"field": "processing_time_ms"},
},
"median_pt": {
"percentiles": {"field": "processing_time_ms", "percents": [50]},
},
},
}
},
},
)
except elasticsearch.TransportError:
self.logger.exception("Could not retrieve ML bucket processing time.")
return
try:
for job in results["aggregations"]["jobs"]["buckets"]:
ml_job_stats = collections.OrderedDict()
ml_job_stats["name"] = "ml_processing_time"
ml_job_stats["job"] = job["key"]
ml_job_stats["min"] = job["min_pt"]["value"]
ml_job_stats["mean"] = job["mean_pt"]["value"]
ml_job_stats["median"] = job["median_pt"]["values"]["50.0"]
ml_job_stats["max"] = job["max_pt"]["value"]
ml_job_stats["unit"] = "ms"
self.metrics_store.put_doc(doc=dict(ml_job_stats), level=MetaInfoScope.cluster)
except KeyError:
# no ML running
pass
class IndexSize(InternalTelemetryDevice):
"""
Measures the final size of the index
"""
def __init__(self, data_paths):
super().__init__()
self.data_paths = data_paths
self.attached = False
self.index_size_bytes = None
def attach_to_node(self, node):
self.attached = True
def detach_from_node(self, node, running):
# we need to gather the file size after the node has terminated so we can be sure that it has written all its buffers.
if not running and self.attached and self.data_paths:
self.attached = False
index_size_bytes = 0
for data_path in self.data_paths:
index_size_bytes += io.get_size(data_path)
self.index_size_bytes = index_size_bytes
def store_system_metrics(self, node, metrics_store):
if self.index_size_bytes:
metrics_store.put_value_node_level(node.node_name, "final_index_size_bytes", self.index_size_bytes, "byte")
class MasterNodeStats(InternalTelemetryDevice):
"""
Collects and pushes the current master node name to the metric store.
"""
serverless_status = serverless.Status.Internal
command = "master-node-stats"
human_name = "Master Node Stats"
help = "Regularly samples master node name"
def __init__(self, telemetry_params, client, metrics_store):
"""
:param telemetry_params: The configuration object for telemetry_params.
May optionally specify:
``master-node-stats-sample-interval``: An integer controlling the interval, in seconds,
between collecting samples. Default: 30s.
:param client: The default Elasticsearch client
:param metrics_store: The configured metrics store we write to.
"""
super().__init__()
self.telemetry_params = telemetry_params
self.client = client
self.sample_interval = telemetry_params.get("master-node-stats-sample-interval", 30)
if self.sample_interval <= 0:
raise exceptions.SystemSetupError(
f"The telemetry parameter 'master-node-stats-sample-interval' must be greater than zero " f"but was {self.sample_interval}."
)
self.metrics_store = metrics_store
self.sampler = None
def on_benchmark_start(self):
recorder = MasterNodeStatsRecorder(
self.client,
self.metrics_store,
self.sample_interval,
)
self.sampler = SamplerThread(recorder)
self.sampler.daemon = True
# we don't require starting recorders precisely at the same time
self.sampler.start()
def on_benchmark_stop(self):
if self.sampler:
self.sampler.finish()
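# As a usage sketch, this device would typically be enabled like other opt-in telemetry devices,
# e.g. (track name is illustrative; the parameter name is taken from the docstring above):
#
#     esrally race --track=geonames --telemetry=master-node-stats \
#         --telemetry-params="master-node-stats-sample-interval:10"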
class MasterNodeStatsRecorder:
"""
Collects and pushes the current master node name for the specified cluster to the metric store.
"""
def __init__(self, client, metrics_store, sample_interval):
"""
:param client: The Elasticsearch client for this cluster.
:param metrics_store: The configured metrics store we write to.
:param sample_interval: An integer controlling the interval, in seconds, between collecting samples.
"""
self.client = client
self.metrics_store = metrics_store
self.sample_interval = sample_interval
self.logger = logging.getLogger(__name__)
def __str__(self):
return "master node stats"
def record(self):
"""
Collect master node name and push to metrics store.
"""
# pylint: disable=import-outside-toplevel
import elasticsearch
try:
state = self.client.cluster.state(metric="master_node")
info = self.client.nodes.info(node_id=state["master_node"], metric="os")
except elasticsearch.TransportError:
msg = f"A transport error occurred while collecting master node stats on cluster [{self.cluster_name}]"
self.logger.exception(msg)
raise exceptions.RallyError(msg)
doc = {
"name": "master-node-stats",
"node": info["nodes"][state["master_node"]]["name"],
}
self.metrics_store.put_doc(doc, level=MetaInfoScope.cluster)
class DiskUsageStats(TelemetryDevice):
"""
Measures the space taken by each field
"""
internal = False
serverless_status = serverless.Status.Internal
command = "disk-usage-stats"
human_name = "Disk usage of each field"
help = "Runs the indices disk usage API after benchmarking"
def __init__(self, telemetry_params, client, metrics_store, index_names, data_stream_names):
"""
:param telemetry_params: The configuration object for telemetry_params.
May specify:
``disk-usage-stats-indices``: Comma-separated list of indices whose disk
usage to fetch. Default is all indices in the track.
:param client: The Elasticsearch client for this cluster.
:param metrics_store: The configured metrics store we write to.
:param index_names: Names of indices defined by this track
:param data_stream_names: Names of data streams defined by this track
"""
super().__init__()
self.telemetry_params = telemetry_params
self.client = client
self.metrics_store = metrics_store
self.index_names = index_names
self.data_stream_names = data_stream_names
def on_benchmark_start(self):
self.indices = self.telemetry_params.get("disk-usage-stats-indices", ",".join(self.index_names + self.data_stream_names))
if not self.indices:
msg = (
"No indices defined for disk-usage-stats. Set the disk-usage-stats-indices "
"telemetry param or add indices or data streams to the track config."
)
self.logger.error(msg)
raise exceptions.RallyError(msg)
if isinstance(self.indices, list):
self.indices = ",".join(self.indices)
def on_benchmark_stop(self):
# pylint: disable=import-outside-toplevel
import elasticsearch
found = False
for index in self.indices.split(","):
self.logger.debug("Gathering disk usage for [%s]", index)
try:
response = self.client.indices.disk_usage(index=index, run_expensive_tasks=True)
except elasticsearch.RequestError:
msg = f"A request error occurred while collecting disk usage for {index}"
self.logger.exception(msg)
raise exceptions.RallyError(msg)
except elasticsearch.NotFoundError:
msg = f"Requested disk usage for missing index {index}"
self.logger.warning(msg)
continue
found = True
self.handle_telemetry_usage(response)
if not found:
msg = f"Couldn't find any indices for disk usage: {self.indices}"
self.logger.error(msg)
raise exceptions.RallyError(msg)
def handle_telemetry_usage(self, response):
if response["_shards"]["failed"] > 0:
failures = str(response["_shards"]["failures"])
msg = f"Shards failed when fetching disk usage: {failures}"
self.logger.error(msg)
raise exceptions.RallyError(msg)
for index, idx_fields in response.items():
if index == "_shards":
continue
for field, field_info in idx_fields["fields"].items():
meta = {"index": index, "field": field}
self.metrics_store.put_value_cluster_level("disk_usage_total", field_info["total_in_bytes"], meta_data=meta, unit="byte")
inverted_index = field_info.get("inverted_index", {"total_in_bytes": 0})["total_in_bytes"]
if inverted_index > 0:
self.metrics_store.put_value_cluster_level("disk_usage_inverted_index", inverted_index, meta_data=meta, unit="byte")
stored_fields = field_info.get("stored_fields_in_bytes", 0)
if stored_fields > 0:
self.metrics_store.put_value_cluster_level("disk_usage_stored_fields", stored_fields, meta_data=meta, unit="byte")
doc_values = field_info.get("doc_values_in_bytes", 0)
if doc_values > 0:
self.metrics_store.put_value_cluster_level("disk_usage_doc_values", doc_values, meta_data=meta, unit="byte")
points = field_info.get("points_in_bytes", 0)
if points > 0:
self.metrics_store.put_value_cluster_level("disk_usage_points", points, meta_data=meta, unit="byte")
norms = field_info.get("norms_in_bytes", 0)
if norms > 0:
self.metrics_store.put_value_cluster_level("disk_usage_norms", norms, meta_data=meta, unit="byte")
term_vectors = field_info.get("term_vectors_in_bytes", 0)
if term_vectors > 0:
self.metrics_store.put_value_cluster_level("disk_usage_term_vectors", term_vectors, meta_data=meta, unit="byte")
knn_vectors = field_info.get("knn_vectors_in_bytes", 0)
if knn_vectors > 0:
self.metrics_store.put_value_cluster_level("disk_usage_knn_vectors", knn_vectors, meta_data=meta, unit="byte")
class BlobStoreStats(TelemetryDevice):
"""
Gathers blob store stats on both a cluster and node level.
"""
internal = False
serverless_status = serverless.Status.Internal
command = "blob-store-stats"
human_name = "Blob Store Stats"
help = "Regularly samples blob store stats, only applicable to serverless Elasticsearch"
def __init__(self, telemetry_params, clients, metrics_store):
"""
:param telemetry_params: The configuration object for telemetry_params.
May optionally specify:
``blob-store-stats-sample-interval``: A positive integer controlling the interval, in seconds,
between collecting samples. Default: 1 second.
:param clients: A dict of clients to all clusters.
:param metrics_store: The configured metrics store we write to.
"""
super().__init__()
self.telemetry_params = telemetry_params
self.clients = clients
self.sample_interval = telemetry_params.get("blob-store-stats-sample-interval", 1)
if self.sample_interval <= 0:
raise exceptions.SystemSetupError(
f"The telemetry parameter 'blob-store-stats-sample-interval' must be greater than zero but was {self.sample_interval}."
)
self.specified_cluster_names = self.clients.keys()
self.metrics_store = metrics_store
self.samplers = []
def __str__(self):
return "blob-store-stats"
def on_benchmark_start(self):
for cluster_name in self.specified_cluster_names:
if not self.clients[cluster_name].is_serverless:
self.logger.warning(
"Cannot attach telemetry device [%s] to cluster [%s], [%s] is only supported with serverless Elasticsearch",
self,
cluster_name,
self,
)
continue
self.logger.debug("Gathering [%s] for [%s]", self, cluster_name)
recorder = BlobStoreStatsRecorder(
cluster_name,
self.clients[cluster_name],
self.metrics_store,
self.sample_interval,
)
sampler = SamplerThread(recorder)
self.samplers.append(sampler)
sampler.daemon = True
# we don't require starting recorders precisely at the same time
sampler.start()
def on_benchmark_stop(self):
if self.samplers:
for sampler in self.samplers:
sampler.finish()
class BlobStoreStatsRecorder:
"""
Collects and pushes blob store stats for the specified cluster to the metric store.
"""
def __init__(self, cluster_name, client, metrics_store, sample_interval):
"""
:param cluster_name: The cluster_name that the client connects to, as specified in target.hosts. This may differ
from the actual cluster name deployed.
:param client: The Elasticsearch client for this cluster.
:param metrics_store: The configured metrics store we write to.
:param sample_interval: integer controlling the interval, in seconds, between collecting samples.
"""
self.rally_cluster_name = cluster_name
self.client = client
self.metrics_store = metrics_store
self.sample_interval = sample_interval
self.logger = logging.getLogger(__name__)
def __str__(self):
return "blob-store-stats"
def record(self):
"""
Collect blob store stats at a per cluster and node level and push to metrics store.
"""
# pylint: disable=import-outside-toplevel
import elasticsearch
try:
stats_api_endpoint = "/_internal/blob_store/stats"
stats = self.client.perform_request(method="GET", path=stats_api_endpoint, params={})
except elasticsearch.ApiError as e:
msg = f"An API error [{e}] occurred while collecting [{self}] on cluster [{self.rally_cluster_name}]"
self.logger.error(msg)
return
except elasticsearch.TransportError as e:
msg = f"A transport error [{e}] occurred while collecting [{self}] on cluster [{self.rally_cluster_name}]"
self.logger.error(msg)
return
self._push_stats(stats)
def _push_stats(self, stats):
stats_meta_data = {key: value for key, value in stats.items() if key == "_nodes"}
meta_data = {"cluster": stats.get("cluster_name", self.rally_cluster_name), **stats_meta_data}
if cluster_stats := self._get_stats(stats, "_all"):
self.metrics_store.put_doc(cluster_stats, level=MetaInfoScope.cluster, meta_data=meta_data)
for node_id in stats.get("nodes", {}):
if ns := self._get_stats(stats.get("nodes", {}), node_id):
self.metrics_store.put_doc(ns, level=MetaInfoScope.node, node_name=node_id, meta_data=meta_data)
def _get_stats(self, stats, node):
doc = collections.OrderedDict()
obj_stats = self.object_store_stats(stats.get(node, {}))
obs_stats = self.operational_backup_service_stats(stats.get(node, {}))
if obj_stats or obs_stats:
doc["name"] = "blob-store-stats"
doc["node"] = node
doc.update(obj_stats)
doc.update(obs_stats)
return doc
def object_store_stats(self, stats):
return flatten_stats_fields(prefix="object_store", stats=stats.get("object_store_stats", {}))
def operational_backup_service_stats(self, stats):
return flatten_stats_fields(prefix="operational_backup", stats=stats.get("operational_backup_service_stats", {}))
class GeoIpStats(TelemetryDevice):
internal = False
serverless_status = serverless.Status.Internal
command = "geoip-stats"
human_name = "GeoIp Stats"
help = "Writes geo ip stats to the metrics store at the end of the benchmark."
def __init__(self, client, metrics_store):
super().__init__()
self.client = client
self.metrics_store = metrics_store
def on_benchmark_stop(self):
self.logger.info("Gathering GeoIp stats at benchmark end")
# First, build a map of node id to node name, because the geoip stats API doesn't return node name:
try:
nodes_info = self.client.nodes.info(node_id="_all")["nodes"].items()
except BaseException:
self.logger.exception("Could not retrieve nodes info")
nodes_info = []
node_id_to_name_dict = {}
for node_id, node in nodes_info:
node_id_to_name_dict[node_id] = node["name"]
geoip_stats = self.client.ingest.geo_ip_stats()
stats_dict = geoip_stats.body
nodes_dict = stats_dict["nodes"]
for node_id, node in nodes_dict.items():
# fall back to the node id if the nodes info call failed and the name is unknown
node_name = node_id_to_name_dict.get(node_id, node_id)
cache_stats = node["cache_stats"]
self.metrics_store.put_value_node_level(node_name, "geoip_cache_count", cache_stats["count"])
self.metrics_store.put_value_node_level(node_name, "geoip_cache_hits", cache_stats["hits"])
self.metrics_store.put_value_node_level(node_name, "geoip_cache_misses", cache_stats["misses"])
self.metrics_store.put_value_node_level(node_name, "geoip_cache_evictions", cache_stats["evictions"])
self.metrics_store.put_value_node_level(node_name, "geoip_cache_hits_time_in_millis", cache_stats["hits_time_in_millis"])
self.metrics_store.put_value_node_level(node_name, "geoip_cache_misses_time_in_millis", cache_stats["misses_time_in_millis"])