def may_complete_current_task()

in esrally/driver/driver.py [0:0]


    def may_complete_current_task(self, task_allocations):
        """
        Decide whether the parent structure of the current step may be completed early and, if so, tell every
        worker to complete its current task.

        Two ``completed-by`` modes are handled:

        * ``any``: as soon as one allocation's task has ``any_task_completes_parent`` set, all workers are told to
          complete immediately, without waiting for the remaining clients of that task or of any sibling task.
        * a specific task: if an allocation's task has ``preceding_task_completes_parent`` set, all workers are
          only told to complete once every client id in ``clients_executing_completing_task`` maps (through
          ``clients_per_worker``) to a worker contained in ``workers_completed_current_step``. Until then, the
          still-pending client ids are logged.

        The completion message is dispatched at most once: ``complete_current_task_sent`` is set to ``True``
        before dispatching, and while it is ``True`` this method does nothing.

        :param task_allocations: allocations whose ``task`` attribute is inspected for the completion flags above.
        """
        # The two modes are kept in separate lists on purpose. Resolving them independently keeps this logic
        # simple. Otherwise we would need some form of state machine involving actor-to-actor communication.
        finishes_on_any = [alloc for alloc in task_allocations if alloc.task.any_task_completes_parent]
        finishes_on_preceding = [alloc for alloc in task_allocations if alloc.task.preceding_task_completes_parent]

        if not (finishes_on_any or finishes_on_preceding) or self.complete_current_task_sent:
            # Either nothing here can complete the parent, or the workers were already told to do so. In the
            # latter case we must not re-send the message every time another client reaches the join point.
            return

        if finishes_on_any:
            # 'completed-by: any' ends the enclosing structure on the first client of *any* task that gets here.
            # Example: a parallel block running 'bulk-1' (8 clients) and 'bulk-2' (8 clients). If bulk-1's
            # client 0 reaches the join point first, the parallel block completes. It does not wait for bulk-1's
            # other 7 clients, nor for bulk-2 at all.
            self.logger.info(
                "Any task before join point [%s] is able to complete the parent structure. Telling all clients to exit immediately.",
                finishes_on_any[0].task,
            )
            self.complete_current_task_sent = True
            for worker in self.workers:
                self.driver_actor.complete_current_task(worker)
            return

        # 'completed-by: <task>' requires *every* client of that task to be done before the parent completes.
        # The list may hold several allocations, but they all refer to the same task (different clients), so
        # the first one is representative.
        join_point = finishes_on_preceding[0].task
        completing_clients = join_point.clients_executing_completing_task
        self.logger.info(
            "Tasks before join point [%s] are able to complete the parent structure. Checking "
            "if all [%d] clients have finished yet.",
            join_point,
            len(completing_clients),
        )

        # A client counts as finished once the worker that hosts it has completed the current step.
        unfinished = [
            client_id
            for client_id in completing_clients
            if self.clients_per_worker[client_id] not in self.workers_completed_current_step
        ]

        if unfinished:
            # Keep the log line short when many clients are still running.
            if len(unfinished) > 32:
                self.logger.info("[%d] clients did not yet finish.", len(unfinished))
            else:
                self.logger.info("Client id(s) [%s] did not yet finish.", ",".join(str(cid) for cid in unfinished))
            return

        # Remember that the message went out. Otherwise it would be resent as the remaining workers report in.
        self.complete_current_task_sent = True
        self.logger.info("All affected clients have finished. Notifying all clients to complete their current tasks.")
        for worker in self.workers:
            self.driver_actor.complete_current_task(worker)