in perfkitbenchmarker/linux_benchmarks/aerospike_benchmark.py [0:0]
def Run(benchmark_spec: bm_spec.BenchmarkSpec) -> List[sample.Sample]:
  """Runs a read/update load test on Aerospike.

  Sweeps the asbench client thread count from AEROSPIKE_MIN_CLIENT_THREADS to
  AEROSPIKE_MAX_CLIENT_THREADS (inclusive) in steps of
  AEROSPIKE_CLIENT_THREADS_STEP_SIZE. For each thread count, every configured
  workload type is run against every configured namespace, fanning out one
  asbench process per (client VM, aerospike instance) pair in parallel.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.

  Returns:
    A list of sample.Sample objects.

  Raises:
    ValueError: If aerospike_test_workload_extra_args is set but its length
      does not match the number of aerospike_test_workload_types.
  """
  clients = benchmark_spec.vm_groups['clients']
  num_client_vms = len(clients)
  servers = benchmark_spec.vm_groups['workers']
  samples = []
  # Comma-separated server IPs passed to asbench --hosts for cluster seeding.
  seed_ips = ','.join([str(vm.internal_ip) for vm in servers])
  metadata = {}
  for threads in range(
      AEROSPIKE_MIN_CLIENT_THREADS.value,
      AEROSPIKE_MAX_CLIENT_THREADS.value + 1,
      AEROSPIKE_CLIENT_THREADS_STEP_SIZE.value,
  ):
    # Per-process samples parsed from asbench stdout for this thread count;
    # appended to concurrently by the _Run closure below.
    stdout_samples = []

    def _Run(namespace, client_idx, process_idx, op, extra_arg):
      # Runs a single asbench process on clients[client_idx] against the given
      # namespace and collects the parsed stdout samples. `threads`, `seed_ips`
      # and `stdout_samples` are captured from the enclosing scope; the pylint
      # suppressions below acknowledge the intentional loop-variable capture
      # (the closure is only invoked within the current loop iteration).
      extra_arg_str = f'{extra_arg} ' if extra_arg else ''
      run_command = (
          f'asbench '
          f'--threads {threads} --namespace {namespace} '  # pylint: disable=cell-var-from-loop
          f'--workload "{op}" '
          f'{extra_arg_str} '
          f'--object-spec {AEROSPIKE_TEST_WORKLOAD_OBJECT_SPEC.value} '
          f'--keys {AEROSPIKE_NUM_KEYS.value} '
          # Each aerospike instance listens on its own port:
          # process 0 -> 3000, process 1 -> 4000, and so on.
          f'--hosts {seed_ips} --port {3 + process_idx}000 '
          f'--duration {AEROSPIKE_BENCHMARK_DURATION.value} '
          '--latency --percentiles 50,90,99,99.9,99.99 '
          '--output-file '
          f'result.{client_idx}.{process_idx}.{threads} '
      )
      stdout, _ = clients[client_idx].RobustRemoteCommand(run_command)
      stdout_samples.extend(aerospike_client.ParseAsbenchStdout(stdout))  # pylint: disable=cell-var-from-loop

    workload_types = AEROSPIKE_TEST_WORKLOAD_TYPES.value.split(';')
    # If no per-workload extra args were supplied, pad with None so the zip()
    # below pairs every workload type with "no extra argument".
    extra_args = (
        AEROSPIKE_TEST_WORKLOAD_EXTRA_ARGS.value
        if AEROSPIKE_TEST_WORKLOAD_EXTRA_ARGS.value
        else [None] * len(workload_types)
    )
    if len(extra_args) != len(workload_types):
      raise ValueError(
          'aerospike_test_workload_extra_args must be the same length as '
          'aerospike_test_workload_types'
      )
    for op, extra_arg in zip(workload_types, extra_args):
      for namespace in AEROSPIKE_NAMESPACES.value:
        # Build one (args, kwargs) tuple per asbench process — one per
        # (client VM, aerospike instance) — and run them all concurrently.
        run_params = []
        for client_idx in range(len(clients)):
          for process_idx in range(FLAGS.aerospike_instances):
            run_params.append(
                ((namespace, client_idx, process_idx, op, extra_arg), {})
            )
        background_tasks.RunThreaded(_Run, run_params)
        # Emit a cluster summary on each server after the run. The output is
        # not parsed here; it lands in the command logs for diagnostics.
        for server in servers:
          server.RemoteCommand('sudo asadm -e summary')
    if num_client_vms * FLAGS.aerospike_instances == 1:
      # A single asbench process needs no cross-process aggregation.
      detailed_samples = stdout_samples
    else:
      detailed_samples = aerospike_client.AggregateAsbenchSamples(
          stdout_samples
      )
    temp_samples = aerospike_client.CreateTimeSeriesSample(detailed_samples)
    # Pull each asbench --output-file back to the local temp dir so the
    # latency histograms can be parsed below if detailed publishing is on.
    result_files = []
    for client_idx in range(len(clients)):
      for process_idx in range(FLAGS.aerospike_instances):
        filename = f'result.{client_idx}.{process_idx}.{threads}'
        clients[client_idx].PullFile(vm_util.GetTempDir(), filename)
        result_files.append(filename)
    if (
        FLAGS.aerospike_publish_detailed_samples
        or _PUBLISH_PERCENTILE_TIME_SERIES.value
    ):
      detailed_samples.extend(
          aerospike_client.ParseAsbenchHistogram(result_files)
      )
      temp_samples.extend(detailed_samples)
    metadata.update({
        'num_clients_vms': AEROSPIKE_CLIENT_VMS.value,
        'num_aerospike_vms': len(servers),
        'num_aerospike_instances': FLAGS.aerospike_instances,
        'storage_type': FLAGS.aerospike_storage_type,
        # NOTE(review): 80% of the first server's total memory in kB —
        # presumably mirrors the namespace memory sizing used at server
        # setup; confirm against the server configuration code.
        'memory_size': int(servers[0].total_memory_kb * 0.8),
        'service_threads': FLAGS.aerospike_service_threads,
        'replication_factor': FLAGS.aerospike_replication_factor,
        'client_threads': threads,
        'read_percent': AEROSPIKE_READ_PERCENT.value,
        'aerospike_edition': FLAGS.aerospike_edition.value,
        'aerospike_enable_strong_consistency': (
            FLAGS.aerospike_enable_strong_consistency
        ),
        'aerospike_test_workload_types': AEROSPIKE_TEST_WORKLOAD_TYPES.value,
        'aerospike_test_workload_extra_args': (
            AEROSPIKE_TEST_WORKLOAD_EXTRA_ARGS.value
        ),
        'aerospike_skip_db_prepopulation': (
            AEROSPIKE_SKIP_DB_PREPOPULATION.value
        ),
        'aerospike_test_workload_object_spec': (
            AEROSPIKE_TEST_WORKLOAD_OBJECT_SPEC.value
        ),
    })
    # The enterprise edition is version-pinned via a flag; record it so runs
    # against different versions can be distinguished.
    if FLAGS.aerospike_edition == aerospike_server.AerospikeEdition.ENTERPRISE:
      metadata.update({
          'aerospike_version': FLAGS.aerospike_enterprise_version,
      })
    # Stamp every sample from this thread-count iteration with the metadata.
    for s in temp_samples:
      s.metadata.update(metadata)
    samples.extend(temp_samples)
  return samples