def on_benchmark_stop()

in esrally/telemetry.py

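Collects per-job statistics (min, mean, median and max, in milliseconds) for the processing time of machine learning bucket results from the .ml-anomalies-* indices when the benchmark stops, and stores one cluster-level metrics document per ML job. If the query fails, the error is logged and nothing is stored; if no ML jobs ran, the method returns silently.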

    def on_benchmark_stop(self):
        # pylint: disable=import-outside-toplevel
        import elasticsearch

        try:
            # Aggregate ML bucket processing times per job from the ML results indices.
            results = self.client.search(
                index=".ml-anomalies-*",
                body={
                    "size": 0,
                    "query": {
                        "bool": {
                            "must": [
                                {
                                    "term": {"result_type": "bucket"},
                                },
                            ],
                        },
                    },
                    "aggs": {
                        "jobs": {
                            "terms": {
                                "field": "job_id",
                            },
                            "aggs": {
                                "min_pt": {
                                    "min": {"field": "processing_time_ms"},
                                },
                                "max_pt": {
                                    "max": {"field": "processing_time_ms"},
                                },
                                "mean_pt": {
                                    "avg": {"field": "processing_time_ms"},
                                },
                                "median_pt": {
                                    "percentiles": {"field": "processing_time_ms", "percents": [50]},
                                },
                            },
                        }
                    },
                },
            )
        except elasticsearch.TransportError:
            self.logger.exception("Could not retrieve ML bucket processing time.")
            return
        try:
            # Emit one cluster-level metrics document per ML job.
            for job in results["aggregations"]["jobs"]["buckets"]:
                ml_job_stats = collections.OrderedDict()
                ml_job_stats["name"] = "ml_processing_time"
                ml_job_stats["job"] = job["key"]
                ml_job_stats["min"] = job["min_pt"]["value"]
                ml_job_stats["mean"] = job["mean_pt"]["value"]
                ml_job_stats["median"] = job["median_pt"]["values"]["50.0"]
                ml_job_stats["max"] = job["max_pt"]["value"]
                ml_job_stats["unit"] = "ms"
                self.metrics_store.put_doc(doc=dict(ml_job_stats), level=MetaInfoScope.cluster)
        except KeyError:
            # no ML running
            pass
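
For reference, a minimal sketch of the data shapes this method works with, assuming a hypothetical ML job named "my_job" (all values are illustrative): the aggregation response the parsing loop expects, and the resulting per-job document passed to metrics_store.put_doc.

    # Sketch only: "my_job" and all numbers are made-up illustrations.
    example_response = {
        "aggregations": {
            "jobs": {
                "buckets": [
                    {
                        "key": "my_job",
                        "min_pt": {"value": 2.0},
                        "max_pt": {"value": 31.0},
                        "mean_pt": {"value": 12.3},
                        "median_pt": {"values": {"50.0": 11.0}},
                    },
                ],
            },
        },
    }

    # Corresponding document stored in the metrics store at MetaInfoScope.cluster:
    example_doc = {
        "name": "ml_processing_time",
        "job": "my_job",
        "min": 2.0,
        "mean": 12.3,
        "median": 11.0,
        "max": 31.0,
        "unit": "ms",
    }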