perfkitbenchmarker/linux_packages/mcperf.py (250 lines of code) (raw):

# Copyright 2021 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module containing mcperf installation and cleanup functions."""

import logging

from absl import flags
from perfkitbenchmarker import errors
from perfkitbenchmarker import linux_packages
from perfkitbenchmarker import regex_util
from perfkitbenchmarker import sample

GIT_REPO = 'https://github.com/shaygalon/memcache-perf.git'
MCPERF_DIR = '%s/mcperf_benchmark' % linux_packages.INSTALL_DIR
MCPERF_BIN = '%s/mcperf' % MCPERF_DIR
APT_PACKAGES = 'scons libevent-dev gengetopt libzmq3-dev'

FLAGS = flags.FLAGS

flags.DEFINE_enum(
    'mcperf_protocol',
    'binary',
    ['binary', 'ascii'],
    'Protocol to use. Supported protocols are binary and ascii.',
)
flags.DEFINE_list(
    'mcperf_qps', [], 'Target aggregate QPS. If not set, target for peak qps.'
)
flags.DEFINE_integer('mcperf_time', 300, 'Maximum time to run (seconds).')
flags.DEFINE_string(
    'mcperf_keysize', '16', 'Length of memcached keys (distribution).'
)
flags.DEFINE_string(
    'mcperf_valuesize', '128', 'Length of memcached values (distribution).'
)
flags.DEFINE_integer(
    'mcperf_records', 10000, 'Number of memcached records to use.'
)
flags.DEFINE_float(
    'mcperf_ratio', 0.0, 'Ratio of set:get. By default, read only.'
)
# Adjacent string literals are concatenated verbatim, so each fragment ends
# with a space to keep the rendered help text readable.
flags.DEFINE_list(
    'mcperf_options',
    ['iadist=exponential:0.0'],
    'Additional mcperf long-form options (--) in comma separated form. e.g. '
    '--mcperf_options=blocking,search=99:1000. '
    'See https://github.com/shaygalon/memcache-perf for all available options.',
)

# If more than one value provided for threads, connections, depths, we will
# enumerate all test configurations. e.g.
# threads=1,2; connections=3,4; depths=5,6
# We will test following threads:connections:depths:
#   1,3,5; 1,3,6; 1,4,5; 1,4,6; 2,3,5; 2,3,6; 2,4,5; 2,4,6;
flags.DEFINE_list(
    'mcperf_threads',
    [1],
    'Number of total client threads to spawn per client VM.',
)
flags.DEFINE_list(
    'mcperf_connections',
    [1],
    'Number of connections to establish per client thread.',
)
flags.DEFINE_list('mcperf_depths', [1], 'Maximum depth to pipeline requests.')

# Agent mode options.
flags.DEFINE_integer(
    'mcperf_measure_connections', None, 'Master client connections.'
)
flags.DEFINE_integer(
    'mcperf_measure_threads', None, 'Master client thread count.'
)
flags.DEFINE_integer('mcperf_measure_qps', None, 'Master client QPS.')
flags.DEFINE_integer(
    'mcperf_measure_depth', None, 'Master client connection depth.'
)

_INCREMENTAL_LOAD = flags.DEFINE_float(
    'mcperf_incremental_load', None, 'Increments target qps until hits peak.'
)

# To use remote agent mode, we need at least 2 VMs.
AGENT_MODE_MIN_CLIENT_VMS = 2


