def generate_load()

in perfmetrics/scripts/load_tests/python/load_generator/load_generator.py [0:0]


  def generate_load(self, task):
    """Performs load test using the given task.

    The load is generated on CPU and other resources (depending upon the task
    used) by running process(s) and thread(s) where task runs inside thread(s)
    and thread(s) runs inside process(s).

    Args:
      task: Implementation of task.LoadTestTask.

    Returns:
      Returns start_time, end_time of load test, latencies and results of all
        the tasks performed over the span of load test.

    Raises:
      RuntimeError: If the given task is not completed even once during the
        course of load test.
    """
    tasks_results_queue = multiprocessing.Manager().Queue()
    pre_tasks_results_queue = multiprocessing.Manager().Queue()
    post_tasks_results_queue = multiprocessing.Manager().Queue()

    processes = []
    for process_id in range(self.num_processes):
      process = multiprocessing.Process(
          target=LoadGenerator._process_task,
          args=(task, process_id, self.num_threads_per_process,
                self.num_executions_per_thread, pre_tasks_results_queue,
                tasks_results_queue, post_tasks_results_queue))
      processes.append(process)

    # Initialize checkpoints to show completion of load test i.e. 25%, 50% etc.
    # Note: Completion checkpoints are shown only when self.run_time is set.
    # E.g. if self.run_time is 60 then the load test will inform that 50% of
    # load test is completed after 30 seconds.
    log_loading = self.run_time != sys.maxsize
    loading_checkpoints = list(
        map(lambda t: (t * self.run_time), lg_const.TIME_LOADING_PERCENTAGES))
    curr_loading_idx = 0

    for process in processes:
      process.start()
    logging.debug('%s number of processes started for task %s', len(processes),
                  task.task_name)

    start_time = curr_time = time.time()
    loading_checkpoints = [t + start_time for t in loading_checkpoints]
    # Loop till the condition of termination of load test is not met. The
    # condition is either the load test has run for self.run_time or completed
    # the total number of tasks assigned.
    while ((curr_time - start_time) < self.run_time) & \
        (tasks_results_queue.qsize() < self.total_num_tasks):
      # Sleep so that the looping is not very fast. 0.1 is decided on
      # discretion with the intention that time duration shouldn't be very
      # small or shouldn't be very large.
      time.sleep(0.1)
      curr_time = time.time()
      if log_loading & (curr_loading_idx < len(loading_checkpoints)) and (
          curr_time >= loading_checkpoints[curr_loading_idx]):
        logging.info('Load test completed %s%% for task: %s',
                     lg_const.TIME_LOADING_PERCENTAGES[curr_loading_idx] * 100,
                     task.task_name)
        curr_loading_idx = curr_loading_idx + 1
    logging.info('Load test completed 100%% for task: %s', task.task_name)

    for process in processes:
      process.terminate()
    logging.debug('%s number of processes terminated for task %s',
                  len(processes), task.task_name)

    # Raise error if not even a single task is completed
    if tasks_results_queue.qsize() < 1:
      raise RuntimeError('Not even a single task is completed. Pass higher '
                         'value to --run-time flag or check the task.')
    return {
        lg_const.START_TIME:
            start_time,
        lg_const.END_TIME:
            curr_time,
        lg_const.TASKS_RESULTS:
            self._convert_multiprocessing_queue_to_list(tasks_results_queue),
        lg_const.PRE_TASKS_RESULTS:
            self._convert_multiprocessing_queue_to_list(pre_tasks_results_queue
                                                       ),
        lg_const.POST_TASKS_RESULTS:
            self._convert_multiprocessing_queue_to_list(post_tasks_results_queue
                                                       ),
    }