def calculate_task_throughput()

in esrally/driver/driver.py [0:0]


    def calculate_task_throughput(self, task, current_samples, bucket_interval_secs):
        task_throughput = []

        if task not in self.task_stats:
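            # first batch of samples for this task: set up its running stats, deriving the start time from the first sample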
            first_sample = current_samples[0]
            self.task_stats[task] = ThroughputCalculator.TaskStats(
                bucket_interval=bucket_interval_secs,
                sample_type=first_sample.sample_type,
                start_time=first_sample.absolute_time - first_sample.time_period,
            )
        current = self.task_stats[task]
        count = current.total_count
        last_sample = None
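        # walk over the new samples, accumulating operation counts until a throughput sample can be derived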
        for sample in current_samples:
            last_sample = sample
            # print("%d,%f,%f,%s,%s,%d,%f" %
            #       (sample.client_id, sample.absolute_time, sample.relative_time, sample.operation, sample.sample_type,
            #        sample.total_ops, sample.time_period), file=sample_log)

            # once we have seen a new sample type, we stick to it.
            current.maybe_update_sample_type(sample.sample_type)

            # we need to store the total count separately and cannot update `current.total_count` immediately here
            # because we would count all raw samples in `unprocessed` twice. Hence, we'll only update
            # `current.total_count` when we have calculated a new throughput sample.
            count += sample.total_ops
            current.update_interval(sample.absolute_time)

            if current.can_calculate_throughput():
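                # the current bucket interval has been covered: finalize the bucket and emit a throughput sample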
                current.finish_bucket(count)
                task_throughput.append(
                    (
                        sample.absolute_time,
                        sample.relative_time,
                        current.sample_type,
                        current.throughput,
                        # we calculate throughput per second
                        f"{sample.total_ops_unit}/s",
                    )
                )
            else:
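                # no new throughput sample can be derived yet; keep the raw sample for the next calculation round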
                current.unprocessed.append(sample)

        # also include the last sample if we don't have a throughput sample for the current sample type yet, even if the
        # elapsed time is below the bucket interval (mainly needed to ensure we show throughput data in test mode)
        if last_sample is not None and current.can_add_final_throughput_sample():
            current.finish_bucket(count)
            task_throughput.append(
                (
                    last_sample.absolute_time,
                    last_sample.relative_time,
                    current.sample_type,
                    current.throughput,
                    f"{last_sample.total_ops_unit}/s",
                )
            )

        return task_throughput
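
For context, the method above accumulates raw samples per task and emits one throughput value (in operations per second) whenever a full bucket interval has elapsed. The following is a minimal, self-contained sketch of that bucketing idea; the class and attribute names (`BucketedThroughput`, `pending_count`) are hypothetical and deliberately simplified, and this is not the actual `ThroughputCalculator.TaskStats` API.

    from dataclasses import dataclass


    @dataclass
    class BucketedThroughput:
        """Illustrative sketch: accumulate (timestamp, op count) samples and emit
        one ops/s value per elapsed bucket interval."""

        bucket_interval_secs: float
        start_time: float
        total_count: int = 0
        pending_count: int = 0

        def add_sample(self, absolute_time, total_ops):
            self.pending_count += total_ops
            elapsed = absolute_time - self.start_time
            if elapsed >= self.bucket_interval_secs:
                # a full bucket has elapsed: report operations per second for this bucket
                throughput = self.pending_count / elapsed
                self.total_count += self.pending_count
                self.pending_count = 0
                self.start_time = absolute_time
                return throughput
            # not enough elapsed time yet; wait for more samples
            return None


    calc = BucketedThroughput(bucket_interval_secs=1.0, start_time=0.0)
    for t, ops in [(0.2, 50), (0.7, 60), (1.1, 55), (1.4, 40), (2.3, 90)]:
        tp = calc.add_sample(t, ops)
        if tp is not None:
            print(f"t={t}: {tp:.1f} ops/s")

Unlike the real implementation, this sketch does not track sample types, defer unprocessed samples, or force a final sample for short runs; it only shows how per-bucket throughput falls out of accumulated counts divided by elapsed time.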