in esrally/driver/driver.py [0:0]
def __call__(self, raw_samples):
    if len(raw_samples) == 0:
        return
    total_start = time.perf_counter()
    start = total_start
    final_sample_count = 0
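    # Only every downsample_factor-th raw sample is post-processed and written to the metrics store.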
    for idx, sample in enumerate(raw_samples):
        if idx % self.downsample_factor == 0:
            final_sample_count += 1
            client_id_meta_data = {"client_id": sample.client_id}
            meta_data = self.merge(
                self.track_meta_data,
                self.challenge_meta_data,
                sample.operation_meta_data,
                sample.task.meta_data,
                sample.request_meta_data,
                client_id_meta_data,
            )
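            # Store latency, service time and processing time for this sample, each converted to milliseconds.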
            self.metrics_store.put_value_cluster_level(
                name="latency",
                value=convert.seconds_to_ms(sample.latency),
                unit="ms",
                task=sample.task.name,
                operation=sample.operation_name,
                operation_type=sample.operation_type,
                sample_type=sample.sample_type,
                absolute_time=sample.absolute_time,
                relative_time=sample.relative_time,
                meta_data=meta_data,
            )
            self.metrics_store.put_value_cluster_level(
                name="service_time",
                value=convert.seconds_to_ms(sample.service_time),
                unit="ms",
                task=sample.task.name,
                operation=sample.operation_name,
                operation_type=sample.operation_type,
                sample_type=sample.sample_type,
                absolute_time=sample.absolute_time,
                relative_time=sample.relative_time,
                meta_data=meta_data,
            )
            self.metrics_store.put_value_cluster_level(
                name="processing_time",
                value=convert.seconds_to_ms(sample.processing_time),
                unit="ms",
                task=sample.task.name,
                operation=sample.operation_name,
                operation_type=sample.operation_type,
                sample_type=sample.sample_type,
                absolute_time=sample.absolute_time,
                relative_time=sample.relative_time,
                meta_data=meta_data,
            )
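            # Any dependent timings attached to this sample are stored as additional service_time records.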
            for timing in sample.dependent_timings:
                self.metrics_store.put_value_cluster_level(
                    name="service_time",
                    value=convert.seconds_to_ms(timing.service_time),
                    unit="ms",
                    task=timing.task.name,
                    operation=timing.operation_name,
                    operation_type=timing.operation_type,
                    sample_type=timing.sample_type,
                    absolute_time=timing.absolute_time,
                    relative_time=timing.relative_time,
                    meta_data=self.merge(timing.request_meta_data, client_id_meta_data),
                )
    end = time.perf_counter()
    self.logger.debug("Storing latency and service time took [%f] seconds.", (end - start))
    start = end
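    # Throughput is calculated over all raw samples (not just the downsampled subset) and aggregated per task.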
    aggregates = self.throughput_calculator.calculate(raw_samples)
    end = time.perf_counter()
    self.logger.debug("Calculating throughput took [%f] seconds.", (end - start))
    start = end
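    # Store each aggregated throughput value together with task-level metadata.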
    for task, samples in aggregates.items():
        meta_data = self.merge(self.track_meta_data, self.challenge_meta_data, task.operation.meta_data, task.meta_data)
        for absolute_time, relative_time, sample_type, throughput, throughput_unit in samples:
            self.metrics_store.put_value_cluster_level(
                name="throughput",
                value=throughput,
                unit=throughput_unit,
                task=task.name,
                operation=task.operation.name,
                operation_type=task.operation.type,
                sample_type=sample_type,
                absolute_time=absolute_time,
                relative_time=relative_time,
                meta_data=meta_data,
            )
    end = time.perf_counter()
    self.logger.debug("Storing throughput took [%f] seconds.", (end - start))
    start = end
    # This is a no-op for the in-memory metrics store. With an Elasticsearch metrics store, however, it sends the data
    # and clears the in-memory buffer so that users can already see results while the benchmark is running. Where it
    # does not matter (i.e. in-memory), we still defer this step until the end.
    #
    # Don't force a refresh here in the interest of short processing times. We don't need to query immediately
    # afterwards, so there is no need for frequent refreshes.
    self.metrics_store.flush(refresh=False)
    end = time.perf_counter()
    self.logger.debug("Flushing the metrics store took [%f] seconds.", (end - start))
    self.logger.debug(
        "Postprocessing [%d] raw samples (downsampled to [%d] samples) took [%f] seconds in total.",
        len(raw_samples),
        final_sample_count,
        (end - total_start),
    )