perfkitbenchmarker/linux_benchmarks/cassandra_stress_benchmark.py (743 lines of code) (raw):

# Copyright 2014 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Runs cassandra-stress on a cluster of Cassandra servers.

Cassandra is a distributed, open source, NoSQL database management system
Cassandra homepage: http://cassandra.apache.org
cassandra-stress tool page:
http://docs.datastax.com/en/cassandra/2.1/cassandra/tools/toolsCStress_t.html

We are using cassandra-stress to test the performance of the cassandra cluster
for different workloads at varied load.

The benchmark is run in two phases:
1. Preload phase: The cassandra cluster is preloaded with a set of keys.
2. Run phase: The cassandra cluster is run with a specified workload.
   Workload can be of type read only, write only or mixed read and write.

The run phase is repeated up to --cassandra_stress_max_runs times. We update
the load for each test by updating the number of threads to get max op rate.
"""

import collections
import copy
import functools
import logging
import math
import posixpath
import re
import time
from typing import Union

from absl import flags
from perfkitbenchmarker import background_tasks
from perfkitbenchmarker import configs
from perfkitbenchmarker import data
from perfkitbenchmarker import errors
from perfkitbenchmarker import regex_util
from perfkitbenchmarker import sample
from perfkitbenchmarker import units
from perfkitbenchmarker import vm_util
from perfkitbenchmarker.linux_packages import cassandra

NUM_KEYS_PER_CORE = 2000000
# Adding wait between prefill and the test workload to give some time
# for the data to propagate and for the cluster to stabilize.
PROPAGATION_WAIT_TIME = 720
WAIT_BETWEEN_COMPACTION_TASKS_CHECK = 720

# cassandra-stress command
WRITE_COMMAND = 'write'
COUNTER_WRITE_COMMAND = 'counter_write'
USER_COMMAND = 'user'

READ_COMMAND = 'read'
COUNTER_READ_COMMAND = 'counter_read'
MIXED_COMMAND = 'mixed'
PRELOAD_REQUIRED = (READ_COMMAND, COUNTER_READ_COMMAND, MIXED_COMMAND)

# cassandra-stress command [options]
flags.DEFINE_enum(
    'cassandra_stress_command',
    WRITE_COMMAND,
    [
        WRITE_COMMAND,
        COUNTER_WRITE_COMMAND,
        USER_COMMAND,
        READ_COMMAND,
        COUNTER_READ_COMMAND,
        MIXED_COMMAND,
    ],
    'cassandra-stress command to use.',
)

flags.DEFINE_integer(
    'cassandra_stress_preload_num_keys',
    None,
    'Number of keys to preload into cassandra database. '
    'Read/counter_read/mixed modes require preloading '
    'cassandra database. If not set, the number of the keys '
    'preloaded will be the same as --num_keys for '
    'read/counter_read/mixed mode, the same as the number of '
    'loaders for write/counter_write/user mode.',
)

# Options for cassandra-stress
CASSANDRA_STRESS_NUM_KEYS = flags.DEFINE_integer(
    'cassandra_stress_num_keys',
    0,
    'Number of keys used in cassandra-stress tool across all loader vms. If'
    ' unset, this benchmark will use %s * NumCpusForBenchmark() on data nodes'
    ' as the value. Ignored if --cassandra_stress_run_duration is set.'
    % NUM_KEYS_PER_CORE,
)

flags.DEFINE_integer(
    'num_cassandra_stress_threads',
    150,
    'Number of threads used in cassandra-stress tool on each loader node.',
)

flags.DEFINE_integer(
    'cassandra_stress_replication_factor', 3, 'Number of replicas.'
)

flags.DEFINE_enum(
    'cassandra_stress_consistency_level',
    'QUORUM',
    ['ONE', 'QUORUM', 'LOCAL_ONE', 'LOCAL_QUORUM', 'EACH_QUORUM', 'ALL', 'ANY'],
    'Set the consistency level to use during cassandra-stress.',
)

flags.DEFINE_integer(
    'cassandra_stress_retries',
    1000,
    'Number of retries when error encountered during stress.',
)

# NOTE: the flag name below is misspelled ("thead"), but it is part of the
# public command-line interface and is therefore kept as is.
CASSANDRA_STRESS_PRELOAD_THREADS = flags.DEFINE_integer(
    'cassandra_stress_preload_thead_count',
    300,
    'Number of threads to use for preloading.',
)

CASSANDRA_STRESS_RUN_DURATION = flags.DEFINE_string(
    'cassandra_stress_run_duration',
    None,
    'Duration of the cassandra-stress. Use m, s and h as units. Overrides'
    ' --num_keys.',
)

IS_ROW_CACHE_ENABLED = flags.DEFINE_bool(
    'is_row_cache_enabled',
    False,
    'Enable row cache for the cassandra server.',
)

ROW_CACHE_SIZE = flags.DEFINE_integer(
    'row_cache_size',
    1000,
    'Size of the row cache for cassandra in MiB if --is_row_cache_enabled is'
    ' true.',
)

CASSANDRA_SERVER_ZONES = flags.DEFINE_list(
    'cassandra_server_zones',
    [],
    'zones to launch the cassandra servers in. ',
)

CASSANDRA_CLIENT_ZONES = flags.DEFINE_list(
    'cassandra_client_zones',
    [],
    'zones to launch the clients for the benchmark in. ',
)

CASSANDRA_CPU_UTILIZATION_LIMIT = flags.DEFINE_integer(
    'cassandra_cpu_utilization_limit',
    70,
    'Maximum cpu utilization percentage for the benchmark. ',
)

CASSANDRA_STRESS_MAX_RUNS = flags.DEFINE_integer(
    'cassandra_stress_max_runs',
    6,
    'Max Number of times to run cassandra-stress to optimize performance.',
)

# Use "./cassandra-stress help -pop" to get more details.
# [dist=DIST(?)]: Seeds are selected from this distribution
#  EXP(min..max):
#      An exponential distribution over the range [min..max]
#  EXTREME(min..max,shape):
#      An extreme value (Weibull) distribution over the range [min..max]
#  QEXTREME(min..max,shape,quantas):
#      An extreme value, split into quantas, within which the chance of
#      selection is uniform
#  GAUSSIAN(min..max,stdvrng):
#      A gaussian/normal distribution, where mean=(min+max)/2, and stdev
#      is (mean-min)/stdvrng
#  GAUSSIAN(min..max,mean,stdev):
#      A gaussian/normal distribution, with explicitly defined mean and stdev
#  UNIFORM(min..max):
#      A uniform distribution over the range [min, max]
#  Preceding the name with ~ will invert the distribution,
#  e.g. ~EXP(1..10) will yield 10 most, instead of least, often.
flags.DEFINE_enum(
    'cassandra_stress_population_distribution',
    None,
    [
        'EXP',
        'EXTREME',
        'QEXTREME',
        'GAUSSIAN',
        'UNIFORM',
        '~EXP',
        '~EXTREME',
        '~QEXTREME',
        '~GAUSSIAN',
        '~UNIFORM',
    ],
    'The population distribution cassandra-stress uses. '
    'By default, each loader vm is given a range of keys '
    '[min, max], and loaders will read/insert keys sequentially '
    'from min to max.',
)

flags.DEFINE_integer(
    'cassandra_stress_population_size',
    None,
    'The size of the population across all clients. '
    'By default, the size of the population equals to '
    'max(num_keys,cassandra_stress_preload_num_keys).',
)

flags.DEFINE_list(
    'cassandra_stress_population_parameters',
    [],
    'Additional parameters to use with distribution. '
    'This benchmark will calculate min, max for each '
    'distribution. Some distributions need more parameters. '
    'See: "./cassandra-stress help -pop" for more details. '
    'Comma-separated list.',
)

# Options to use with cassandra-stress mixed mode, below flags only matter if
# --cassandra_stress_command=mixed.
flags.DEFINE_string(
    'cassandra_stress_mixed_ratio',
    'write=1,read=1',
    'Read/write ratio of cassandra-stress. Only valid if '
    '--cassandra_stress_command=mixed. By default, '
    '50% read and 50% write.',
)

# Options to use with cassandra-stress user mode, below flags only matter if
# --cassandra_stress_command=user.
# http://www.datastax.com/dev/blog/improved-cassandra-2-1-stress-tool-benchmark-any-schema
flags.DEFINE_string(
    'cassandra_stress_profile',
    '',
    'Path to cassandra-stress profile file. '
    'Only valid if --cassandra_stress_command=user.',
)
flags.DEFINE_string(
    'cassandra_stress_operations',
    'insert=1',
    'Specify what operations (inserts and/or queries) to '
    'run and the ratio of each operation. '
    'Only valid if --cassandra_stress_command=user.',
)

FLAGS = flags.FLAGS

BENCHMARK_NAME = 'cassandra_stress'
BENCHMARK_CONFIG = """
cassandra_stress:
  description: Benchmark Cassandra using cassandra-stress
  vm_groups:
    workers:
      vm_spec:
        GCP:
          machine_type: c3-standard-4
          zone: us-central1-a
        Azure:
          machine_type: Standard_D4_v5
          zone: eastus2
        AWS:
          machine_type: m6i.xlarge
          zone: us-east-1
      disk_spec: *default_500_gb
    client:
      vm_spec:
        GCP:
          machine_type: c3-standard-4
          zone: us-central1-a
        Azure:
          machine_type: Standard_D4_v5
          zone: eastus2
        AWS:
          machine_type: m6i.xlarge
          zone: us-east-1
