def _record_pipeline_level_processor_stats()

in esrally/telemetry.py
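This method records, for one ingest pipeline on one node, the per-processor and pipeline-level deltas between the stats snapshots taken at benchmark start and benchmark end. Judging from the accesses in the body, the `pipeline` argument is a dict keyed by processor name plus a `"total"` entry; the sketch below is inferred from those accesses (the processor name "grok_1" and the values are illustrative, not taken from esrally):

    pipeline = {
        "grok_1": {  # per-processor entry: a "type" plus a nested "stats" object
            "type": "grok",
            "stats": {"count": 1000, "time_in_millis": 25, "failed": 0},
        },
        "total": {  # pipeline-level totals, kept flat rather than nested
            "count": 1000,
            "time_in_millis": 40,
            "failed": 0,
        },
    }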


    def _record_pipeline_level_processor_stats(self, pipeline, pipeline_name, cluster_name, node_name):
        for processor_name, processor_stats in pipeline.items():
            start_stats_processors = self.start_stats[cluster_name][node_name]["pipelines"].get(pipeline_name, {})
            start_stats_processors.setdefault(processor_name, {})
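            # self.start_stats holds the snapshot captured at benchmark start. setdefault
            # ensures an entry exists for processors missing from that snapshot, so the
            # .get(..., 0) lookups below fall back to a start value of 0.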

            # An individual processor entry, carrying that processor's stats under a nested "stats" object.
            if processor_name != "total":
                metadata = {
                    "processor_name": processor_name,
                    "type": processor_stats.get("type", None),
                    "pipeline_name": pipeline_name,
                    "cluster_name": cluster_name,
                }

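                # Each metric below is a delta over the benchmark window: the value at
                # benchmark end minus the value at benchmark start.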
                start_count = start_stats_processors[processor_name].get("stats", {}).get("count", 0)
                end_count = processor_stats.get("stats", {}).get("count", 0)
                ingest_pipeline_processor_count = end_count - start_count

                self.metrics_store.put_value_node_level(
                    node_name, "ingest_pipeline_processor_count", ingest_pipeline_processor_count, meta_data=metadata
                )

                start_time = start_stats_processors[processor_name].get("stats", {}).get("time_in_millis", 0)
                end_time = processor_stats.get("stats", {}).get("time_in_millis", 0)
                ingest_pipeline_processor_time = end_time - start_time

                self.metrics_store.put_value_node_level(
                    node_name, "ingest_pipeline_processor_time", ingest_pipeline_processor_time, unit="ms", meta_data=metadata
                )

                start_failed = start_stats_processors[processor_name].get("stats", {}).get("failed", 0)
                end_failed = processor_stats.get("stats", {}).get("failed", 0)
                ingest_pipeline_processor_failed = end_failed - start_failed

                self.metrics_store.put_value_node_level(
                    node_name, "ingest_pipeline_processor_failed", ingest_pipeline_processor_failed, meta_data=metadata
                )

            # The "total" entry carries the pipeline-level totals for the whole pipeline:
            # document count, time spent preprocessing, and failure count.
            else:
                metadata = {"pipeline_name": pipeline_name, "cluster_name": cluster_name}

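                # Same delta arithmetic as above, but the "total" entry keeps its counters
                # at the top level rather than under a nested "stats" object.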
                start_count = start_stats_processors[processor_name].get("count", 0)
                end_count = processor_stats.get("count", 0)
                ingest_pipeline_pipeline_count = end_count - start_count

                self.metrics_store.put_value_node_level(
                    node_name, "ingest_pipeline_pipeline_count", ingest_pipeline_pipeline_count, meta_data=metadata
                )

                start_time = start_stats_processors[processor_name].get("time_in_millis", 0)
                end_time = processor_stats.get("time_in_millis", 0)
                ingest_pipeline_pipeline_time = end_time - start_time

                self.metrics_store.put_value_node_level(
                    node_name, "ingest_pipeline_pipeline_time", ingest_pipeline_pipeline_time, unit="ms", meta_data=metadata
                )

                start_failed = start_stats_processors[processor_name].get("failed", 0)
                end_failed = processor_stats.get("failed", 0)
                ingest_pipeline_pipeline_failed = end_failed - start_failed

                self.metrics_store.put_value_node_level(
                    node_name, "ingest_pipeline_pipeline_failed", ingest_pipeline_pipeline_failed, meta_data=metadata
                )
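
A minimal sketch of a call site, mirroring the nesting that `self.start_stats[cluster_name][node_name]["pipelines"]` implies; `end_stats` and `device` are hypothetical names used for illustration, not esrally's actual attributes:

    # Record deltas for every pipeline on every node of every cluster.
    for cluster_name, nodes in end_stats.items():
        for node_name, node_stats in nodes.items():
            for pipeline_name, pipeline in node_stats["pipelines"].items():
                device._record_pipeline_level_processor_stats(pipeline, pipeline_name, cluster_name, node_name)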