in esrally/telemetry.py [0:0]
def _record_pipeline_level_processor_stats(self, pipeline, pipeline_name, cluster_name, node_name):
for processor_name, processor_stats in pipeline.items():
start_stats_processors = self.start_stats[cluster_name][node_name]["pipelines"].get(pipeline_name, {})
start_stats_processors.setdefault(processor_name, {})
# We have an individual processor obj, which contains the stats for each individual processor
if processor_name != "total":
metadata = {
"processor_name": processor_name,
"type": processor_stats.get("type", None),
"pipeline_name": pipeline_name,
"cluster_name": cluster_name,
}
start_count = start_stats_processors[processor_name].get("stats", {}).get("count", 0)
end_count = processor_stats.get("stats", {}).get("count", 0)
ingest_pipeline_processor_count = end_count - start_count
self.metrics_store.put_value_node_level(
node_name, "ingest_pipeline_processor_count", ingest_pipeline_processor_count, meta_data=metadata
)
start_time = start_stats_processors[processor_name].get("stats", {}).get("time_in_millis", 0)
end_time = processor_stats.get("stats", {}).get("time_in_millis", 0)
ingest_pipeline_processor_time = end_time - start_time
self.metrics_store.put_value_node_level(
node_name, "ingest_pipeline_processor_time", ingest_pipeline_processor_time, unit="ms", meta_data=metadata
)
start_failed = start_stats_processors[processor_name].get("stats", {}).get("failed", 0)
end_failed = processor_stats.get("stats", {}).get("failed", 0)
ingest_pipeline_processor_failed = end_failed - start_failed
self.metrics_store.put_value_node_level(
node_name, "ingest_pipeline_processor_failed", ingest_pipeline_processor_failed, meta_data=metadata
)
# We have a top level pipeline stats obj, which contains the total time spent preprocessing documents in
# the ingest pipeline.
elif processor_name == "total":
metadata = {"pipeline_name": pipeline_name, "cluster_name": cluster_name}
start_count = start_stats_processors[processor_name].get("count", 0)
end_count = processor_stats.get("count", 0)
ingest_pipeline_pipeline_count = end_count - start_count
self.metrics_store.put_value_node_level(
node_name, "ingest_pipeline_pipeline_count", ingest_pipeline_pipeline_count, meta_data=metadata
)
start_time = start_stats_processors[processor_name].get("time_in_millis", 0)
end_time = processor_stats.get("time_in_millis", 0)
ingest_pipeline_pipeline_time = end_time - start_time
self.metrics_store.put_value_node_level(
node_name, "ingest_pipeline_pipeline_time", ingest_pipeline_pipeline_time, unit="ms", meta_data=metadata
)
start_failed = start_stats_processors[processor_name].get("failed", 0)
end_failed = processor_stats.get("failed", 0)
ingest_pipeline_pipeline_failed = end_failed - start_failed
self.metrics_store.put_value_node_level(
node_name, "ingest_pipeline_pipeline_failed", ingest_pipeline_pipeline_failed, meta_data=metadata
)