def schedule_for()

in esrally/driver/driver.py [0:0]


def schedule_for(task_allocation, parameter_source):
    """
    Calculates a client's schedule for a given task.

    :param task_allocation: The task allocation that should be executed by this schedule.
    :param parameter_source: The parameter source that should be used for this task.
    :return: A generator for the operations the given client needs to perform for this task.
    """
    logger = logging.getLogger(__name__)
    task = task_allocation.task
    op = task.operation
    sched = scheduler.scheduler_for(task)

    # We cannot use the global client index here because we need to support parallel execution of tasks
    # with multiple clients. Consider the following scenario:
    #
    # * Clients 0-3 bulk index into indexA
    # * Clients 4-7 bulk index into indexB
    #
    # Now we need to ensure that we start partitioning parameters correctly in both cases. And that means we
    # need to start from (client) index 0 in both cases instead of 0 for indexA and 4 for indexB.
    client_index = task_allocation.client_index_in_task

    # guard all logging statements with the client index and only emit them for the first client. This information is
    # repetitive and may cause issues in thespian with many clients (an excessive number of actor messages is sent).
    if client_index == 0:
        logger.debug("Choosing [%s] for [%s].", sched, task)
    runner_for_op = runner.runner_for(op.type)
    params_for_op = parameter_source.partition(client_index, task.clients)
    if hasattr(sched, "parameter_source"):
        if client_index == 0:
            logger.debug("Setting parameter source [%s] for scheduler [%s]", params_for_op, sched)
        sched.parameter_source = params_for_op

    if requires_time_period_schedule(task, runner_for_op, params_for_op):
        warmup_time_period = task.warmup_time_period if task.warmup_time_period else 0
        if client_index == 0:
            logger.info(
                "Creating time-period based schedule with [%s] distribution for [%s] with a warmup period of [%s] "
                "seconds and a time period of [%s] seconds.",
                task.schedule,
                task.name,
                str(warmup_time_period),
                str(task.time_period),
            )
        loop_control = TimePeriodBased(warmup_time_period, task.time_period)
    else:
        warmup_iterations = task.warmup_iterations if task.warmup_iterations else 0
        if task.iterations:
            iterations = task.iterations
        elif params_for_op.infinite:
            # this is usually the case if the parameter source provides a constant
            iterations = 1
        else:
            iterations = None
        if client_index == 0:
            logger.debug(
                "Creating iteration-count based schedule with [%s] distribution for [%s] with [%s] warmup "
                "iterations and [%s] iterations.",
                task.schedule,
                task.name,
                str(warmup_iterations),
                str(iterations),
            )
        loop_control = IterationBased(warmup_iterations, iterations)

    if client_index == 0:
        if loop_control.infinite:
            logger.debug("Parameter source will determine when the schedule for [%s] terminates.", task.name)
        else:
            logger.debug("%s schedule will determine when the schedule for [%s] terminates.", str(loop_control), task.name)

    return ScheduleHandle(task_allocation, sched, loop_control, runner_for_op, params_for_op)