"""

CASSANDRA_GROUP = 'workers'
CLIENT_GROUP = 'client'

SLEEP_BETWEEN_CHECK_IN_SECONDS = 5
TEMP_PROFILE_PATH = posixpath.join(vm_util.VM_TMP_DIR, 'profile.yaml')

# Results documentation:
# http://docs.datastax.com/en/cassandra/2.1/cassandra/tools/toolsCStressOutput_c.html
RESULTS_METRICS = (
    'op rate',  # Number of operations per second performed during the run.
    'partition rate',  # Number of partition operations per second performed
    # during the run.
    'row rate',  # Number of row operations per second performed during the run.
    'latency mean',  # Average latency in milliseconds for each operation during
    # that run.
    'latency median',  # Median latency in milliseconds for each operation
    # during that run.
    'latency 95th percentile',  # 95% of the time the latency was less than
    # the number displayed in the column.
    'latency 99th percentile',  # 99% of the time the latency was less than
    # the number displayed in the column.
    'latency 99.9th percentile',  # 99.9% of the time the latency was less than
    # the number displayed in the column.
    'latency max',  # Maximum latency in milliseconds.
    'total partitions',  # Number of partitions.
    'total errors',  # Number of errors.
    'total operation time',  # Total operation time.
)

# Metrics are aggregated (summed) between client vms. Names must match the
# lower-case spelling used in RESULTS_METRICS, otherwise the metric is silently
# averaged instead of summed.
AGGREGATED_METRICS = frozenset({
    'op rate',
    'partition rate',
    'row rate',
    'total partitions',
    'total errors',
})
# Maximum value will be chosen between client vms.
MAXIMUM_METRICS = {'latency max'}

THREAD_INCREMENT_COUNT = 50
MAX_MEDIAN_LATENCY_MS = 20
MAX_ACCEPTED_COMPACTION_TIME = 30
STARTING_THREAD_COUNT = 25
SAR_CPU_UTILIZATION_INTERVAL = 10


class CassandraCompactionNotCompletedError(Exception):
  """Exception for cassandra compaction not complete."""


def _OverrideSpecField(config, group_name, spec_name, field, value):
  """Sets `field` to `value` in every per-cloud entry of a vm group's spec."""
  spec = config['vm_groups'][group_name][spec_name]
  for cloud_value in spec:
    spec[cloud_value][field] = value


