in perfkitbenchmarker/linux_benchmarks/unmanaged_postgresql_sysbench_benchmark.py [0:0]
def Run(benchmark_spec: bm_spec.BenchmarkSpec) -> list[sample.Sample]:
  """Run the sysbench benchmark and publish results.

  Sysbench is executed from the client VM once per value in
  --sysbench_run_threads, in ascending thread-count order. Every run
  contributes the samples parsed from its output (time series, latency and
  transactions). After all runs, the largest value seen for each transaction
  metric is additionally reported as a `max_<metric>` sample.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.

  Returns:
    Results.

  Raises:
    errors.Benchmarks.RunError: If a sysbench command fails on the client, if
      no samples were collected, or if the max-QPS check is enabled and QPS
      never decreased between two consecutive thread counts.
  """
  primary_server = benchmark_spec.vm_groups['server'][0]
  client = benchmark_spec.vm_groups['client'][0]
  sysbench_parameters = _GetSysbenchParameters(
      primary_server.internal_ip,
      postgresql16.GetPsqlUserPassword(FLAGS.run_uri),
  )
  results = []
  # a map of transaction metric name (tps/qps) to current sample with max value
  max_transactions = {}
  # Thread counts are walked in ascending order so that a drop in QPS between
  # two consecutive runs can be interpreted as "the peak has been passed".
  sorted_threads = sorted(FLAGS.sysbench_run_threads)
  previous_qps = 0
  reached_peak = False
  for thread_count in sorted_threads:
    sysbench_parameters.threads = thread_count
    cmd = sysbench.BuildRunCommand(sysbench_parameters)
    logging.info('%s run command: %s', FLAGS.sysbench_testname, cmd)
    try:
      stdout, _ = client.RemoteCommand(
          cmd, timeout=2 * FLAGS.sysbench_run_seconds
      )
    except errors.VirtualMachine.RemoteCommandError as e:
      logging.exception('Failed to run sysbench command: %s', e)
      # Chain the original error so the remote failure stays in the traceback.
      raise errors.Benchmarks.RunError(
          f'Error running sysbench command: {e}'
      ) from e
    metadata = sysbench.GetMetadata(sysbench_parameters)
    metadata.update({
        'shared_buffer_size': f'{SHARED_BUFFER_SIZE.value}GB',
    })
    results += sysbench.ParseSysbenchTimeSeries(stdout, metadata)
    results += sysbench.ParseSysbenchLatency([stdout], metadata)
    current_transactions = sysbench.ParseSysbenchTransactions(stdout, metadata)
    results += current_transactions
    # max transactions stores the max tps/qps for all the thread counts.
    # update the max tps/qps in max_transactions.
    for item in current_transactions:
      current_max_sample = max_transactions.get(item.metric)
      if current_max_sample is None or current_max_sample.value < item.value:
        max_transactions[item.metric] = item
    # current_transactions is an array of two samples, tps and qps.
    current_qps = current_transactions[1].value
    if current_qps < previous_qps:
      reached_peak = True
    # Remember this run's QPS for the next iteration. Without this update the
    # comparison above would always be against the initial 0 and the peak
    # could never be detected.
    previous_qps = current_qps
  # Checked before the peak check so that an empty thread list produces this
  # error rather than an IndexError from sorted_threads[-1] below.
  if not results:
    raise errors.Benchmarks.RunError(
        'None of the sysbench tests were successful.'
    )
  # if we get max_qps at max thread_count, there is a possibility of a higher
  # qps at increased thread count. if --postgresql_measure_max_qps is set to
  # true, we want to make sure we achieve max QPS.
  if _MEASURE_MAX_QPS.value and not reached_peak:
    raise errors.Benchmarks.RunError(
        f'Max achieved at {sorted_threads[-1]} threads, possibility'
        ' of not enough client load. Consider using'
        ' --postgresql_measure_max_qps flag if you want to disable this check.'
    )
  # report the max tps/qps as a new metric.
  for item in max_transactions.values():
    # Deep copy so the extra key does not leak into the per-run sample's
    # metadata.
    metadata = copy.deepcopy(item.metadata)
    metadata['searched_thread_counts'] = FLAGS.sysbench_run_threads
    results.append(
        sample.Sample(
            'max_' + item.metric, item.value, item.unit, metadata=metadata
        )
    )
  return results