def CheckPrerequisites():
  """Verify flags are correctly specified.

  Agent-mode flags (--mcperf_measure_*) require at least
  AGENT_MODE_MIN_CLIENT_VMS client VMs, and --mcperf_incremental_load requires
  exactly one non-zero starting value in --mcperf_qps.

  Raises:
    errors.Setup.InvalidFlagConfigurationError: On invalid flag configurations.
  """
  agent_mode_flags = [
      FLAGS['mcperf_measure_connections'].present,
      FLAGS['mcperf_measure_threads'].present,
      FLAGS['mcperf_measure_qps'].present,
      FLAGS['mcperf_measure_depth'].present,
  ]
  error_message = (
      'To enable agent mode, set memcached_mcperf_num_client_vms > 1.'
  )
  if any(agent_mode_flags) and (
      FLAGS.memcached_mcperf_num_client_vms < AGENT_MODE_MIN_CLIENT_VMS
  ):
    raise errors.Setup.InvalidFlagConfigurationError(error_message)
  if _INCREMENTAL_LOAD.value and (
      len(FLAGS.mcperf_qps) != 1 or int(FLAGS.mcperf_qps[0]) == 0
  ):
    raise errors.Setup.InvalidFlagConfigurationError(
        'To use dynamic load, set initial target qps with --mcperf_qps '
        'and incremental with --mcperf_incremental_load.'
    )


def YumInstall(vm):
  """Installs the mcperf package on the VM."""
  raise NotImplementedError


def AptInstall(vm):
  """Installs the mcperf package on the VM.

  Installs the build dependencies, clones GIT_REPO into MCPERF_DIR and builds
  it with scons.
  """
  vm.Install('build_tools')
  vm.InstallPackages(APT_PACKAGES)
  vm.RemoteCommand('git clone {} {}'.format(GIT_REPO, MCPERF_DIR))
  vm.RemoteCommand('cd {} && sudo scons'.format(MCPERF_DIR))


def GetMetadata():
  """Returns mcperf metadata derived from the mcperf_* flags."""
  metadata = {
      'protocol': FLAGS.mcperf_protocol,
      'qps': FLAGS.mcperf_qps or 'peak',
      'time': FLAGS.mcperf_time,
      'keysize': FLAGS.mcperf_keysize,
      'valuesize': FLAGS.mcperf_valuesize,
      'records': FLAGS.mcperf_records,
      'ratio': FLAGS.mcperf_ratio,
  }
  if FLAGS.mcperf_options:
    metadata['options'] = FLAGS.mcperf_options
  return metadata


def BuildCmd(server_ip, server_port, num_instances, options):
  """Build base mcperf command in a list.

  Args:
    server_ip: IP address of the memcached server.
    server_port: Port of the first memcached instance; instance `idx` is
      addressed at `server_port + idx`.
    num_instances: Number of memcached instances to target.
    options: Extra command-line arguments appended after the server list.

  Returns:
    List of command fragments, intended to be joined with spaces.
  """
  server_ips = []
  for idx in range(num_instances):
    server_ips.append(f'--server={server_ip}:{server_port + idx}')
  cmd = (
      [
          'ulimit -n 32768; ',
          MCPERF_BIN,
          '--keysize=%s' % FLAGS.mcperf_keysize,
          '--valuesize=%s' % FLAGS.mcperf_valuesize,
          '--records=%s' % FLAGS.mcperf_records,
          # Only spread requests round-robin when there is more than one
          # server; otherwise an empty (no-op) fragment is emitted.
          '--roundrobin' if len(server_ips) > 1 else '',
      ]
      + server_ips
      + options
  )
  if FLAGS.mcperf_protocol == 'binary':
    cmd.append('--binary')
  return cmd


def Load(client_vm, server_ip, server_port):
  """Preload the server with data.

  Runs mcperf with --loadonly against the single instance at
  server_ip:server_port.
  """
  logging.info('Loading memcached server.')
  cmd = BuildCmd(server_ip, server_port, 1, ['--loadonly'])
  client_vm.RemoteCommand(' '.join(cmd))