def GetConfig(user_config):
  """Customize the config for the benchmark based on flags.

  Args:
    user_config: user supplied configuration, merged into BENCHMARK_CONFIG.

  Returns:
    The loaded benchmark config with machine type, disk and zone overrides
    from flags applied.
  """
  cloud = FLAGS.cloud
  config = configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME)
  # (vm group, spec section, field to override, flag value). An override is
  # only applied when the flag holds a truthy value.
  overrides = (
      (CLIENT_GROUP, 'vm_spec', 'machine_type', FLAGS.client_vm_machine_type),
      (CASSANDRA_GROUP, 'vm_spec', 'machine_type', FLAGS.db_machine_type),
      (CASSANDRA_GROUP, 'disk_spec', 'disk_type', FLAGS.db_disk_type),
      (CASSANDRA_GROUP, 'disk_spec', 'disk_size', FLAGS.db_disk_size),
      (CASSANDRA_GROUP, 'disk_spec', 'provisioned_iops', FLAGS.db_disk_iops),
      (
          CASSANDRA_GROUP,
          'disk_spec',
          'provisioned_throughput',
          FLAGS.db_disk_throughput,
      ),
  )
  for group_name, spec_name, field, value in overrides:
    if value:
      _OverrideSpecField(config, group_name, spec_name, field, value)
  ConfigureVmGroups(
      config, CASSANDRA_SERVER_ZONES.value, CASSANDRA_GROUP, cloud
  )
  ConfigureVmGroups(config, CASSANDRA_CLIENT_ZONES.value, CLIENT_GROUP, cloud)
  return config


def ConfigureVmGroups(config, flag, group_name, cloud):
  """Creates one vm group per requested zone.

  The first zone is assigned to the existing `group_name` group. Every further
  zone gets a deep copy of that group registered as `<group_name>_<index>`.

  Args:
    config: benchmark config dict, modified in place.
    flag: list of zone names.
    group_name: name of the vm group to replicate.
    cloud: cloud key inside the group's vm_spec whose zone is set.
  """
  for index, zone in enumerate(flag):
    if index == 0:
      config['vm_groups'][group_name]['vm_spec'][cloud]['zone'] = zone
      continue
    node = copy.deepcopy(config['vm_groups'][group_name])
    node['vm_spec'][cloud]['zone'] = zone
    config['vm_groups'][f'{group_name}_{index}'] = node


def CheckPrerequisites(benchmark_config):
  """Verifies that the required resources are present.

  Args:
    benchmark_config: unused.

  Raises:
    errors.Setup.InvalidFlagConfigurationError: If the server or client zone
      flags are empty.
    perfkitbenchmarker.data.ResourceNotFound: On missing resource.
  """
  if not CASSANDRA_SERVER_ZONES.value:
    raise errors.Setup.InvalidFlagConfigurationError(
        'Specify zones for cassandra servers in cassandra_server_zones flag'
    )
  if not CASSANDRA_CLIENT_ZONES.value:
    raise errors.Setup.InvalidFlagConfigurationError(
        'Specify zones for cassandra clients in cassandra_client_zones flag'
    )
  cassandra.CheckPrerequisites()
  if FLAGS.cassandra_stress_command == USER_COMMAND:
    data.ResourcePath(FLAGS.cassandra_stress_profile)


def CheckMetadata(metadata):
  """Verify that metadata is valid.

  Args:
    metadata: dict. Contains metadata for this benchmark.

  Raises:
    errors.Benchmarks.PrepareException: If the command requires preloading and
      the population size exceeds the number of preloaded keys.
  """
  if metadata['command'] in PRELOAD_REQUIRED:
    if metadata['population_size'] > metadata['num_preload_keys']:
      # Format the message eagerly: exceptions do not interpolate extra args.
      raise errors.Benchmarks.PrepareException(
          'For %s modes, number of preloaded keys must be larger than or '
          'equal to population size.' % ', '.join(PRELOAD_REQUIRED)
      )


