in osbenchmark/worker_coordinator/worker_coordinator.py [0:0]
def may_complete_current_task(self, task_allocations):
    """
    Notify every worker to complete its current task once all clients that execute the
    parent-completing task have finished.

    Nothing happens if none of the given allocations has a task with
    ``preceding_task_completes_parent`` set, or if the notification has already been sent
    (``self.complete_current_task_sent``). Otherwise each client listed in the join point's
    ``clients_executing_completing_task`` is mapped to its worker via ``self.clients_per_worker``;
    when all of these workers are in ``self.workers_completed_current_step``,
    ``self.target.complete_current_task`` is called once per entry in ``self.workers`` and the
    sent-flag is set. If clients are still pending, this is only logged.

    :param task_allocations: Iterable of allocations; each item must expose a ``task`` attribute.
    """
    joinpoints_completing_parent = [a for a in task_allocations if a.task.preceding_task_completes_parent]
    # we need to actively send CompleteCurrentTask messages to all remaining workers - but only
    # if there is a join point that completes the parent and we have not notified them yet.
    if not joinpoints_completing_parent or self.complete_current_task_sent:
        return

    # while this list could contain multiple items, it should always be the same task (but multiple
    # different clients) so any item is sufficient.
    current_join_point = joinpoints_completing_parent[0].task
    self.logger.info("Tasks before join point [%s] are able to complete the parent structure. Checking "
                     "if all [%d] clients have finished yet.",
                     current_join_point, len(current_join_point.clients_executing_completing_task))

    # We assume that all clients have finished if their corresponding worker has finished
    pending_client_ids = [
        client_id
        for client_id in current_join_point.clients_executing_completing_task
        if self.clients_per_worker[client_id] not in self.workers_completed_current_step
    ]

    # are all clients executing said task already done? if so we need to notify the remaining clients
    if not pending_client_ids:
        # As we are waiting for other clients to finish, we would send this message over and over again.
        # Hence we need to memorize whether we have already sent it for the current step.
        self.complete_current_task_sent = True
        self.logger.info("All affected clients have finished. Notifying all clients to complete their current tasks.")
        for worker in self.workers:
            self.target.complete_current_task(worker)
    elif len(pending_client_ids) > 32:
        # avoid flooding the log with a huge id list; only report the count
        self.logger.info("[%d] clients did not yet finish.", len(pending_client_ids))
    else:
        self.logger.info("Client id(s) [%s] did not yet finish.", ",".join(map(str, pending_client_ids)))