# perfkitbenchmarker/linux_benchmarks/unmanaged_postgresql_sysbench_benchmark.py

# Copyright 2024 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Sysbench Benchmark for unmanaged PostgreSQL db on a VM.

This benchmark measures performance of Sysbench Databases on unmanaged
postgreSQL.
"""

import copy
import logging

from absl import flags
from perfkitbenchmarker import background_tasks
from perfkitbenchmarker import benchmark_spec as bm_spec
from perfkitbenchmarker import configs
from perfkitbenchmarker import errors
from perfkitbenchmarker import os_types
from perfkitbenchmarker import sample
from perfkitbenchmarker.linux_packages import postgresql16
from perfkitbenchmarker.linux_packages import sysbench

FLAGS = flags.FLAGS

BENCHMARK_NAME = 'unmanaged_postgresql_sysbench'
BENCHMARK_CONFIG = """
unmanaged_postgresql_sysbench:
  description: PostgreSQL on a VM benchmarked using Sysbench.
  vm_groups:
    client:
      vm_spec:
        GCP:
          machine_type: c3-standard-22
          zone: us-east1-b
        AWS:
          machine_type: m7i.4xlarge
          zone: us-east-1a
        Azure:
          machine_type: Standard_D16s_v5
          zone: eastus
    server:
      vm_spec:
        GCP:
          machine_type: c3-standard-22
          zone: us-east1-b
        AWS:
          machine_type: r7i.4xlarge
          zone: us-east-1a
        Azure:
          machine_type: Standard_E20s_v5
          zone: eastus
      disk_spec:
        GCP:
          disk_size: 500
          disk_type: hyperdisk-balanced
          provisioned_iops: 160000
          provisioned_throughput: 2400
          num_striped_disks: 1
        AWS:
          disk_size: 500
          disk_type: gp3
          provisioned_iops: 16000
          provisioned_throughput: 1000
          num_striped_disks: 5
        Azure:
          disk_size: 200
          disk_type: PremiumV2_LRS
          provisioned_iops: 40000
          provisioned_throughput: 800
          num_striped_disks: 2
  flags:
    sysbench_version: df89d34c410a2277e19f77e47e535d0890b2029b
    disk_fs_type: xfs
    db_engine: postgresql
    sysbench_report_interval: 1
    sysbench_ssl_mode: required
    sysbench_run_threads: 1,64,128,256,512,1024,2048
    sysbench_run_seconds: 300
    sysbench_load_threads: 128