def GenerateMetadataFromFlags(benchmark_spec, cassandra_vms, client_vms):
  """Generate metadata from command-line flags.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.
    cassandra_vms: cassandra server vms.
    client_vms: cassandra client vms.

  Returns:
    dict. Contains metadata for this benchmark.
  """
  vm_dict = benchmark_spec.vm_groups
  metadata = {}
  if not CASSANDRA_STRESS_NUM_KEYS.value:
    metadata['num_keys'] = (
        NUM_KEYS_PER_CORE * vm_dict[CASSANDRA_GROUP][0].NumCpusForBenchmark()
    )
  else:
    metadata['num_keys'] = CASSANDRA_STRESS_NUM_KEYS.value

  if FLAGS['cassandra_stress_preload_num_keys'].present:
    metadata['num_preload_keys'] = FLAGS.cassandra_stress_preload_num_keys
  elif FLAGS.cassandra_stress_command in PRELOAD_REQUIRED:
    metadata['num_preload_keys'] = metadata['num_keys']
  else:
    metadata['num_preload_keys'] = len(vm_dict[CLIENT_GROUP])

  metadata.update({
      'concurrent_reads': FLAGS.cassandra_concurrent_reads,
      'concurrent_writes': FLAGS.cassandra_concurrent_writes,
      'num_data_nodes': len(cassandra_vms),
      'num_loader_nodes': len(client_vms),
      'num_cassandra_stress_threads': FLAGS.num_cassandra_stress_threads,
      'num_cassandra_stress_preload_threads': (
          CASSANDRA_STRESS_PRELOAD_THREADS.value
      ),
      'command': FLAGS.cassandra_stress_command,
      'consistency_level': FLAGS.cassandra_stress_consistency_level,
      'retries': FLAGS.cassandra_stress_retries,
      'population_size': FLAGS.cassandra_stress_population_size or max(
          metadata['num_keys'], metadata['num_preload_keys']
      ),
      'population_dist': FLAGS.cassandra_stress_population_distribution,
      'population_parameters': ','.join(
          FLAGS.cassandra_stress_population_parameters
      ),
      'is_row_cache_enabled': FLAGS.is_row_cache_enabled,
      'row_cache_size': FLAGS.row_cache_size,
      'duration': CASSANDRA_STRESS_RUN_DURATION.value,
  })

  if FLAGS.cassandra_stress_command == USER_COMMAND:
    metadata.update({
        'profile': FLAGS.cassandra_stress_profile,
        'operations': FLAGS.cassandra_stress_operations,
    })
  else:
    if FLAGS.cassandra_stress_command == MIXED_COMMAND:
      metadata['mixed_ratio'] = FLAGS.cassandra_stress_mixed_ratio
    metadata['replication_factor'] = FLAGS.cassandra_stress_replication_factor
  logging.info('Metadata: %s', metadata)
  return metadata


def PreloadCassandraServer(cassandra_vms, client_vms, metadata):
  """Preload cassandra cluster if necessary.

  Read style commands are preloaded with the matching write command; any other
  command is used for the preload unchanged. Sleeps PROPAGATION_WAIT_TIME
  seconds after the preload.

  Args:
    cassandra_vms: cassandra server vms.
    client_vms: client vms that run cassandra-stress.
    metadata: dict. Contains metadata for this benchmark.
  """
  if FLAGS.cassandra_stress_command in (READ_COMMAND, MIXED_COMMAND):
    cassandra_stress_command = WRITE_COMMAND
  elif FLAGS.cassandra_stress_command == COUNTER_READ_COMMAND:
    cassandra_stress_command = COUNTER_WRITE_COMMAND
  else:
    cassandra_stress_command = FLAGS.cassandra_stress_command
  logging.info(
      'Preloading cassandra database with %s %s operations.',
      metadata['num_preload_keys'],
      cassandra_stress_command,
  )
  RunCassandraStressTest(
      cassandra_vms,
      client_vms,
      metadata['num_preload_keys'],
      cassandra_stress_command,
      CASSANDRA_STRESS_PRELOAD_THREADS.value,
      is_preload=True,
  )
  logging.info('Waiting %s for keyspace to propagate.', PROPAGATION_WAIT_TIME)
  time.sleep(PROPAGATION_WAIT_TIME)


def ParseVmGroups(vm_dict):
  """Splits the vm groups into cassandra server vms and client vms.

  Groups are matched by name prefix, so the per-zone groups created by
  ConfigureVmGroups are included. Only the first vm of each group is used.

  Args:
    vm_dict: dict mapping vm group name to a list of vms.

  Returns:
    A (cassandra_vms, client_vms) tuple of lists.
  """
  cassandra_vms = []
  client_vms = []
  for key, value in vm_dict.items():
    if key.startswith(CASSANDRA_GROUP):
      cassandra_vms.append(value[0])
    elif key.startswith(CLIENT_GROUP):
      client_vms.append(value[0])
  return cassandra_vms, client_vms


