def joinpoint_reached()

in esrally/driver/driver.py [0:0]


    def joinpoint_reached(self, worker_id, worker_local_timestamp, task_allocations):
        self.currently_completed += 1
        self.workers_completed_current_step[worker_id] = (worker_local_timestamp, time.perf_counter())
        self.logger.debug(
            "[%d/%d] workers reached join point [%d/%d].",
            self.currently_completed,
            len(self.workers),
            self.current_step + 1,
            self.number_of_steps,
        )
        if self.currently_completed == len(self.workers):
            self.logger.info("All workers completed their tasks until join point [%d/%d].", self.current_step + 1, self.number_of_steps)
            # we can go on to the next step
            self.currently_completed = 0
            self.complete_current_task_sent = False
            # make a copy and reset early to avoid any race conditions from clients that reach a join point already while we are sending...
            workers_curr_step = self.workers_completed_current_step
            self.workers_completed_current_step = {}
            self.update_progress_message(task_finished=True)
            # clear per step
            self.most_recent_sample_per_client = {}
            self.current_step += 1

            self.logger.debug("Postprocessing samples...")
            self.post_process_samples()
            if self.finished():
                self.telemetry.on_benchmark_stop()
                self.logger.info("All steps completed.")
                # Some metrics store implementations return None because no external representation is required.
                # pylint: disable=assignment-from-none
                m = self.metrics_store.to_externalizable(clear=True)
                self.logger.debug("Closing metrics store...")
                self.metrics_store.close()
                # immediately clear as we don't need it anymore and it can consume a significant amount of memory
                self.metrics_store = None
                if self.generated_api_key_ids:
                    self.logger.debug("Deleting auto-generated client API keys...")
                    try:
                        delete_api_keys(self.default_sync_es_client, self.generated_api_key_ids)
                    except exceptions.RallyError:
                        console.warn(
                            "Unable to delete auto-generated API keys. You may need to manually delete them. "
                            "Please check the logs for details."
                        )
                self.logger.debug("Sending benchmark results...")
                self.driver_actor.on_benchmark_complete(m)
            else:
                self.move_to_next_task(workers_curr_step)
        else:
            self.may_complete_current_task(task_allocations)