"""

# The database name is used to create a database on the server.
_DATABASE_TYPE = 'pgsql'
_DATABASE_NAME = 'sysbench'

# test names
_TPCC = 'percona_tpcc'
_OLTP_READ_WRITE = 'oltp_read_write'
_OLTP_READ_ONLY = 'oltp_read_only'
_OLTP_WRITE_ONLY = 'oltp_write_only'
_OLTP = [_OLTP_READ_WRITE, _OLTP_READ_ONLY, _OLTP_WRITE_ONLY]

SHARED_BUFFER_SIZE = flags.DEFINE_integer(
    'postgresql_shared_buffer_size',
    10,
    'Size of the shared buffer in the postgresql cluster (in Gb).',
)
_MEASURE_MAX_QPS = flags.DEFINE_bool(
    'postgresql_measure_max_qps',
    False,
    'Measure Max QPS of all the thread counts. Please set to'
    " false if you don't want to measure max qps.",
)


def GetConfig(user_config):
  """Get the benchmark config, applying user overrides.

  Args:
    user_config: User-supplied configuration overrides (merged over
      BENCHMARK_CONFIG by configs.LoadConfig).

  Returns:
    Benchmark config dict.
  """
  config = configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME)
  # Instead of changing the default data dir of database in (multiple) configs,
  # Force the scratch disk as database default dir (simpler code).
  disk_spec = config['vm_groups']['server']['disk_spec']
  for cloud in disk_spec:
    disk_spec[cloud]['mount_point'] = postgresql16.GetOSDependentDefaults(
        FLAGS.os_type
    )['disk_mount_point']
  # Update machine type for server/client.
  if FLAGS.db_machine_type or FLAGS.db_zone:
    vm_spec = config['vm_groups']['server']['vm_spec']
    for cloud in vm_spec:
      if FLAGS.db_zone:
        # db_zone is a list flag; the primary server takes the first entry.
        vm_spec[cloud]['zone'] = FLAGS.db_zone[0]
      if FLAGS.db_machine_type:
        vm_spec[cloud]['machine_type'] = FLAGS.db_machine_type
  if FLAGS.client_vm_machine_type or FLAGS.client_vm_zone:
    vm_spec = config['vm_groups']['client']['vm_spec']
    for cloud in vm_spec:
      if FLAGS.client_vm_zone:
        vm_spec[cloud]['zone'] = FLAGS.client_vm_zone
      if FLAGS.client_vm_machine_type:
        vm_spec[cloud]['machine_type'] = FLAGS.client_vm_machine_type
  # Add replica servers if configured: one copy of the server group per
  # replica zone, named replica_0, replica_1, ...
  if FLAGS.db_high_availability:
    for index, zone in enumerate(FLAGS.db_replica_zones):
      replica = copy.deepcopy(config['vm_groups']['server'])
      for cloud in replica['vm_spec']:
        replica['vm_spec'][cloud]['zone'] = zone
      config['vm_groups'][f'replica_{index}'] = replica
  return config


def Prepare(benchmark_spec: bm_spec.BenchmarkSpec):
  """Prepare the servers and clients for the benchmark run.

  Configures and starts PostgreSQL on the primary server, sets up any
  replicas, installs sysbench on the clients, and loads the test dataset
  from the first client VM.

  Args:
    benchmark_spec: The benchmark specification.
  """
  vms = benchmark_spec.vms
  # Collect VMs from every replica_* group created in GetConfig.
  replica_servers = []
  for group_name in benchmark_spec.vm_groups:
    if group_name.startswith('replica'):
      replica_servers += benchmark_spec.vm_groups[group_name]
  background_tasks.RunThreaded(postgresql16.ConfigureSystemSettings, vms)
  background_tasks.RunThreaded(lambda vm: vm.Install('postgresql16'), vms)
  primary_server = benchmark_spec.vm_groups['server'][0]
  postgresql16.InitializeDatabase(primary_server)
  postgresql16.ConfigureAndRestart(
      primary_server, FLAGS.run_uri, SHARED_BUFFER_SIZE.value
  )
  for index, replica in enumerate(replica_servers):
    postgresql16.SetupReplica(
        primary_server, replica, index, FLAGS.run_uri, SHARED_BUFFER_SIZE.value
    )
  clients = benchmark_spec.vm_groups['client']
  for client in clients:
    client.InstallPackages('git')
    InstallSysbench(client)
    if FLAGS.sysbench_testname == _TPCC:
      # The TPC-C workload uses Percona's lua scripts, cloned fresh on
      # every run to avoid stale checkouts.
      client.RemoteCommand(
          'cd /opt && sudo rm -fr sysbench-tpcc && '
          f'sudo git clone {sysbench.SYSBENCH_TPCC_REPRO}'
      )
  # Only the first client loads the data.
  loader_vm = benchmark_spec.vm_groups['client'][0]
  sysbench_parameters = _GetSysbenchParameters(
      primary_server.internal_ip,
      postgresql16.GetPsqlUserPassword(FLAGS.run_uri),
  )
  cmd = sysbench.BuildLoadCommand(sysbench_parameters)
  logging.info('%s load command: %s', FLAGS.sysbench_testname, cmd)
  loader_vm.RemoteCommand(cmd)


def InstallSysbench(vm):
  """Installs sysbench (with the pgsql driver) using the VM's package manager."""
  args = {'db_driver': _DATABASE_TYPE}
  if vm.OS_TYPE in os_types.AMAZONLINUX_TYPES + os_types.CENTOS_TYPES:
    sysbench.YumInstall(vm, args=args)
  else:
    sysbench.AptInstall(vm, args=args)


def _GetSysbenchParameters(primary_server_ip: str | None, password: str):
  """Get sysbench parameters from flags.

  Args:
    primary_server_ip: Internal IP of the primary PostgreSQL server.
    password: Password for the sysbench database user.

  Returns:
    A sysbench.SysbenchInputParameters populated for the selected test.

  Raises:
    errors.Setup.InvalidConfigurationError: If --sysbench_testname is not a
      supported OLTP or TPC-C test.
  """
  sysbench_parameters = sysbench.SysbenchInputParameters(
      db_driver=_DATABASE_TYPE,
      tables=FLAGS.sysbench_tables,
      threads=FLAGS.sysbench_load_threads,
      report_interval=FLAGS.sysbench_report_interval,
      db_user=_DATABASE_NAME,
      db_password=password,
      db_name=_DATABASE_NAME,
      host_ip=primary_server_ip,
  )
  # Default PostgreSQL port.
  sysbench_parameters.port = 5432
  test = FLAGS.sysbench_testname
  if test in _OLTP:
    sysbench_parameters.built_in_test = True
    sysbench_parameters.test = f'{sysbench.LUA_SCRIPT_PATH}{test}.lua'
    sysbench_parameters.db_ps_mode = 'disable'
    sysbench_parameters.skip_trx = True
    sysbench_parameters.table_size = FLAGS.sysbench_table_size
  elif test == _TPCC:
    # Custom lua scripts cloned into /opt/sysbench-tpcc during Prepare.
    sysbench_parameters.custom_lua_packages_path = '/opt/sysbench-tpcc/?.lua'
    sysbench_parameters.built_in_test = False
    sysbench_parameters.test = '/opt/sysbench-tpcc/tpcc.lua'
    sysbench_parameters.scale = FLAGS.sysbench_scale
    sysbench_parameters.use_fk = FLAGS.sysbench_use_fk
    sysbench_parameters.trx_level = FLAGS.sysbench_txn_isolation_level
  else:
    raise errors.Setup.InvalidConfigurationError(
        f'Test --sysbench_testname={FLAGS.sysbench_testname} is not supported.'
    )
  return sysbench_parameters