def Prepare(benchmark_spec):
  """Install Cassandra and Java on target vms.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.
  """
  vm_dict = benchmark_spec.vm_groups
  cassandra_vms, client_vms = ParseVmGroups(vm_dict)
  logging.info('VM dictionary %s', vm_dict)

  logging.info('Authorizing loader[0] permission to access all other vms.')
  client_vms[0].AuthenticateVm()

  logging.info('Preparing data files and Java on all vms.')
  background_tasks.RunThreaded(
      lambda vm: vm.Install('cassandra'), cassandra_vms
  )
  background_tasks.RunThreaded(
      lambda vm: vm.Install('cassandra_stress'), client_vms
  )
  # needed to run sar
  background_tasks.RunThreaded(
      lambda vm: vm.InstallPackages('sysstat'), cassandra_vms + client_vms
  )
  seed_vm = cassandra_vms[0]
  configure = functools.partial(cassandra.Configure, seed_vms=[seed_vm])
  background_tasks.RunThreaded(configure, cassandra_vms)

  cassandra.StartCluster(seed_vm, cassandra_vms[1:])

  if FLAGS.cassandra_stress_command == USER_COMMAND:
    for vm in client_vms:
      vm.PushFile(FLAGS.cassandra_stress_profile, TEMP_PROFILE_PATH)
  metadata = GenerateMetadataFromFlags(
      benchmark_spec, cassandra_vms, client_vms
  )
  if metadata['num_preload_keys']:
    CheckMetadata(metadata)
  cassandra.CreateKeyspace(
      seed_vm, replication_factor=FLAGS.cassandra_replication_factor
  )
  PreloadCassandraServer(cassandra_vms, client_vms, metadata)
  WaitForCompactionTasks(cassandra_vms)


def _ResultFilePath(vm):
  """Returns the path of the cassandra-stress result log on `vm`."""
  return posixpath.join(vm_util.VM_TMP_DIR, vm.hostname + '.stress_results.txt')


def RunTestOnLoader(
    vm,
    loader_index,
    operations_per_vm,
    data_node_ips,
    command,
    user_operations,
    population_per_vm,
    population_dist,
    population_params,
    thread_count,
    is_preload,
):
  """Run Cassandra-stress test on loader node.

  Args:
    vm: The target vm.
    loader_index: integer. The index of target vm in loader vms.
    operations_per_vm: integer. The number of operations each loader vm
      requests.
    data_node_ips: list. List of IP addresses for all data nodes.
    command: string. The cassandra-stress command to use.
    user_operations: string. The operations to use with user mode.
    population_per_vm: integer. Population per loader vm.
    population_dist: string. The population distribution.
    population_params: string. Representing additional population parameters.
    thread_count: integer. The number of threads to use for cassandra-stress.
    is_preload: boolean. Whether this is a preload run.
  """
  if command == USER_COMMAND:
    command += r' profile={profile} ops\({ops}\)'.format(
        profile=TEMP_PROFILE_PATH, ops=user_operations
    )
    schema_option = ''
  else:
    if command == MIXED_COMMAND:
      command += r' ratio\({ratio}\)'.format(
          ratio=FLAGS.cassandra_stress_mixed_ratio
      )
    # TODO: Support more complex replication strategy.
    schema_option = (
        r'-schema replication\(factor={replication_factor}\)'.format(
            replication_factor=FLAGS.cassandra_stress_replication_factor
        )
    )

  # Each loader works on its own contiguous, 1-based slice of the population.
  population_range = '%s..%s' % (
      loader_index * population_per_vm + 1,
      (loader_index + 1) * population_per_vm,
  )
  if population_params:
    population_params = '%s,%s' % (population_range, population_params)
  else:
    population_params = population_range
  if population_dist:
    population_dist = r'-pop dist=%s\(%s\)' % (
        population_dist,
        population_params,
    )
  else:
    population_dist = '-pop seq=%s' % population_params
  duration = ''
  num_keys_parameter = 'n={num_keys}'.format(num_keys=operations_per_vm)
  # Duration specifies how long to run the test. If it is set, we don't need
  # to specify the number of keys.
  if not is_preload and CASSANDRA_STRESS_RUN_DURATION.value:
    duration = r'duration={duration}'.format(
        duration=CASSANDRA_STRESS_RUN_DURATION.value
    )
    num_keys_parameter = ''
  vm.RobustRemoteCommand(
      'sudo {cassandra_stress_command} {command} {duration}'
      ' cl={consistency_level} {num_keys_parameter} -node {nodes} {schema}'
      ' {population_dist} -log file={result_file} -rate threads={threads}'
      ' -errors retries={retries}'.format(
          cassandra_stress_command=cassandra.GetCassandraStressPath(vm),
          command=command,
          consistency_level=FLAGS.cassandra_stress_consistency_level,
          num_keys_parameter=num_keys_parameter,
          nodes=','.join(data_node_ips),
          schema=schema_option,
          population_dist=population_dist,
          result_file=_ResultFilePath(vm),
          retries=FLAGS.cassandra_stress_retries,
          threads=int(thread_count),
          duration=duration,
      )
  )


