in osbenchmark/worker_coordinator/worker_coordinator.py [0:0]
def schedule_for(task, client_index, parameter_source):
"""
Calculates a client's schedule for a given task.
:param task: The task that should be executed.
:param client_index: The current client index. Must be in the range [0, `task.clients').
:param parameter_source: The parameter source that should be used for this task.
:return: A generator for the operations the given client needs to perform for this task.
"""
logger = logging.getLogger(__name__)
op = task.operation
num_clients = task.clients
sched = scheduler.scheduler_for(task)
# guard all logging statements with the client index and only emit them for the first client. This information is
# repetitive and may cause issues in thespian with many clients (an excessive number of actor messages is sent).
if client_index == 0:
logger.info("Choosing [%s] for [%s].", sched, task)
runner_for_op = runner.runner_for(op.type)
params_for_op = parameter_source.partition(client_index, num_clients)
if hasattr(sched, "parameter_source"):
if client_index == 0:
logger.debug("Setting parameter source [%s] for scheduler [%s]", params_for_op, sched)
sched.parameter_source = params_for_op
if requires_time_period_schedule(task, runner_for_op, params_for_op):
warmup_time_period = task.warmup_time_period if task.warmup_time_period else 0
if client_index == 0:
logger.info("Creating time-period based schedule with [%s] distribution for [%s] with a warmup period of [%s] "
"seconds and a time period of [%s] seconds.", task.schedule, task.name,
str(warmup_time_period), str(task.time_period))
loop_control = TimePeriodBased(warmup_time_period, task.time_period)
else:
warmup_iterations = task.warmup_iterations if task.warmup_iterations else 0
if task.iterations:
iterations = task.iterations
elif params_for_op.infinite:
# this is usually the case if the parameter source provides a constant
iterations = 1
else:
iterations = None
if client_index == 0:
logger.info("Creating iteration-count based schedule with [%s] distribution for [%s] with [%s] warmup "
"iterations and [%s] iterations.", task.schedule, task.name, str(warmup_iterations), str(iterations))
loop_control = IterationBased(warmup_iterations, iterations)
if client_index == 0:
if loop_control.infinite:
logger.info("Parameter source will determine when the schedule for [%s] terminates.", task.name)
else:
logger.info("%s schedule will determine when the schedule for [%s] terminates.", str(loop_control), task.name)
return ScheduleHandle(task.name, sched, loop_control, runner_for_op, params_for_op)