def Run()

in perfkitbenchmarker/linux_benchmarks/unmanaged_postgresql_sysbench_benchmark.py [0:0]


def Run(benchmark_spec: bm_spec.BenchmarkSpec) -> list[sample.Sample]:
  """Run the sysbench benchmark and publish results.

  Runs sysbench once per thread count in --sysbench_run_threads (ascending),
  collecting time-series, latency, and transaction (tps/qps) samples for each
  run, then reports the maximum tps/qps observed across all thread counts as
  additional 'max_*' metrics.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.

  Returns:
    Results.

  Raises:
    errors.Benchmarks.RunError: If a sysbench invocation fails, if no run
      produced results, or if --postgresql_measure_max_qps is set and QPS was
      still rising at the largest thread count (peak not reached).
  """
  primary_server = benchmark_spec.vm_groups['server'][0]
  client = benchmark_spec.vm_groups['client'][0]
  sysbench_parameters = _GetSysbenchParameters(
      primary_server.internal_ip,
      postgresql16.GetPsqlUserPassword(FLAGS.run_uri),
  )
  results = []
  # a map of transaction metric name (tps/qps) to current sample with max value
  max_transactions = {}
  sorted_threads = sorted(FLAGS.sysbench_run_threads)
  previous_qps = 0
  reached_peak = False
  for thread_count in sorted_threads:
    sysbench_parameters.threads = thread_count
    cmd = sysbench.BuildRunCommand(sysbench_parameters)
    logging.info('%s run command: %s', FLAGS.sysbench_testname, cmd)
    try:
      stdout, _ = client.RemoteCommand(
          cmd, timeout=2*FLAGS.sysbench_run_seconds,)
    except errors.VirtualMachine.RemoteCommandError as e:
      logging.exception('Failed to run sysbench command: %s', e)
      # Chain the cause so the original remote-command failure is preserved.
      raise errors.Benchmarks.RunError(
          f'Error running sysbench command: {e}'
      ) from e
    metadata = sysbench.GetMetadata(sysbench_parameters)
    metadata.update({
        'shared_buffer_size': f'{SHARED_BUFFER_SIZE.value}GB',
    })
    results += sysbench.ParseSysbenchTimeSeries(stdout, metadata)
    results += sysbench.ParseSysbenchLatency([stdout], metadata)
    current_transactions = sysbench.ParseSysbenchTransactions(stdout, metadata)
    results += current_transactions
    # max transactions stores the max tps/qps for all the thread counts.
    # update the max tps/qps in max_transactions.
    for item in current_transactions:
      metric = item.metric
      metric_value = item.value
      current_max_sample = max_transactions.get(metric, None)
      if not current_max_sample or current_max_sample.value < metric_value:
        max_transactions[metric] = item
    # current_transactions is an array of two samples, tps and qps.
    current_qps = current_transactions[1].value
    # QPS dropped relative to the previous (smaller) thread count: the peak
    # has been passed, so higher thread counts cannot be hiding a larger QPS.
    if not reached_peak and current_qps < previous_qps:
      reached_peak = True
    # BUG FIX: previous_qps was never updated, so the peak-detection
    # comparison above was always 'current_qps < 0' and never triggered.
    previous_qps = current_qps
  # if we get max_qps at max thread_count, there is a possibility of a higher
  # qps at increased thread count. if --postgresql_measure_max_qps is set to
  # true, we want to make sure we achieve max QPS.
  if (
      _MEASURE_MAX_QPS.value
      and not reached_peak
  ):
    raise errors.Benchmarks.RunError(
        f'Max achieved at {sorted_threads[-1]} threads, possibility'
        ' of not enough client load. Consider using'
        ' --postgresql_measure_max_qps flag if you want to disable this check.'
    )
  if not results:
    raise errors.Benchmarks.RunError(
        'None of the sysbench tests were successful.'
    )
  # report the max tps/qps as a new metric.
  for item in max_transactions.values():
    metadata = copy.deepcopy(item.metadata)
    metadata['searched_thread_counts'] = FLAGS.sysbench_run_threads
    results.append(
        sample.Sample(
            'max_' + item.metric, item.value, item.unit, metadata=metadata
        )
    )
  return results