def RunCassandraStressTest(
    cassandra_vms,
    loader_vms,
    num_operations,
    command,
    thread_count,
    profile_operations='insert=1',
    population_size=None,
    population_dist=None,
    population_params=None,
    is_preload=False,
):
  """Start all loader nodes as Cassandra clients and run stress test.

  Args:
    cassandra_vms: list. A list of vm objects. Cassandra servers.
    loader_vms: list. A list of vm objects. Cassandra clients.
    num_operations: integer. The number of operations cassandra-stress clients
      should issue.
    command: string. The cassandra-stress command to use.
    thread_count: integer. The number of threads to use for cassandra-stress.
    profile_operations: string. The operations to use with user mode.
    population_size: integer. The population size. Defaults to num_operations.
    population_dist: string. The population distribution.
    population_params: string. Representing additional population parameters.
    is_preload: boolean. Whether this is a preload run.

  Returns:
    A list with the load average reported by `uptime` for each cassandra node.
  """
  num_loaders = len(loader_vms)
  data_node_ips = [vm.internal_ip for vm in cassandra_vms]
  population_size = population_size or num_operations
  operations_per_vm = int(math.ceil(float(num_operations) / num_loaders))
  population_per_vm = population_size // num_loaders
  if num_operations % num_loaders:
    logging.warning(
        'Total number of operations rounded to %s '
        '(%s operations per loader vm).',
        operations_per_vm * num_loaders,
        operations_per_vm,
    )
  logging.info('Executing the benchmark.')
  tasks = []
  for loader_index, loader_vm in enumerate(loader_vms):
    tasks.append((
        RunTestOnLoader,
        [
            loader_vm,
            loader_index,
            operations_per_vm,
            data_node_ips,
            command,
            profile_operations,
            population_per_vm,
            population_dist,
            population_params,
            thread_count,
            is_preload,
        ],
        {},
    ))
  # CPU utilization of the servers is only recorded for the measured runs.
  if not is_preload:
    for vm in cassandra_vms:
      tasks.append((CPUUtilizationReporting, [vm], {}))
  background_tasks.RunParallelThreads(tasks, max_concurrency=10)
  return background_tasks.RunThreaded(GetLoadAverage, cassandra_vms)


def CPUUtilizationReporting(vm):
  """Records cpu utilization on `vm` with sar for the duration of the test.

  Args:
    vm: The vm to run sar on.
  """
  # command : sar -u <interval> <count>
  # we are collecting the data every SAR_CPU_UTILIZATION_INTERVAL seconds for
  # the duration of the test
  sar_request_count = CalculateNumberOfSarRequestsFromDuration(
      CASSANDRA_STRESS_RUN_DURATION.value, SAR_CPU_UTILIZATION_INTERVAL
  )
  if sar_request_count == 0:
    # Nothing to sample. GetCpuUtilization() does not read the log in this
    # case either, so do not start sar with a zero count.
    return
  vm.RobustRemoteCommand(
      f'sar -u {SAR_CPU_UTILIZATION_INTERVAL}'
      f' {sar_request_count}'
      f' > {GenerateCpuUtilizationFileName(vm)}'
  )


def ParseAverageCpuUtilization(output) -> float:
  """Parses the output of the sar command.

  Args:
    output: text written by `sar -u`.

  Returns:
    The third whitespace separated field of the first line starting with
    'Average', or 0.0 if there is no such line.
  """
  average_cpu_utilization = re.findall(
      r'^Average.*$', output, flags=re.MULTILINE
  )
  if not average_cpu_utilization:
    logging.error('No average cpu utilization found in sar output.')
    return 0.0
  per_process_cpu_utilization = average_cpu_utilization[0].split()
  return float(per_process_cpu_utilization[2])


def CalculateNumberOfSarRequestsFromDuration(duration, freq):
  """Calculates the number of sar requests to be sent from the duration of the test.

  Args:
    duration: string such as '30s', '10m' or '1h'; may be None or empty.
    freq: interval between two sar requests, in seconds.

  Returns:
    The number of whole intervals that fit into the duration, 0 if no duration
    is set.
  """
  if not duration:
    # If duration is not set, we don't need to send sar requests.
    return 0
  duration = duration.replace('m', 'min')  # In units, m is meter
  quantity = units.ParseExpression(duration)
  seconds = quantity.m_as(units.second)
  return int(seconds / freq)


def GetLoadAverage(vm):
  """Returns the load average parsed from `uptime` on `vm`."""
  stdout, _ = vm.RemoteCommand('uptime')
  load_last_5m = ParseUptimeOutput(stdout)
  return load_last_5m


def ParseResp(resp) -> dict[str, Union[float, int]]:
  """Parses response from Cassandra stress test.

  Args:
    resp: metric data to parse

  Returns:
    dict of all the metrics and their values
  """
  metric_values = {}
  for row in resp.split('\n'):
    stripped_row = row.strip()
    if not stripped_row or stripped_row == 'Results:':
      continue
    metric_details = _ParseResultRow(row)
    if metric_details:
      metric_name, metric_value = metric_details
      metric_values[metric_name] = metric_value
  return metric_values


