in esrally/driver/driver.py [0:0]
def schedule_for(task_allocation, parameter_source):
    """
    Builds the schedule that a single client follows while executing one task.

    :param task_allocation: The task allocation to schedule. It provides the task and the client's index within it.
    :param parameter_source: The task's parameter source. It is partitioned so that this client gets its own share.
    :return: A ``ScheduleHandle`` created from the task allocation, the chosen scheduler, the loop control, the
             runner for the task's operation and this client's partition of the parameter source.
    """
    log = logging.getLogger(__name__)
    task = task_allocation.task
    operation = task.operation
    task_scheduler = scheduler.scheduler_for(task)

    # The index *within the task* is used on purpose, not the global client index: tasks may run in parallel with
    # several clients each, e.g.:
    #
    # * Clients 0-3 bulk index into indexA
    # * Clients 4-7 bulk index into indexB
    #
    # Parameters have to be partitioned starting from (client) index 0 for both tasks, rather than from 0 for indexA
    # and from 4 for indexB.
    client_index = task_allocation.client_index_in_task
    # Every log statement below is emitted by the first client only. The information is repetitive and may cause
    # issues in thespian with many clients (an excessive number of actor messages is sent).
    is_first_client = client_index == 0

    if is_first_client:
        log.debug("Choosing [%s] for [%s].", task_scheduler, task)

    runner_for_op = runner.runner_for(operation.type)
    params_for_op = parameter_source.partition(client_index, task.clients)

    # Only schedulers that declare a ``parameter_source`` attribute receive this client's partition.
    if hasattr(task_scheduler, "parameter_source"):
        if is_first_client:
            log.debug("Setting parameter source [%s] for scheduler [%s]", params_for_op, task_scheduler)
        task_scheduler.parameter_source = params_for_op

    if requires_time_period_schedule(task, runner_for_op, params_for_op):
        # an unset (falsy) warmup period counts as no warmup at all
        warmup_seconds = task.warmup_time_period or 0
        if is_first_client:
            log.info(
                "Creating time-period based schedule with [%s] distribution for [%s] with a warmup period of [%s] "
                "seconds and a time period of [%s] seconds.",
                task.schedule,
                task.name,
                str(warmup_seconds),
                str(task.time_period),
            )
        loop_control = TimePeriodBased(warmup_seconds, task.time_period)
    else:
        warmup_count = task.warmup_iterations or 0
        # An explicit iteration count on the task wins. Otherwise an infinite parameter source (usually one that
        # provides a constant) is run exactly once, and in all other cases the count is left undefined.
        iteration_count = task.iterations or (1 if params_for_op.infinite else None)
        if is_first_client:
            log.debug(
                "Creating iteration-count based schedule with [%s] distribution for [%s] with [%s] warmup "
                "iterations and [%s] iterations.",
                task.schedule,
                task.name,
                str(warmup_count),
                str(iteration_count),
            )
        loop_control = IterationBased(warmup_count, iteration_count)

    if is_first_client and loop_control.infinite:
        log.debug("Parameter source will determine when the schedule for [%s] terminates.", task.name)
    elif is_first_client:
        log.debug("%s schedule will determine when the schedule for [%s] terminates.", str(loop_control), task.name)

    return ScheduleHandle(task_allocation, task_scheduler, loop_control, runner_for_op, params_for_op)