def Run(benchmark_spec: bm_spec.BenchmarkSpec) -> list[sample.Sample]:
  """Run the sysbench benchmark and publish results.

  Runs the configured sysbench test once per thread count in
  --sysbench_run_threads (ascending), collecting time series, latency and
  transaction samples, and reports the max tps/qps observed across all
  thread counts as additional metrics.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.

  Returns:
    Results.

  Raises:
    errors.Benchmarks.RunError: If a sysbench invocation fails, if no
      results were produced, or if --postgresql_measure_max_qps is set and
      QPS never peaked before the highest thread count.
  """
  primary_server = benchmark_spec.vm_groups['server'][0]
  client = benchmark_spec.vm_groups['client'][0]
  sysbench_parameters = _GetSysbenchParameters(
      primary_server.internal_ip,
      postgresql16.GetPsqlUserPassword(FLAGS.run_uri),
  )
  results = []
  # a map of transaction metric name (tps/qps) to current sample with max value
  max_transactions = {}
  sorted_threads = sorted(FLAGS.sysbench_run_threads)
  previous_qps = 0
  reached_peak = False
  for thread_count in sorted_threads:
    sysbench_parameters.threads = thread_count
    cmd = sysbench.BuildRunCommand(sysbench_parameters)
    logging.info('%s run command: %s', FLAGS.sysbench_testname, cmd)
    try:
      stdout, _ = client.RemoteCommand(
          cmd, timeout=2 * FLAGS.sysbench_run_seconds,
      )
    except errors.VirtualMachine.RemoteCommandError as e:
      logging.exception('Failed to run sysbench command: %s', e)
      # Chain the original error for easier debugging.
      raise errors.Benchmarks.RunError(
          f'Error running sysbench command: {e}'
      ) from e
    metadata = sysbench.GetMetadata(sysbench_parameters)
    metadata.update({
        'shared_buffer_size': f'{SHARED_BUFFER_SIZE.value}GB',
    })
    results += sysbench.ParseSysbenchTimeSeries(stdout, metadata)
    results += sysbench.ParseSysbenchLatency([stdout], metadata)
    current_transactions = sysbench.ParseSysbenchTransactions(stdout, metadata)
    results += current_transactions
    # max transactions stores the max tps/qps for all the thread counts.
    # update the max tps/qps in max_transactions.
    for item in current_transactions:
      metric = item.metric
      metric_value = item.value
      current_max_sample = max_transactions.get(metric, None)
      if not current_max_sample or current_max_sample.value < metric_value:
        max_transactions[metric] = item
    # store QPS at max threads
    # current_transactions is an array of two samples, tps and qps.
    current_qps = current_transactions[1].value
    if not reached_peak and current_qps < previous_qps:
      # QPS dropped relative to the previous (smaller) thread count, so the
      # peak has been observed within the searched thread counts.
      reached_peak = True
    # BUG FIX: previous_qps was never updated, so the peak-detection
    # comparison above could never trigger.
    previous_qps = current_qps
  # If we get max qps at max thread_count, there is a possibility of a higher
  # qps at an increased thread count. If --postgresql_measure_max_qps is set
  # to true, we want to make sure we actually achieved the max QPS.
  if _MEASURE_MAX_QPS.value and not reached_peak:
    raise errors.Benchmarks.RunError(
        f'Max achieved at {sorted_threads[-1]} threads, possibility'
        ' of not enough client load. Consider adding higher thread counts,'
        ' or set --postgresql_measure_max_qps=false to disable this check.'
    )
  if not results:
    raise errors.Benchmarks.RunError(
        'None of the sysbench tests were successful.'
    )
  # report the max tps/qps as a new metric.
  for item in max_transactions.values():
    metadata = copy.deepcopy(item.metadata)
    metadata['searched_thread_counts'] = FLAGS.sysbench_run_threads
    results.append(
        sample.Sample(
            'max_' + item.metric, item.value, item.unit, metadata=metadata
        )
    )
  return results


def Cleanup(benchmark_spec: bm_spec.BenchmarkSpec):
  """No-op: VM teardown is handled by the PKB framework."""
  del benchmark_spec