in esrally/telemetry.py [0:0]
def on_benchmark_stop(self):
    """Store per-job ML bucket processing time statistics in the metrics store.

    Searches the ``.ml-anomalies-*`` indices for documents whose ``result_type``
    is ``bucket``, aggregates ``processing_time_ms`` per ``job_id`` (min, max,
    mean and 50th percentile) and puts one ``ml_processing_time`` document per
    job into the metrics store at cluster level.

    A transport error while searching is logged and the method returns without
    storing anything. A response without job aggregations is treated as
    "no ML running" and is skipped quietly.
    """
    # pylint: disable=import-outside-toplevel
    import elasticsearch

    # NOTE(review): the terms aggregation sets no explicit "size", so the
    # Elasticsearch default bucket limit applies - confirm that is sufficient
    # for the number of ML jobs expected in a benchmark.
    query_body = {
        "size": 0,
        "query": {
            "bool": {
                "must": [
                    {
                        "term": {"result_type": "bucket"},
                    },
                ],
            },
        },
        "aggs": {
            "jobs": {
                "terms": {
                    "field": "job_id",
                },
                "aggs": {
                    "min_pt": {
                        "min": {"field": "processing_time_ms"},
                    },
                    "max_pt": {
                        "max": {"field": "processing_time_ms"},
                    },
                    "mean_pt": {
                        "avg": {"field": "processing_time_ms"},
                    },
                    "median_pt": {
                        "percentiles": {"field": "processing_time_ms", "percents": [50]},
                    },
                },
            }
        },
    }

    try:
        results = self.client.search(index=".ml-anomalies-*", body=query_body)
    except elasticsearch.TransportError:
        self.logger.exception("Could not retrieve ML bucket processing time.")
        return

    # Only the lookup of the aggregation buckets is guarded here: a missing key
    # at this level means there are no ML results to report.
    try:
        job_buckets = results["aggregations"]["jobs"]["buckets"]
    except KeyError:
        # no ML running
        self.logger.debug("No ML bucket aggregations in response; skipping ML processing time.")
        return

    for job in job_buckets:
        # Keep the try body limited to reading the bucket so that errors raised
        # by the metrics store are not mistaken for a malformed bucket.
        try:
            ml_job_stats = {
                "name": "ml_processing_time",
                "job": job["key"],
                "min": job["min_pt"]["value"],
                "mean": job["mean_pt"]["value"],
                "median": job["median_pt"]["values"]["50.0"],
                "max": job["max_pt"]["value"],
                "unit": "ms",
            }
        except KeyError:
            # One unexpected bucket should not drop the statistics of the others.
            self.logger.warning("Skipping ML job bucket with unexpected structure: %s", job)
            continue
        self.metrics_store.put_doc(doc=ml_job_stats, level=MetaInfoScope.cluster)