def RunStaircaseLoads()

in perfkitbenchmarker/linux_packages/ycsb.py [0:0]


  def RunStaircaseLoads(self, vms, workloads, **kwargs):
    """Run each workload in 'workloads' in succession.

    A staircase load is applied for each workload file, for each entry in
    ycsb_threads_per_client.

    Args:
      vms: List of VirtualMachine objects to generate load from.
      workloads: List of workload file names.
      **kwargs: Additional parameters to pass to each run.  See constructor for
        options.

    Returns:
      List of sample.Sample objects.
    """
    all_results = []
    parameters = {}
    for workload_index, workload_file in enumerate(workloads):
      if FLAGS.ycsb_operation_count:
        parameters = {'operationcount': FLAGS.ycsb_operation_count}
      if FLAGS.ycsb_record_count:
        parameters['recordcount'] = FLAGS.ycsb_record_count
      if FLAGS.ycsb_field_count:
        parameters['fieldcount'] = FLAGS.ycsb_field_count
      if FLAGS.ycsb_field_length:
        parameters['fieldlength'] = FLAGS.ycsb_field_length
      if FLAGS.ycsb_timelimit:
        parameters['maxexecutiontime'] = FLAGS.ycsb_timelimit
      hdr_files_dir = posixpath.join(self.hdr_dir, str(workload_index))
      if FLAGS.ycsb_measurement_type == ycsb_stats.HDRHISTOGRAM:
        parameters['hdrhistogram.fileoutput'] = True
        parameters['hdrhistogram.output.path'] = hdr_files_dir
      if FLAGS.ycsb_requestdistribution:
        parameters['requestdistribution'] = FLAGS.ycsb_requestdistribution
      if FLAGS.ycsb_readproportion is not None:
        parameters['readproportion'] = FLAGS.ycsb_readproportion
      if FLAGS.ycsb_updateproportion is not None:
        parameters['updateproportion'] = FLAGS.ycsb_updateproportion
      if FLAGS.ycsb_scanproportion is not None:
        parameters['scanproportion'] = FLAGS.ycsb_scanproportion
      parameters.update(kwargs)
      remote_path = posixpath.join(
          linux_packages.INSTALL_DIR, os.path.basename(workload_file)
      )

      with open(workload_file) as fp:
        workload_meta = ParseWorkload(fp.read())
        workload_meta.update(kwargs)
        workload_meta.update(
            workload_name=os.path.basename(workload_file),
            workload_index=workload_index,
            stage='run',
        )

      args = [
          ((vm, workload_file, remote_path), {}) for vm in dict.fromkeys(vms)
      ]
      background_tasks.RunThreaded(PushWorkload, args)

      parameters['parameter_files'] = [remote_path]

      # _GetThreadsQpsPerLoaderList() passes tuple of (client_count, target=0)
      # if no target is passed via flags.
      for client_count, target_qps_per_vm in _GetThreadsQpsPerLoaderList(
          len(vms)
      ):

        @vm_util.Retry(
            retryable_exceptions=ycsb_stats.CombineHdrLogError,
            timeout=-1,
            max_retries=5,
        )
        def _DoRunStairCaseLoad(
            client_count: int,
            target_qps_per_vm: int,
            workload_meta: Mapping[str, Any],
            is_sustained: bool = False,
        ):
          parameters['threads'] = client_count
          if target_qps_per_vm:
            # Threads should be less than the target QPS since YCSB throttles
            # weirdly when threads is much larger than target.
            parameters['threads'] = min(client_count, target_qps_per_vm)
            parameters['target'] = int(target_qps_per_vm * len(vms))
          if is_sustained:
            parameters['maxexecutiontime'] = (
                FLAGS.ycsb_dynamic_load_sustain_timelimit
            )
          start = time.time()
          results = self._RunThreaded(vms, **parameters)
          events.record_event.send(
              type(self).__name__,
              event='run',
              start_timestamp=start,
              end_timestamp=time.time(),
              metadata=copy.deepcopy(parameters),
          )
          client_meta = workload_meta.copy()
          client_meta.update(parameters)
          client_meta.update(
              clients=len(vms) * client_count,
              threads_per_client_vm=client_count,
          )
          # Values passed in via this flag do not get recorded in metadata.
          # The target passed in is applied to each client VM, so multiply by
          # len(vms).
          for pv in FLAGS.ycsb_run_parameters:
            param, value = pv.split('=', 1)
            if param == 'target':
              value = int(value) * len(vms)
            client_meta[param] = value

          if FLAGS.ycsb_include_individual_results and len(results) > 1:
            for i, result in enumerate(results):
              all_results.extend(
                  ycsb_stats.CreateSamples(
                      ycsb_result=result,
                      ycsb_version=_YCSB_VERSION.value,
                      ycsb_commit=_YCSB_COMMIT.value,
                      include_histogram=FLAGS.ycsb_histogram,
                      include_command_line=_SHOULD_RECORD_COMMAND_LINE.value,
                      result_type='individual',
                      result_index=i,
                      **client_meta,
                  )
              )

          if self.measurement_type == ycsb_stats.HDRHISTOGRAM:
            combined_log = ycsb_stats.CombineHdrHistogramLogFiles(
                self.hdr_dir, parameters['hdrhistogram.output.path'], vms
            )
            parsed_hdr = ycsb_stats.ParseHdrLogs(combined_log)
            combined = ycsb_stats.CombineResults(
                results, self.measurement_type, parsed_hdr
            )
          else:
            combined = ycsb_stats.CombineResults(
                results, self.measurement_type, {}
            )
          run_samples = list(
              ycsb_stats.CreateSamples(
                  ycsb_result=combined,
                  ycsb_version=_YCSB_VERSION.value,
                  ycsb_commit=_YCSB_COMMIT.value,
                  include_command_line=_SHOULD_RECORD_COMMAND_LINE.value,
                  include_histogram=FLAGS.ycsb_histogram,
                  result_type='combined',
                  **client_meta,
              )
          )

          overall_throughput = 0
          for s in run_samples:
            if s.metric == 'overall Throughput':
              overall_throughput += s.value
          return overall_throughput, run_samples

        if target_qps_per_vm > 0 and parameters.get('target', 0) > 0:
          raise errors.Benchmarks.RunError(
              'Target QPS should only be passed in via one of'
              ' --ycsb_threads_per_client, --ycsb_target_qps, or parameters.'
          )
        if 'target' in parameters:
          target_qps_per_vm = int(parameters['target'] / len(vms))

        # Consider refactoring so that target_qps_per_vm doesn't need to be
        # passed in. Target should already be set in parameters.
        target_throughput, run_samples = _DoRunStairCaseLoad(
            client_count, target_qps_per_vm, workload_meta
        )

        # Uses 5 * unthrottled throughput as starting point.
        target_throughput *= 5
        all_results.extend(run_samples)
        is_sustained = False
        while FLAGS.ycsb_dynamic_load:
          actual_throughput, run_samples = _DoRunStairCaseLoad(
              client_count,
              int(target_throughput // len(vms)),
              workload_meta,
              is_sustained,
          )
          is_sustained = FLAGS.ycsb_dynamic_load_sustain_throughput_ratio < (
              actual_throughput / target_throughput
          )
          for s in run_samples:
            s.metadata['sustained'] = is_sustained
          all_results.extend(run_samples)
          target_throughput = self._GetRunLoadTarget(
              actual_throughput, is_sustained
          )
          if target_throughput is None:
            break
          if _DYNAMIC_LOAD_SLEEP_SEC.value > 0:
            logging.info(
                'Sleeping %s seconds after dynamic load.',
                _DYNAMIC_LOAD_SLEEP_SEC.value,
            )
            time.sleep(_DYNAMIC_LOAD_SLEEP_SEC.value)

    return all_results