in esrally/driver/driver.py [0:0]
def may_complete_current_task(self, task_allocations):
    """Tell all workers to complete their current task if a parent-completing task is done.

    Two kinds of allocations are looked for in ``task_allocations``:

    * allocations whose task has ``any_task_completes_parent`` set
      ('completed-by' is 'any'): every worker is notified right away, without
      waiting for the remaining clients or sibling tasks. For example, with a
      parallel block containing 'bulk-1' and 'bulk-2' (8 clients each), the
      first client of either task that reaches the next join point ends the
      whole parallel block.
    * allocations whose task has ``preceding_task_completes_parent`` set
      (a specific 'completed-by' task): workers are only notified once every
      client listed in ``clients_executing_completing_task`` belongs to a
      worker that has already completed the current step.

    Keeping these two cases as separate flags avoids a state machine with
    actor-to-actor communication. The notification is sent at most once per
    step; ``self.complete_current_task_sent`` records that it went out.

    :param task_allocations: Allocations to inspect; each must expose a ``task``.
    """
    completes_on_any = [alloc for alloc in task_allocations if alloc.task.any_task_completes_parent]
    completes_on_specific = [alloc for alloc in task_allocations if alloc.task.preceding_task_completes_parent]

    # Nothing here is able to complete the parent structure.
    if not (completes_on_any or completes_on_specific):
        return
    # We keep being called while other clients are still finishing; never notify twice per step.
    if self.complete_current_task_sent:
        return

    if completes_on_any:
        # 'completed-by: any' - the first finisher wins, nobody else is waited for.
        self.logger.info(
            "Any task before join point [%s] is able to complete the parent structure. Telling all clients to exit immediately.",
            completes_on_any[0].task,
        )
        self.complete_current_task_sent = True
        for worker in self.workers:
            self.driver_actor.complete_current_task(worker)
        return

    # Several allocations may match, but they all refer to the same task (just
    # different clients), so the first one is representative.
    join_point = completes_on_specific[0].task
    completing_clients = join_point.clients_executing_completing_task
    self.logger.info(
        "Tasks before join point [%s] are able to complete the parent structure. Checking "
        "if all [%d] clients have finished yet.",
        join_point,
        len(completing_clients),
    )

    # A client counts as finished as soon as the worker hosting it has finished.
    unfinished_clients = [
        client_id
        for client_id in completing_clients
        if self.clients_per_worker[client_id] not in self.workers_completed_current_step
    ]

    if unfinished_clients:
        # Keep the log line short when many clients are still running.
        if len(unfinished_clients) > 32:
            self.logger.info("[%d] clients did not yet finish.", len(unfinished_clients))
        else:
            self.logger.info("Client id(s) [%s] did not yet finish.", ",".join(str(client_id) for client_id in unfinished_clients))
        return

    # Every client of the completing task is done: release everybody else.
    self.complete_current_task_sent = True
    self.logger.info("All affected clients have finished. Notifying all clients to complete their current tasks.")
    for worker in self.workers:
        self.driver_actor.complete_current_task(worker)