def _ParseResultRow(row: str) -> tuple[str, float] | None:
  """Parses a single row of the result file.

  Args:
    row: one line of the form '<metric name> : <value> ...'.

  Returns:
    A (metric name, value) tuple, or None if the row does not hold a known
    metric or its value cannot be parsed. 'total operation time' is converted
    from HH:MM:SS to seconds.
  """
  try:
    metric_name = regex_util.ExtractGroup('(.*) :', row, 1).lower().strip()
  except regex_util.NoMatchError:
    logging.error('Metric name not found in row: %s', row)
    return None
  if metric_name not in RESULTS_METRICS:
    return None
  is_operation_time = metric_name == 'total operation time'
  if is_operation_time:
    value_pattern = r'(.*) :\s*(\d{2}:\d{2}:\d{2})'
  else:
    # Accept both '41,026.5' (thousands separators) and plain '41026.5'.
    # Requiring groups of three digits alone would cut the latter to '410'.
    value_pattern = r'(.*) :\s*((?:\d{1,3}(?:,\d{3})+|\d+)(?:\.\d+)?)'
  try:
    metric_data = regex_util.ExtractGroup(value_pattern, row, 2).strip()
  except regex_util.NoMatchError:
    logging.error('Invalid value for %s: %s', metric_name, row)
    return None
  if is_operation_time:
    hours, minutes, seconds = (int(part) for part in metric_data.split(':'))
    metric_value = hours * 3600 + minutes * 60 + seconds
  else:
    metric_value = float(metric_data.replace(',', ''))
  return (metric_name, metric_value)


def CollectResultFile(vm, results):
  """Collect result file on vm.

  Args:
    vm: The target vm.
    results: A dictionary of lists. Each list contains results of a field
      defined in RESULTS_METRICS collected from each loader machines.

  Raises:
    ValueError: If a metric of RESULTS_METRICS is missing from the result file.
  """
  result_path = _ResultFilePath(vm)
  vm.PullFile(vm_util.GetTempDir(), result_path)
  # The summary with the final metrics is at the end of the log.
  resp, _ = vm.RemoteCommand('tail -n 20 ' + result_path)
  metrics = ParseResp(resp)
  logging.info('Metrics: %s', metrics)
  for metric in RESULTS_METRICS:
    if metric not in metrics:
      raise ValueError(f'Metric {metric} not found in result file.')
    value = metrics[metric]
    results[metric].append(value)


def CollectResults(loader_vms, metadata):
  """Collect and parse test results.

  Metrics in MAXIMUM_METRICS report the maximum over all loaders, metrics in
  AGGREGATED_METRICS the sum, and every other metric the mean.

  Args:
    loader_vms: client vms.
    metadata: dict. Contains metadata for this benchmark.

  Returns:
    A list of sample.Sample objects.
  """
  logging.info('Gathering results.')
  raw_results = collections.defaultdict(list)
  args = [((vm, raw_results), {}) for vm in loader_vms]
  background_tasks.RunThreaded(CollectResultFile, args)
  results = []
  for metric in RESULTS_METRICS:
    if metric in MAXIMUM_METRICS:
      value = max(raw_results[metric])
    else:
      value = math.fsum(raw_results[metric])
      if metric not in AGGREGATED_METRICS:
        value = value / len(loader_vms)
    unit = ''
    if metric.startswith('latency'):
      unit = 'ms'
    elif metric.endswith('rate'):
      unit = 'operations per second'
    elif metric == 'total operation time':
      unit = 'seconds'
    results.append(sample.Sample(metric, value, unit, metadata))
  return results


def GenerateCpuUtilizationFileName(vm):
  """Returns the path of the sar cpu utilization log on `vm`."""
  return f'{vm_util.VM_TMP_DIR}/{vm.name}-cpu_utilization.log'


def Run(benchmark_spec):
  """Run Cassandra on target vms.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.

  Returns:
    A list of sample.Sample objects.
  """
  cassandra_vms, client_vms = ParseVmGroups(benchmark_spec.vm_groups)
  metadata = GenerateMetadataFromFlags(
      benchmark_spec, cassandra_vms, client_vms
  )
  metadata['cassandra_version'] = cassandra.GetCassandraVersion(
      benchmark_spec.vm_groups[CASSANDRA_GROUP][0]
  )
  return RunTestNTimes(client_vms, cassandra_vms, metadata)


