in esrally/driver/driver.py [0:0]
def allocations(self):
    """
    Builds the two-dimensional allocation matrix: the first dimension is the client,
    the second dimension is the sequence of work items that client executes. The matrix
    is rectangular (never ragged) and contains three kinds of entries:

    1. Regular task allocations: executed by the client.
    2. Join points: global barriers; every client must reach one and then wait for the
       master's signal before the benchmark proceeds.
    3. ``None``: filler entries inserted to keep the matrix rectangular; clients skip
       them until they hit one of the other two entry kinds.

    :return: An allocation matrix with the structure described above.
    """
    num_clients = self.clients
    allocation_matrix = [[] for _ in range(num_clients)]
    current_join_point_id = 0

    def append_barrier(join_point):
        # the very same JoinPoint instance is appended to every client's row so the
        # master can treat it as one shared barrier
        for row in allocation_matrix:
            row.append(join_point)

    # artificial initial barrier so the master can coordinate a simultaneous start
    append_barrier(JoinPoint(current_join_point_id))
    current_join_point_id += 1

    for task in self.schedule:
        offset = 0
        completing_clients = []
        any_completing_clients = []
        for sub_task in task:
            for logical_index in range(offset, offset + sub_task.clients):
                # the client that actually runs this sub-task; it can differ from the
                # logical index when we over-commit (more tasks than available clients)
                physical_index = logical_index % num_clients
                if sub_task.completes_parent:
                    completing_clients.append(physical_index)
                elif sub_task.any_completes_parent:
                    any_completing_clients.append(physical_index)
                allocation_matrix[physical_index].append(
                    TaskAllocation(
                        task=sub_task,
                        client_index_in_task=logical_index - offset,
                        global_client_index=logical_index,
                        # for a parallel structure this is the total number of clients
                        # concurrently executing its sub-tasks
                        total_clients=task.clients,
                    )
                )
            offset += sub_task.clients
        # tasks may not divide evenly among clients (e.g. 5 parallel tasks on 2 clients:
        # one client runs three, the other two) — pad the shorter rows with ``None``
        if offset % num_clients > 0:
            # pin the range to [0, num_clients) to simplify the padding loop
            for padded_index in range(offset % num_clients, num_clients):
                allocation_matrix[padded_index].append(None)
        # all clients rendezvous after each task before the benchmark moves on
        append_barrier(JoinPoint(current_join_point_id, completing_clients, any_completing_clients))
        current_join_point_id += 1
    return allocation_matrix