def RestartAgent(vm, threads):
  """(Re)starts an mcperf remote agent on `vm` in the background.

  Any running mcperf process is killed first; the new agent is launched with
  nohup, `threads` worker threads, and its output redirected to `log`.

  Args:
    vm: The client VM that will act as a remote agent.
    threads: Number of agent threads (passed as --threads).
  """
  logging.info('Restarting mcperf remote agent on %s', vm.internal_ip)
  # Kill existing mcperf agent threads
  vm.RemoteCommand('pkill -9 mcperf', ignore_failure=True)
  # Make sure have enough file descriptor for the agent process.
  vm.RemoteCommand(
      ' '.join([
          'ulimit -n 32768; ',
          'nohup',
          MCPERF_BIN,
          '--threads=%s' % threads,
          '--agentmode',
          '&> log',
          '&',
      ])
  )


def Run(vms, server_ip, server_port, num_instances):
  """Runs the mcperf benchmark on the vm.

  vms[0] is the master client that issues the command; every other VM is
  restarted as a remote agent for each thread count. Every combination of
  --mcperf_threads x --mcperf_connections x --mcperf_depths is run once per
  target QPS in --mcperf_qps (or once at peak QPS when that flag is empty).
  With --mcperf_incremental_load, the target QPS keeps being raised while the
  achieved QPS stays close enough to the target.

  A configuration whose command times out or exits non-zero is logged and
  skipped; the sweep continues with the next configuration.

  Args:
    vms: Client VMs; the first one is the master.
    server_ip: IP address of the memcached server.
    server_port: Port of the first memcached instance.
    num_instances: Number of memcached instances to target.

  Returns:
    List of sample.Sample objects.
  """
  samples = []
  master = vms[0]
  runtime_options = {}
  measure_flags = []
  additional_flags = ['--%s' % option for option in FLAGS.mcperf_options]
  # Every client VM other than the master is driven as a remote agent.
  remote_agents = ['--agent=%s' % vm.internal_ip for vm in vms[1:]]

  if FLAGS.mcperf_measure_connections:
    runtime_options['measure_connections'] = FLAGS.mcperf_measure_connections
    measure_flags.append(
        '--measure_connections=%s' % FLAGS.mcperf_measure_connections
    )
  if FLAGS.mcperf_measure_threads:
    # The master's thread count is applied through --threads below rather
    # than through a separate measure flag.
    runtime_options['measure_threads'] = FLAGS.mcperf_measure_threads
  if FLAGS.mcperf_measure_qps:
    runtime_options['measure_qps'] = FLAGS.mcperf_measure_qps
    measure_flags.append('--measure_qps=%s' % FLAGS.mcperf_measure_qps)
  if FLAGS.mcperf_measure_depth:
    runtime_options['measure_depth'] = FLAGS.mcperf_measure_depth
    measure_flags.append('--measure_depth=%s' % FLAGS.mcperf_measure_depth)

  for thread_count in FLAGS.mcperf_threads:
    runtime_options['threads'] = thread_count
    for vm in vms[1:]:
      RestartAgent(vm, thread_count)
    for connection_count in FLAGS.mcperf_connections:
      runtime_options['connections'] = connection_count
      for depth in FLAGS.mcperf_depths:
        runtime_options['depth'] = depth
        # Work queue of target QPS values; 0 means "run at peak".
        target_qps_list = FLAGS.mcperf_qps[:] or [0]
        while True:
          target_qps = int(target_qps_list[0])
          runtime_options['qps'] = target_qps or 'peak'
          cmd = BuildCmd(
              server_ip,
              server_port,
              num_instances,
              [
                  '--noload',
                  '--qps=%s' % target_qps,
                  '--time=%s' % FLAGS.mcperf_time,
                  '--update=%s' % FLAGS.mcperf_ratio,
                  '--threads=%s'
                  % (FLAGS.mcperf_measure_threads or thread_count),
                  '--connections=%s' % connection_count,
                  '--depth=%s' % depth,
              ]
              + remote_agents
              + measure_flags
              + additional_flags,
          )
          try:
            stdout, _, retcode = master.RemoteHostCommandWithReturnCode(
                ' '.join(cmd),
                timeout=FLAGS.mcperf_time * 2,
                ignore_failure=True,
            )
          except errors.VmUtil.IssueCommandTimeoutError:
            logging.warning(
                'mcperf timed out for configuration %s; skipping.',
                runtime_options,
            )
            break
          if retcode:
            logging.warning(
                'mcperf exited with code %s for configuration %s; skipping.',
                retcode,
                runtime_options,
            )
            break
          metadata = GetMetadata()
          metadata.update(runtime_options)
          run_samples, actual_qps = ParseResults(stdout, metadata)
          samples.extend(run_samples)
          # Keep raising the target while the achieved QPS is within
          # 2 * increment of the target. A zero target means "peak", which
          # cannot be incremented (and would divide by zero).
          if (
              _INCREMENTAL_LOAD.value
              and target_qps
              and actual_qps / target_qps > (1 - _INCREMENTAL_LOAD.value * 2)
          ):
            target_qps_list.append(target_qps * (1 + _INCREMENTAL_LOAD.value))
          target_qps_list.pop(0)
          if not target_qps_list:
            break
  return samples