def RunTestNTimes(client_vms, cassandra_vms, metadata):
  """Run the cassandra stress test --cassandra_stress_max_runs times.

  Running cassandra stress test with different thread counts.
  - We increase the thread count gradually by THREAD_INCREMENT_COUNT till
    op rate increases.
  - We decrease the thread count by THREAD_INCREMENT_COUNT/2 if the
    operation rate is lower than the previous run, or if the median latency or
    the server cpu utilization exceeds its limit.

  Args:
    client_vms: client vms.
    cassandra_vms: cassandra server vms.
    metadata: dict. Contains metadata for this benchmark. Updated in place
      with the values of the latest run.

  Returns:
    A list of sample.Sample objects: the samples of every run followed by a
    'max_op_rate' sample.
  """
  samples = []
  last_operation_rate = 0
  thread_count = STARTING_THREAD_COUNT
  # Integer division keeps the thread count (and the reported metadata) an int.
  thread_decrement = THREAD_INCREMENT_COUNT // 2
  max_op_rate = 0
  max_op_rate_metadata = None
  for _ in range(CASSANDRA_STRESS_MAX_RUNS.value):
    logging.info('running thread count: %s', thread_count)
    cpu_loads = RunCassandraStressTest(
        cassandra_vms,
        client_vms,
        metadata['num_keys'],
        metadata['command'],
        thread_count,
        metadata.get('operations'),
        metadata['population_size'],
        metadata['population_dist'],
        metadata['population_parameters'],
        is_preload=False,
    )
    cpu_utilization = GetCpuUtilization(cassandra_vms)
    metadata['server_load_cpu_utilization'] = [
        cpu_load / vm.num_cpus * 100
        for cpu_load, vm in zip(cpu_loads, cassandra_vms)
    ]
    metadata['server_cpu_utilization'] = cpu_utilization
    metadata['num_cassandra_stress_threads'] = thread_count
    # Tracking disk and memory usage for auditing and debugging purposes.
    for vm in cassandra_vms:
      vm.RemoteCommand('df -H')
      vm.RemoteCommand('free -h')
    current_samples = CollectResults(client_vms, copy.deepcopy(metadata))
    samples.extend(current_samples)
    latest_operation_rate = GetOperationRate(current_samples)
    median_latency = GetMedianLatency(current_samples)
    # cpu_utilization is empty when no run duration is set; treat that as
    # "no cpu pressure" instead of failing on max() of an empty list.
    max_cpu_usage = max(cpu_utilization, default=0)
    if max_op_rate < latest_operation_rate:
      max_op_rate = latest_operation_rate
      # Snapshot the metadata: the dict is overwritten by the following runs.
      max_op_rate_metadata = copy.deepcopy(metadata)
    is_overloaded = (
        int(median_latency) >= MAX_MEDIAN_LATENCY_MS
        or max_cpu_usage > CASSANDRA_CPU_UTILIZATION_LIMIT.value
    )
    if not is_overloaded and latest_operation_rate > last_operation_rate:
      next_thread_count = thread_count + THREAD_INCREMENT_COUNT
    else:
      # cassandra-stress needs at least one thread.
      next_thread_count = max(1, thread_count - thread_decrement)
    last_operation_rate = latest_operation_rate
    thread_count = next_thread_count
    WaitForCompactionTasks(cassandra_vms)
  samples.append(
      sample.Sample(
          'max_op_rate',
          max_op_rate,
          'operations per second',
          max_op_rate_metadata,
      )
  )
  return samples


@vm_util.Retry(
    max_retries=10,
    retryable_exceptions=(CassandraCompactionNotCompletedError,),
    poll_interval=WAIT_BETWEEN_COMPACTION_TASKS_CHECK,
)
def WaitForCompactionTasks(cassandra_vms):
  """Waits for cassandra's pending compaction tasks to be completed.

  Args:
    cassandra_vms: cassandra server vms.

  Raises:
    CassandraCompactionNotCompletedError: If any node still reports pending
      compaction tasks; this triggers a retry.
  """
  pending_compaction_tasks = cassandra.GetPendingTaskCountFromCompactionStats(
      cassandra_vms
  )
  max_pending_compaction_tasks = max(pending_compaction_tasks)
  logging.info(
      'Remaining compaction tasks: %s',
      max_pending_compaction_tasks,
  )
  if max_pending_compaction_tasks > 0:
    raise CassandraCompactionNotCompletedError(
        f'{max_pending_compaction_tasks} compaction tasks not completed'
    )


def GetCpuUtilization(cassandra_vms):
  """Get cpu utilization during the test from sar output.

  Args:
    cassandra_vms: cassandra server vms.

  Returns:
    A list of cpu utilization for each cassandra node. Empty if no sar
    requests are made for the configured run duration.
  """
  cpu_utilization = []
  if (
      CalculateNumberOfSarRequestsFromDuration(
          CASSANDRA_STRESS_RUN_DURATION.value, SAR_CPU_UTILIZATION_INTERVAL
      )
      == 0
  ):
    return cpu_utilization
  for vm in cassandra_vms:
    stdout, _ = vm.RobustRemoteCommand(
        f'cat {GenerateCpuUtilizationFileName(vm)}'
    )
    cpu_utilization.append(ParseAverageCpuUtilization(stdout))
  return cpu_utilization


def ParseUptimeOutput(uptime_output) -> float:
  """Parses the output of the uptime command.

  Sample uptime command output:
    20:11:37 up 172 days, 22 min, 4 users, load average: 0.23, 0.54, 0.31
  load average displays the load average of the last 1 minute, 5 minutes and
  15 minutes.

  Args:
    uptime_output: The output of the uptime command.

  Returns:
    The load average of the last 5 minutes.

  Raises:
    ValueError: If the load averages cannot be found in the output.
  """
  match = re.search(r'load average: (.*)', uptime_output)
  load_averages = match.group(1).split(', ') if match else []
  if len(load_averages) < 2:
    raise ValueError(
        f'Unable to parse load average from uptime output: {uptime_output!r}'
    )
  return float(load_averages[1])


def GetOperationRate(samples):
  """Returns the value of the first 'op rate' sample, 0 if there is none."""
  for s in samples:
    if s.metric == 'op rate':
      return s.value
  return 0


def GetMedianLatency(samples):
  """Returns the value of the first 'latency median' sample, 0 if none."""
  for s in samples:
    if s.metric == 'latency median':
      return s.value
  return 0


def Cleanup(benchmark_spec):
  """Cleanup function.

  Stops and cleans cassandra on the same server vms that Prepare() set up,
  including the per-zone groups created by ConfigureVmGroups.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.
  """
  cassandra_vms, _ = ParseVmGroups(benchmark_spec.vm_groups)
  background_tasks.RunThreaded(cassandra.Stop, cassandra_vms)
  background_tasks.RunThreaded(cassandra.CleanNode, cassandra_vms)