in esrally/driver/driver.py [0:0]
def joinpoint_reached(self, worker_id, worker_local_timestamp, task_allocations):
self.currently_completed += 1
self.workers_completed_current_step[worker_id] = (worker_local_timestamp, time.perf_counter())
self.logger.debug(
"[%d/%d] workers reached join point [%d/%d].",
self.currently_completed,
len(self.workers),
self.current_step + 1,
self.number_of_steps,
)
if self.currently_completed == len(self.workers):
self.logger.info("All workers completed their tasks until join point [%d/%d].", self.current_step + 1, self.number_of_steps)
# we can go on to the next step
self.currently_completed = 0
self.complete_current_task_sent = False
# make a copy and reset early to avoid any race conditions from clients that reach a join point already while we are sending...
workers_curr_step = self.workers_completed_current_step
self.workers_completed_current_step = {}
self.update_progress_message(task_finished=True)
# clear per step
self.most_recent_sample_per_client = {}
self.current_step += 1
self.logger.debug("Postprocessing samples...")
self.post_process_samples()
if self.finished():
self.telemetry.on_benchmark_stop()
self.logger.info("All steps completed.")
# Some metrics store implementations return None because no external representation is required.
# pylint: disable=assignment-from-none
m = self.metrics_store.to_externalizable(clear=True)
self.logger.debug("Closing metrics store...")
self.metrics_store.close()
# immediately clear as we don't need it anymore and it can consume a significant amount of memory
self.metrics_store = None
if self.generated_api_key_ids:
self.logger.debug("Deleting auto-generated client API keys...")
try:
delete_api_keys(self.default_sync_es_client, self.generated_api_key_ids)
except exceptions.RallyError:
console.warn(
"Unable to delete auto-generated API keys. You may need to manually delete them. "
"Please check the logs for details."
)
self.logger.debug("Sending benchmark results...")
self.driver_actor.on_benchmark_complete(m)
else:
self.move_to_next_task(workers_curr_step)
else:
self.may_complete_current_task(task_allocations)