LATENCY_HEADER_REGEX = r'#type([\s\w\d]*)\n'
LATENCY_REGEX = r'([\s\d\.]*)'
QPS_REGEX = r'Total QPS = ([\d\.]*)'
MISS_REGEX = r'Misses = \d+ \(([\d\.]*)%\)'
BANDWIDTH_REGEX = r'[\s\d]*bytes :\s*([\d\.]*) MB/s'


def ParseResults(result, metadata):
  """Parse mcperf result into samples.

  Sample Output:
  #type       avg     std     min      p5     p10     p50     p67
  read      106.0    67.7    37.2    80.0    84.3   101.7   108.8
  update      0.0     0.0     0.0     0.0     0.0     0.0     0.0
  op_q       10.0     0.0     1.0     9.4     9.4     9.7     9.8

  Total QPS = 754451.6 (45267112 / 60.0s)

  Total connections = 8
  Misses = 0 (0.0%)
  Skipped TXs = 0 (0.0%)

  RX 11180976417 bytes :  177.7 MB/s
  TX          0 bytes :    0.0 MB/s
  CPU Usage Stats (avg/min/max): 31.85%,30.31%,32.77%

  Args:
    result: Text output of running mcperf benchmark.
    metadata: metadata associated with the results. When --mcperf_ratio is
      below 1.0, a 'miss_rate' key is added to this dict in place.

  Returns:
    List of sample.Sample objects and actual qps.
  """
  samples = []
  if FLAGS.mcperf_ratio < 1.0:
    # N/A for write only workloads.
    misses = regex_util.ExtractGroup(MISS_REGEX, result)
    metadata['miss_rate'] = float(misses)

  # Column names of the latency table, e.g. ['avg', 'std', 'min', ...].
  latency_stats = regex_util.ExtractGroup(LATENCY_HEADER_REGEX, result).split()
  # parse latency
  for metric in ('read', 'update', 'op_q'):
    latency_regex = metric + LATENCY_REGEX
    latency_values = regex_util.ExtractGroup(latency_regex, result).split()
    for idx, stat in enumerate(latency_stats):
      # A row may have fewer values than the header has columns.
      if idx == len(latency_values):
        logging.warning(
            'Mutilate does not report %s latency for %s.', stat, metric
        )
        break
      samples.append(
          sample.Sample(
              metric + '_' + stat, float(latency_values[idx]), 'usec', metadata
          )
      )

  # parse bandwidth
  for metric in ('TX', 'RX'):
    bw_regex = metric + BANDWIDTH_REGEX
    bw = regex_util.ExtractGroup(bw_regex, result)
    samples.append(sample.Sample(metric, float(bw), 'MB/s', metadata))

  qps = regex_util.ExtractFloat(QPS_REGEX, result)
  samples.append(sample.Sample('qps', qps, 'ops/s', metadata))
  return samples, qps