perfkitbenchmarker/linux_benchmarks/sysbench_benchmark.py (560 lines of code) (raw):

# Copyright 2014 PerfKitBenchmarker Authors. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Sysbench Benchmark. This is a set of benchmarks that measures performance of Sysbench Databases on managed MySQL or Postgres. For unmanaged databases, consider using unmanaged_mysql_sysbench_benchmark.py. As other cloud providers deliver a managed MySQL service, we will add it here. """ import datetime import logging import time from typing import List from absl import flags from perfkitbenchmarker import background_tasks from perfkitbenchmarker import configs from perfkitbenchmarker import errors from perfkitbenchmarker import flag_util from perfkitbenchmarker import relational_db from perfkitbenchmarker import sample from perfkitbenchmarker import sql_engine_utils from perfkitbenchmarker import virtual_machine from perfkitbenchmarker.linux_packages import sysbench FLAGS = flags.FLAGS # The default values for flags and BENCHMARK_CONFIG are not a recommended # configuration for comparing sysbench performance. Rather these values # are set to provide a quick way to verify functionality is working. # A broader set covering different permuations on much larger data sets # is prefereable for comparison. flags.DEFINE_string( 'sysbench_testname', 'oltp_read_write', 'The built in oltp lua script to run', ) flags.DEFINE_integer( 'sysbench_tables', 4, 'The number of tables used in sysbench oltp.lua tests' ) flags.DEFINE_integer( 'sysbench_table_size', 100000, 'The number of rows of each table used in the oltp tests', ) flags.DEFINE_integer( 'sysbench_scale', 100, 'Scale parameter as used by TPCC benchmark.' ) _SLEEP_SEC = flags.DEFINE_integer( 'sysbench_sleep_after_load_sec', 0, 'The time to sleep after loading data.' ) flags.DEFINE_integer( 'sysbench_warmup_seconds', 10, 'The duration of the warmup run in which results are ' 'discarded, in seconds.', ) _RUN_DURATION = flags.DEFINE_integer( 'sysbench_run_seconds', 10, 'The duration of the actual run in which results are ' 'collected, in seconds.', ) _LOAD_CLIENTS = flags.DEFINE_integer( 'sysbench_load_client_vms', None, 'The number of client vms used in the prepare phase', ) _LOAD_THREADS = flags.DEFINE_integer( 'sysbench_load_threads', 64, 'Number of threads (per client VM) to use for loading.', ) flag_util.DEFINE_integerlist( 'sysbench_run_threads', flag_util.IntegerList([64]), 'array of thread counts passed to sysbench, one at a time', module_name=__name__, ) flags.DEFINE_integer( 'sysbench_latency_percentile', 100, 'The latency percentile we ask sysbench to compute.', ) flags.DEFINE_integer( 'sysbench_report_interval', 2, 'The interval, in seconds, we ask sysbench to report results.', ) flags.DEFINE_boolean( 'sysbench_use_fk', True, 'Use foreign keys. This is used by TPCC benchmark.' ) _SPANNER_PG_COMPAT_MODE = flags.DEFINE_boolean( 'sysbench_spanner_pg_compat_mode', True, 'If true, uses postgres-compatible benchmark script. Only used if' ' --sysbench_testname=spanner_tpcc.', ) _TXN_ISOLATION_LEVEL = flags.DEFINE_enum( 'sysbench_txn_isolation_level', 'SER', ['SER', 'RR', 'RC'], 'If true, uses postgres-compatible benchmark script. Only used if' ' --sysbench_testname=spanner_tpcc.', ) _SKIP_LOAD_STAGE = flags.DEFINE_boolean( 'sysbench_skip_load_stage', False, 'If true, skips the loading stage of the benchmark. Useful for when ' 'testing on a long-lived or static instance where the database has already ' 'been loaded on a previous run.', ) _AUTO_INCREMENT = flags.DEFINE_boolean( 'sysbench_auto_inc', True, 'Auto increment for sysbench' ) _SCALE_UP_MAX_CPU_UTILIZATION = flags.DEFINE_float( 'sysbench_scale_up_max_cpu_utilization', 0.95, 'Stop the auto scale up experiment when we reach this cpu utilization', ) # See https://github.com/Percona-Lab/sysbench-tpcc/releases for the most # up to date version. _SYSBENCH_TPCC_TAR = 'sysbench-tpcc.tar.gz' BENCHMARK_DATA = { _SYSBENCH_TPCC_TAR: ( '564600d6c296ef1cd88a07eeaf40bbc58688dbdc7c58fc1a0d28bb2b41c30611' ), } _SCALEUP_CLIENTS_TEST = flags.DEFINE_boolean( 'sysbench_scaleup_clients_test', False, 'Scale up the number of clients running the benchmark. This is a benchmark' ' mode to test how the server reacts when the load increases gradually by' ' having more clients over time. Need to override' ' sysbench.relational_db.vm_groups.clients.vm_count and the number have to' ' be creater than sysbench_scaleup_clients_test_num_clients for this' ' benchmark mode to work', ) _SCALEUP_CLIENTS_TEST_NUM_CLIENTS = flags.DEFINE_integer( 'sysbench_scaleup_clients_test_num_clients', 1, 'This defines the number of client we scale up to. Have to be smaller than' ' the number of existing clients.', ) SPANNER_TPCC = 'spanner-tpcc' # Parameters are defined in oltp_common.lua file # https://github.com/akopytov/sysbench _MAP_WORKLOAD_TO_VALID_UNIQUE_PARAMETERS = { 'tpcc': {'scale'}, SPANNER_TPCC: {'scale'}, 'oltp_write_only': {'table_size', 'auto-inc'}, 'oltp_read_only': {'table_size', 'auto-inc'}, 'oltp_read_write': {'table_size', 'auto-inc'}, 'oltp_insert': {'table_size', 'auto-inc'}, } BENCHMARK_NAME = 'sysbench' BENCHMARK_CONFIG = """ sysbench: description: Sysbench OLTP benchmarks. relational_db: engine: mysql enable_freeze_restore: True db_spec: GCP: machine_type: db-n1-standard-16 zone: us-central1-c AWS: machine_type: db.m4.4xlarge zone: us-west-1a Azure: machine_type: GP_Gen5_2 zone: westus db_disk_spec: GCP: disk_size: 100 disk_type: pd-ssd AWS: disk_size: 6144 disk_type: gp2 Azure: #From AZ command line: #Valid storage sizes range from minimum of 128000 MB and additional #increments of 128000 MB up to maximum of 1024000 MB. disk_size: 128 vm_groups: servers: vm_spec: GCP: machine_type: n1-standard-16 zone: us-central1-c AWS: machine_type: m4.4xlarge zone: us-west-1a Azure: machine_type: Standard_B4ms zone: westus disk_spec: *default_500_gb replications: vm_spec: GCP: machine_type: n1-standard-16 zone: us-central1-b AWS: machine_type: m4.4xlarge zone: us-east-1a Azure: machine_type: Standard_B4ms zone: eastus disk_spec: *default_500_gb clients: vm_spec: GCP: machine_type: n1-standard-16 zone: us-central1-c AWS: machine_type: m4.4xlarge zone: us-west-1a Azure: machine_type: Standard_B4ms zone: westus disk_spec: GCP: disk_size: 500 disk_type: pd-ssd AWS: disk_size: 500 disk_type: gp2 Azure: disk_size: 500 disk_type: Premium_LRS """ # Constants defined for Sysbench tests. DISABLE = 'disable' UNIFORM = 'uniform' SECONDS_UNIT = 'seconds' def GetConfig(user_config): config = configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME) vm_count = config['relational_db']['vm_groups']['clients'].get('vm_count', 1) if vm_count > 1 and FLAGS.sysbench_testname != SPANNER_TPCC: raise errors.Setup.InvalidConfigurationError( f'Test --sysbench_testname={FLAGS.sysbench_testname} only supports 1' f' client VM, got {vm_count}. Currently only {SPANNER_TPCC} is' ' supported.' ) return config # TODO(chunla) Move this to engine specific module def _GetCommonSysbenchOptions(db: relational_db.BaseRelationalDb): """Get Sysbench options.""" engine_type = db.engine_type result = [] if engine_type == sql_engine_utils.MYSQL: result.append('--db-driver=mysql') # Ignore possible mysql errors when running OLTP # https://github.com/actiontech/dble/issues/458 # https://callisto.digital/posts/tools/using-sysbench-to-benchmark-mysql-5-7/ if _GetSysbenchTestParameter() != 'tpcc': result += [ '--db-ps-mode=%s' % DISABLE, # Error 1205: Lock wait timeout exceeded # Could happen when we overload the database '--mysql-ignore-errors=1213,1205,1020,2013', ] elif engine_type in [ sql_engine_utils.POSTGRES, sql_engine_utils.SPANNER_POSTGRES, ]: result += [ '--db-driver=pgsql', ] result += [db.client_vm_query_tools.GetSysbenchConnectionString()] return result def CreateMetadataFromFlags(): """Create meta data with all flags for sysbench.""" metadata = { 'sysbench_testname': FLAGS.sysbench_testname, 'sysbench_tables': FLAGS.sysbench_tables, 'sysbench_table_size': FLAGS.sysbench_table_size, 'sysbench_scale': FLAGS.sysbench_scale, 'sysbench_warmup_seconds': FLAGS.sysbench_warmup_seconds, 'sysbench_run_seconds': FLAGS.sysbench_run_seconds, 'sysbench_latency_percentile': FLAGS.sysbench_latency_percentile, 'sysbench_report_interval': FLAGS.sysbench_report_interval, 'sysbench_rand_type': UNIFORM, } if FLAGS.sysbench_testname == SPANNER_TPCC: metadata['sysbench_use_fk'] = FLAGS.sysbench_use_fk return metadata def _GetSysbenchTestParameter() -> str: return ( 'tpcc' if FLAGS.sysbench_testname == SPANNER_TPCC else FLAGS.sysbench_testname ) def _GetDatabaseName(db: relational_db.BaseRelationalDb) -> str: """Returns the database name to use in this test.""" return db.database if hasattr(db, 'database') else 'sbtest' def _InstallLuaScriptsIfNecessary(vm): """Installs the lua scripts if necessary.""" if _GetSysbenchTestParameter() == 'tpcc': vm.InstallPreprovisionedBenchmarkData( BENCHMARK_NAME, [_SYSBENCH_TPCC_TAR], '~' ) vm.RemoteCommand( f'tar -zxvf {_SYSBENCH_TPCC_TAR} --strip-components 1' f' -C {sysbench.SYSBENCH_DIR}' ) vm.PushDataFile( 'sysbench/default_tpcc_common.lua', f'{sysbench.SYSBENCH_DIR}/tpcc_common.lua', ) if FLAGS.sysbench_testname == SPANNER_TPCC: vm.PushDataFile( 'sysbench/spanner_pg_tpcc_common.lua', f'{sysbench.SYSBENCH_DIR}/tpcc_common.lua', ) vm.PushDataFile( 'sysbench/spanner_pg_tpcc_run.lua', f'{sysbench.SYSBENCH_DIR}/tpcc_run.lua', ) vm.PushDataFile( 'sysbench/spanner_pg_tpcc.lua', f'{sysbench.SYSBENCH_DIR}/tpcc.lua' ) def _IsValidFlag(flag): return ( flag in _MAP_WORKLOAD_TO_VALID_UNIQUE_PARAMETERS[FLAGS.sysbench_testname] ) def _GetSysbenchPrepareCommand( db: relational_db.BaseRelationalDb, num_vms: int, vm_index: int ) -> str: """Returns the sysbench command used to load the database.""" # TODO(ruwa): Migrate to use sysbench.BuildLoadCommand() data_load_cmd_tokens = [ 'cd ~/sysbench/ && nice', # run with a niceness of lower priority '-15', # to encourage cpu time for ssh commands 'sysbench', _GetSysbenchTestParameter(), '--tables=%d' % FLAGS.sysbench_tables, ( '--table_size=%d' % FLAGS.sysbench_table_size if _IsValidFlag('table_size') else '' ), ('--scale=%d' % FLAGS.sysbench_scale if _IsValidFlag('scale') else ''), '--threads=%d' % _LOAD_THREADS.value, '--rand-type=%s' % UNIFORM, ] if _IsValidFlag('auto-inc'): if _AUTO_INCREMENT.value: data_load_cmd_tokens.append('--auto-inc=on') else: data_load_cmd_tokens.append('--auto-inc=off') if FLAGS.sysbench_testname == SPANNER_TPCC: # Supports loading through multiple VMs scale = FLAGS.sysbench_scale start_scale = (scale / num_vms) * vm_index + 1 end_scale = (scale / num_vms) * (vm_index + 1) data_load_cmd_tokens.extend([ '--use_fk=%d' % (1 if FLAGS.sysbench_use_fk else 0), '--enable_cluster=%d' % (0 if num_vms == 1 else 1), '--start_scale=%d' % start_scale, '--end_scale=%d' % end_scale, '--enable_pg_compat_mode=%d' % (1 if _SPANNER_PG_COMPAT_MODE.value else 0), ]) if ( db.engine_type == sql_engine_utils.SPANNER_POSTGRES and FLAGS.sysbench_testname != SPANNER_TPCC ): table_size = FLAGS.sysbench_table_size fill_table_size = table_size / num_vms start_index = fill_table_size * vm_index + 1 data_load_cmd_tokens.extend([ '--start_index=%d' % start_index, '--fill_table_size=%d' % fill_table_size, '--create_secondary=false', '--create_tables=false', ]) return ' '.join( data_load_cmd_tokens + _GetCommonSysbenchOptions(db) + ['prepare'] ) def _LoadDatabase( command: str, vm: virtual_machine.BaseVirtualMachine ) -> tuple[str, str]: stdout, stderr = vm.RobustRemoteCommand(command) for output in (stdout, stderr): if 'FATAL' in output: raise errors.Benchmarks.RunError( f'Error while running prepare command: {command}\n{output}' ) return stdout, stderr def _LoadDatabaseInParallel( db: relational_db.BaseRelationalDb, client_vms: list[virtual_machine.VirtualMachine], ) -> list[sample.Sample]: """Loads the database using the sysbench prepare command.""" if _LOAD_CLIENTS.value: client_vms = client_vms[: _LOAD_CLIENTS.value] db.UpdateCapacityForLoad() if ( FLAGS.sysbench_testname != SPANNER_TPCC and db.engine_type == sql_engine_utils.SPANNER_POSTGRES ): client_vms[0].RobustRemoteCommand( f'cd ~/sysbench/ && nice -15 sysbench {FLAGS.sysbench_testname}' f' --tables={FLAGS.sysbench_tables} --table_size=0 ' ' --threads=20 --auto-inc=off ' '--create_secondary=false --db-driver=pgsql' ' --pgsql-host=/tmp prepare' ) _UpdateSessions(db, _LOAD_THREADS.value) # Provision the Sysbench test based on the input flags (load data into DB) # Could take a long time if the data to be loaded is large. data_load_start_time = time.time() # Sysbench output is in stdout, but we also get stderr just in case # something went wrong. command_vm_pairs = [ (_GetSysbenchPrepareCommand(db, len(client_vms), index), vm) for index, vm in enumerate(client_vms) ] args = [(command_vm_pair, {}) for command_vm_pair in command_vm_pairs] results = background_tasks.RunThreaded(_LoadDatabase, args) load_duration = time.time() - data_load_start_time logging.info( 'It took %d seconds to finish the data loading step', load_duration ) for _, stderr in results: if 'FATAL' in stderr: raise errors.Benchmarks.RunError('Error while running prepare phase') if ( FLAGS.sysbench_testname != SPANNER_TPCC and db.engine_type == sql_engine_utils.SPANNER_POSTGRES ): # This command update the secondary index # Run all index update in parallel. client_vms[0].RobustRemoteCommand( 'cd ~/sysbench/ && nice -15 sysbench oltp_read_only' f' --tables={FLAGS.sysbench_tables} --table_size=0 ' f' --threads={FLAGS.sysbench_tables} --auto-inc=off ' '--create_secondary=true --db-driver=pgsql' ' --pgsql-host=/tmp prepare' ) db.UpdateCapacityForRun() return [ sample.Sample( 'sysbench data load time', load_duration, SECONDS_UNIT, CreateMetadataFromFlags(), ) ] def _PrepareClients( db: relational_db.BaseRelationalDb, client_vms: list[virtual_machine.VirtualMachine], ) -> None: """Installs the relevant packages on the clients.""" # Setup common test tools required on the client VM # Run app install to force reinstalling sysbench. spanner_oltp = ( db.engine_type == sql_engine_utils.SPANNER_POSTGRES and FLAGS.sysbench_testname != SPANNER_TPCC ) if spanner_oltp: background_tasks.RunThreaded( lambda vm: sysbench.AptInstall( vm, spanner_oltp=spanner_oltp, ), client_vms, ) else: background_tasks.RunThreaded( lambda vm: vm.Install('sysbench'), client_vms, ) background_tasks.RunThreaded(_InstallLuaScriptsIfNecessary, client_vms) if ( db.engine_type == sql_engine_utils.SPANNER_POSTGRES and FLAGS.sysbench_testname != SPANNER_TPCC ): background_tasks.RunThreaded( lambda client_query_tools: client_query_tools.InstallPackages(), db.client_vms_query_tools, ) # Some databases install these query tools during _PostCreate, which is # skipped if the database is user managed / restored. if db.user_managed or db.restored: background_tasks.RunThreaded( lambda client_query_tools: client_query_tools.InstallPackages(), db.client_vms_query_tools, ) def _PrepareDatabase(db: relational_db.BaseRelationalDb) -> None: """Creates the actual database used for the test, sbtest by default.""" db_name = _GetDatabaseName(db) # Recreate the DB if needed. Not applicable on a fresh run, but helps with # manual development. try: db.DeleteDatabase(db_name) except ( errors.VirtualMachine.RemoteCommandError, errors.VmUtil.IssueCommandError, ): logging.warning('Error dropping database, it may not exist.') stdout, stderr = db.CreateDatabase(db_name) logging.info( '%s db created, stdout is %s, stderr is %s', db_name, stdout, stderr ) def Prepare(benchmark_spec) -> List[sample.Sample]: """Prepares the DB instance and configures it. Args: benchmark_spec: The benchmark specification. Contains all data that is required to run the benchmark. Returns: A list of load samples. """ client_vms = benchmark_spec.vm_groups['clients'] db: relational_db.BaseRelationalDb = benchmark_spec.relational_db _PrepareClients(db, client_vms) if _SKIP_LOAD_STAGE.value or db.restored: logging.info('Skipping the load stage') return [] _PrepareDatabase(db) load_samples = _LoadDatabaseInParallel(db, client_vms) if _SLEEP_SEC.value: logging.info( 'Sleeping for %d seconds now that loading has finished.', _SLEEP_SEC.value, ) return load_samples def _GetDatabaseSize(db): """Get the size of the database in MB.""" db_engine_type = db.engine_type if db_engine_type == sql_engine_utils.MYSQL: stdout, _ = db.client_vm_query_tools.IssueSqlCommand( "SELECT table_schema AS 'Database', " 'ROUND(SUM(data_length + index_length) / 1024 / 1024, 2) ' "AS 'Size (MB)' " 'FROM information_schema.TABLES ' 'GROUP BY table_schema; ' ) logging.info('Query database size results: \n%s', stdout) # example stdout is tab delimited but shown here with spaces: # Database Size (MB) # information_schema 0.16 # mysql 5.53 # performance_schema 0.00 # sbtest 0.33 size_mb = 0 for line in stdout.splitlines()[1:]: _, word_size_mb = line.split() size_mb += float(word_size_mb) elif db_engine_type == sql_engine_utils.POSTGRES: stdout, _ = db.client_vm_query_tools.IssueSqlCommand( r'SELECT pg_database_size(' "'sbtest'" ')/1024/1024' ) size_mb = int(stdout.split()[2]) # Spanner doesn't yet support pg_database_size. # See https://cloud.google.com/spanner/quotas#instance_limits. Spanner # supports 4TB per node, so use that number for now. elif db_engine_type == sql_engine_utils.SPANNER_POSTGRES: size_mb = 4096000 * db.nodes else: raise errors.Benchmarks.RunError( 'Unsupported engine type, please update' ' sysbench_benchmark._GetDatabaseSize.' ) return size_mb def _GetSysbenchRunCommand( duration: int, db: relational_db.BaseRelationalDb, sysbench_thread_count: int, ): """Returns the sysbench command as a string.""" if duration <= 0: raise ValueError('Duration must be greater than zero.') run_cmd_tokens = [ 'nice', # run with a niceness of lower priority '-15', # to encourage cpu time for ssh commands 'sysbench', _GetSysbenchTestParameter(), '--tables=%d' % FLAGS.sysbench_tables, ( '--table_size=%d' % FLAGS.sysbench_table_size if _IsValidFlag('table_size') else '' ), ('--scale=%d' % FLAGS.sysbench_scale if _IsValidFlag('scale') else ''), '--rand-type=%s' % UNIFORM, '--threads=%d' % sysbench_thread_count, '--percentile=%d' % FLAGS.sysbench_latency_percentile, '--report-interval=%d' % FLAGS.sysbench_report_interval, '--max-requests=0', '--time=%d' % duration, ] if _GetSysbenchTestParameter() == 'tpcc': run_cmd_tokens.append('--trx_level=%s' % _TXN_ISOLATION_LEVEL.value) run_cmd = ' '.join(run_cmd_tokens + _GetCommonSysbenchOptions(db) + ['run']) run_cmd = 'cd ~/sysbench/ && ' + run_cmd return run_cmd def _IssueSysbenchCommand(vm, duration, benchmark_spec, sysbench_thread_count): """Issues a sysbench run command given a vm and a duration. Does nothing if duration is <= 0 Args: vm: The test VM to issue command to. duration: the duration of the sysbench run. benchmark_spec: The benchmark specification. Contains all data that is required to run the benchmark. sysbench_thread_count: count of number of threads to use in --threads parameter to sysbench. Returns: stdout, stderr: the result of the command. """ stdout = '' stderr = '' if duration > 0: run_cmd = _GetSysbenchRunCommand( duration, benchmark_spec.relational_db, sysbench_thread_count ) stdout, stderr = vm.RobustRemoteCommand(run_cmd, timeout=duration + 60) logging.info( 'Sysbench results: \n stdout is:\n%s\nstderr is\n%s', stdout, stderr ) return stdout, stderr def _UpdateSessions( db: relational_db.BaseRelationalDb, thread_count: int ) -> None: """Updates the sessions used for more connection parallelism.""" if ( db.client_vms_query_tools[0].ENGINE_TYPE == sql_engine_utils.SPANNER_POSTGRES ): background_tasks.RunThreaded( lambda client: client.Connect(thread_count * 2), db.client_vms_query_tools, ) def _RunSysbench(vms, metadata, benchmark_spec, sysbench_thread_count): """Runs the Sysbench OLTP test. Args: vms: The VMs that will issue the sysbench test. metadata: The PKB metadata to be passed along to the final results. benchmark_spec: The benchmark specification. Contains all data that is required to run the benchmark. sysbench_thread_count: The number of client threads that will connect. Returns: Results: A list of results of this run. """ # Now run the sysbench OLTP test and parse the results. # First step is to run the test long enough to cover the warmup period # as requested by the caller. Second step is the 'real' run where the results # are parsed and reported. _UpdateSessions(benchmark_spec.relational_db, sysbench_thread_count) vm = vms[0] warmup_seconds = FLAGS.sysbench_warmup_seconds if warmup_seconds > 0: logging.info('Sysbench warm-up run, duration is %d', warmup_seconds) _IssueSysbenchCommand( vm, warmup_seconds, benchmark_spec, sysbench_thread_count ) run_seconds = FLAGS.sysbench_run_seconds logging.info('Sysbench real run, duration is %d', run_seconds) if _SCALEUP_CLIENTS_TEST.value: return _RunScaleUpClientsBenchmark( vms, run_seconds, benchmark_spec, sysbench_thread_count, metadata ) stdout, _ = _IssueSysbenchCommand( vm, run_seconds, benchmark_spec, sysbench_thread_count ) logging.info('\n Parsing Sysbench Results...\n') return ( sysbench.ParseSysbenchTimeSeries(stdout, metadata) + sysbench.ParseSysbenchLatency([stdout], metadata) + sysbench.ParseSysbenchTransactions(stdout, metadata) ) def _RunScaleUpClientsBenchmark( vms, run_seconds, benchmark_spec, sysbench_thread_count, metadata ): """Runs the Scale Up Clients benchmark. Only TPS and QPS is supported.""" scale_up_samples = [] for i in range(1, _SCALEUP_CLIENTS_TEST_NUM_CLIENTS.value + 1): new_metadata = metadata.copy() new_metadata['sysbench_scale_up_client_count'] = i command_vm_pairs = [ (vm, run_seconds, benchmark_spec, sysbench_thread_count) for vm in vms[:i] ] args = [(command_vm_pair, {}) for command_vm_pair in command_vm_pairs] results = background_tasks.RunThreaded(_IssueSysbenchCommand, args) stdouts = [i[0] for i in results] cpu_utilization = 0 if hasattr(benchmark_spec.relational_db, 'GetAverageCpuUsage'): cpu_utilization = benchmark_spec.relational_db.GetAverageCpuUsage( _RUN_DURATION.value // 60, datetime.datetime.now() ) new_metadata['cpu_utilization'] = cpu_utilization total_tps = [] total_qps = [] for stdout in stdouts: current_transactions = sysbench.ParseSysbenchTransactions( stdout, new_metadata ) total_tps.append(current_transactions[0].value) total_qps.append(current_transactions[1].value) logging.info( 'num_clients: %d total_tps: %d total_qps: %d', i, total_tps, total_qps ) tps_metadata = new_metadata.copy() tps_metadata.update({'tps': total_tps}) qps_metadata = new_metadata.copy() qps_metadata.update({'qps': total_qps}) scale_up_samples += [ sample.Sample('total_tps', sum(total_tps), 'tps', tps_metadata), sample.Sample('total_qps', sum(total_qps), 'qps', qps_metadata), ] + sysbench.ParseSysbenchLatency(stdouts, new_metadata) if cpu_utilization > _SCALE_UP_MAX_CPU_UTILIZATION.value: logging.info('cpu_utilization is over the threadshold, stopping') break return scale_up_samples def Run(benchmark_spec): """Run the sysbench benchmark and publish results. Args: benchmark_spec: The benchmark specification. Contains all data that is required to run the benchmark. Returns: Results. """ logging.info('Start benchmarking, Cloud Provider is %s.', FLAGS.cloud) results = [] client_vms = benchmark_spec.vm_groups['clients'] db = benchmark_spec.relational_db for thread_count in FLAGS.sysbench_run_threads: metadata = CreateMetadataFromFlags() metadata.update(db.GetResourceMetadata()) metadata['sysbench_db_size_MB'] = _GetDatabaseSize(db) metadata['sysbench_thread_count'] = thread_count # The run phase is common across providers. The VMs[0] object contains all # information and states necessary to carry out the run. results += _RunSysbench(client_vms, metadata, benchmark_spec, thread_count) return results def Cleanup(benchmark_spec): """Clean up benchmark related states on server and client. Args: benchmark_spec: The benchmark specification. Contains all data that is required to run the benchmark. """ del benchmark_spec