def __call__()

in esrally/driver/driver.py [0:0]


    def __call__(self, raw_samples):
        """
        Stores the metrics derived from a batch of raw samples in the metrics store.

        Every ``self.downsample_factor``-th sample (starting with the first one) contributes a ``latency``, a
        ``service_time`` and a ``processing_time`` record, each converted from seconds to milliseconds, plus one
        ``service_time`` record per dependent timing of that sample. Throughput is calculated by
        ``self.throughput_calculator`` from *all* raw samples, i.e. it is not subject to downsampling. Afterwards the
        metrics store is flushed without forcing a refresh. The duration of each phase is logged on debug level.

        An empty batch is ignored entirely: nothing is stored, flushed or logged.

        :param raw_samples: The samples to process. Needs to support ``len()`` and iteration.
        """
        if len(raw_samples) == 0:
            return

        overall_begin = time.perf_counter()
        phase_begin = overall_begin
        stored_sample_count = 0

        for position, sample in enumerate(raw_samples):
            # downsampling: only samples at a position that is a multiple of the factor are persisted
            if position % self.downsample_factor != 0:
                continue
            stored_sample_count += 1

            client_meta = {"client_id": sample.client_id}
            sample_meta = self.merge(
                self.track_meta_data,
                self.challenge_meta_data,
                sample.operation_meta_data,
                sample.task.meta_data,
                sample.request_meta_data,
                client_meta,
            )

            # the metric names are identical to the names of the respective attributes on the sample
            for metric in ("latency", "service_time", "processing_time"):
                self.metrics_store.put_value_cluster_level(
                    name=metric,
                    value=convert.seconds_to_ms(getattr(sample, metric)),
                    unit="ms",
                    task=sample.task.name,
                    operation=sample.operation_name,
                    operation_type=sample.operation_type,
                    sample_type=sample.sample_type,
                    absolute_time=sample.absolute_time,
                    relative_time=sample.relative_time,
                    meta_data=sample_meta,
                )

            # dependent timings carry their own task / operation; only their request meta-data and the client id are attached
            for dependent in sample.dependent_timings:
                self.metrics_store.put_value_cluster_level(
                    name="service_time",
                    value=convert.seconds_to_ms(dependent.service_time),
                    unit="ms",
                    task=dependent.task.name,
                    operation=dependent.operation_name,
                    operation_type=dependent.operation_type,
                    sample_type=dependent.sample_type,
                    absolute_time=dependent.absolute_time,
                    relative_time=dependent.relative_time,
                    meta_data=self.merge(dependent.request_meta_data, client_meta),
                )

        phase_end = time.perf_counter()
        self.logger.debug("Storing latency and service time took [%f] seconds.", phase_end - phase_begin)

        # throughput is always derived from the complete (not downsampled) batch
        phase_begin = phase_end
        aggregates = self.throughput_calculator.calculate(raw_samples)
        phase_end = time.perf_counter()
        self.logger.debug("Calculating throughput took [%f] seconds.", phase_end - phase_begin)

        phase_begin = phase_end
        for task, throughput_samples in aggregates.items():
            task_meta = self.merge(self.track_meta_data, self.challenge_meta_data, task.operation.meta_data, task.meta_data)
            for throughput_sample in throughput_samples:
                absolute_time, relative_time, sample_type, throughput, throughput_unit = throughput_sample
                self.metrics_store.put_value_cluster_level(
                    name="throughput",
                    value=throughput,
                    unit=throughput_unit,
                    task=task.name,
                    operation=task.operation.name,
                    operation_type=task.operation.type,
                    sample_type=sample_type,
                    absolute_time=absolute_time,
                    relative_time=relative_time,
                    meta_data=task_meta,
                )
        phase_end = time.perf_counter()
        self.logger.debug("Storing throughput took [%f] seconds.", phase_end - phase_begin)

        # For the in-memory metrics store this is a noop. With an ES metrics store, however, flushing here already sends the data and
        # clears the in-memory buffer so users can see data while the benchmark is still running. Where it does not matter (i.e.
        # in-memory) this step is still deferred until the end.
        #
        # A refresh is deliberately not forced in the interest of short processing times: nothing is queried immediately afterwards so
        # there is no need for frequent refreshes.
        phase_begin = phase_end
        self.metrics_store.flush(refresh=False)
        phase_end = time.perf_counter()
        self.logger.debug("Flushing the metrics store took [%f] seconds.", phase_end - phase_begin)
        self.logger.debug(
            "Postprocessing [%d] raw samples (downsampled to [%d] samples) took [%f] seconds in total.",
            len(raw_samples),
            stored_sample_count,
            phase_end - overall_begin,
        )