in esrally/driver/driver.py [0:0]
def calculate_task_throughput(self, task, current_samples, bucket_interval_secs):
task_throughput = []
if task not in self.task_stats:
first_sample = current_samples[0]
self.task_stats[task] = ThroughputCalculator.TaskStats(
bucket_interval=bucket_interval_secs,
sample_type=first_sample.sample_type,
start_time=first_sample.absolute_time - first_sample.time_period,
)
current = self.task_stats[task]
count = current.total_count
last_sample = None
for sample in current_samples:
last_sample = sample
# print("%d,%f,%f,%s,%s,%d,%f" %
# (sample.client_id, sample.absolute_time, sample.relative_time, sample.operation, sample.sample_type,
# sample.total_ops, sample.time_period), file=sample_log)
# once we have seen a new sample type, we stick to it.
current.maybe_update_sample_type(sample.sample_type)
# we need to store the total count separately and cannot update `current.total_count` immediately here
# because we would count all raw samples in `unprocessed` twice. Hence, we'll only update
# `current.total_count` when we have calculated a new throughput sample.
count += sample.total_ops
current.update_interval(sample.absolute_time)
if current.can_calculate_throughput():
current.finish_bucket(count)
task_throughput.append(
(
sample.absolute_time,
sample.relative_time,
current.sample_type,
current.throughput,
# we calculate throughput per second
f"{sample.total_ops_unit}/s",
)
)
else:
current.unprocessed.append(sample)
# also include the last sample if we don't have one for the current sample type, even if it is below the bucket
# interval (mainly needed to ensure we show throughput data in test mode)
if last_sample is not None and current.can_add_final_throughput_sample():
current.finish_bucket(count)
task_throughput.append(
(
last_sample.absolute_time,
last_sample.relative_time,
current.sample_type,
current.throughput,
f"{last_sample.total_ops_unit}/s",
)
)
return task_throughput