# perfkitbenchmarker/linux_benchmarks/aerospike_benchmark.py
# Copyright 2022 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Runs Aerospike (http://www.aerospike.com).
Aerospike is an open-source NoSQL database. This benchmark runs a read/update
load test with varying numbers of client threads against an Aerospike server.
This test can be run in a variety of configurations including memory only,
remote/persistent SSD, and local SSD. The Aerospike configuration is controlled
by the "aerospike_storage_type" and "data_disk_type" flags.
"""
import functools
from typing import Any, Dict, List
from absl import flags
from perfkitbenchmarker import background_tasks
from perfkitbenchmarker import benchmark_spec as bm_spec
from perfkitbenchmarker import configs
from perfkitbenchmarker import disk
from perfkitbenchmarker import errors
from perfkitbenchmarker import sample
from perfkitbenchmarker import vm_util
from perfkitbenchmarker.linux_packages import aerospike_client
from perfkitbenchmarker.linux_packages import aerospike_server
FLAGS = flags.FLAGS
_DEFAULT_NAMESPACES = ['test']
AEROSPIKE_CLIENT_VMS = flags.DEFINE_integer(
'aerospike_client_vms',
1,
'Number of client machines to use for running asbench.',
)
AEROSPIKE_CLIENT_THREADS_FOR_LOAD_PHASE = flags.DEFINE_integer(
'aerospike_client_threads_for_load_phase',
8,
'The number of client threads to use during the load phase.',
lower_bound=1,
)
AEROSPIKE_MIN_CLIENT_THREADS = flags.DEFINE_integer(
'aerospike_min_client_threads',
8,
'The minimum number of Aerospike client threads per vm.',
lower_bound=1,
)
AEROSPIKE_MAX_CLIENT_THREADS = flags.DEFINE_integer(
'aerospike_max_client_threads',
128,
'The maximum number of Aerospike client threads per vm.',
lower_bound=1,
)
AEROSPIKE_CLIENT_THREADS_STEP_SIZE = flags.DEFINE_integer(
'aerospike_client_threads_step_size',
8,
'The number to increase the Aerospike client threads '
'per vm by for each iteration of the test.',
lower_bound=1,
)
AEROSPIKE_READ_PERCENT = flags.DEFINE_integer(
'aerospike_read_percent',
90,
'The percent of operations which are reads. This flag is deprecated and'
' should not be used.',
lower_bound=0,
upper_bound=100,
)
AEROSPIKE_NUM_KEYS = flags.DEFINE_integer(
'aerospike_num_keys',
1000000,
'The number of keys to load Aerospike with. The index '
'must fit in memory regardless of where the actual '
'data is being stored and each entry in the '
'index requires 64 bytes.',
)
AEROSPIKE_BENCHMARK_DURATION = flags.DEFINE_integer(
'aerospike_benchmark_duration',
60,
'Duration of each test iteration in secs.',
)
flags.DEFINE_boolean(
'aerospike_publish_detailed_samples',
False,
'Whether or not to publish one sample per aggregation '
'window with a histogram. By default, only a TimeSeries '
'sample will be generated.',
)
flags.DEFINE_integer(
'aerospike_instances',
1,
'Number of aerospike_server processes to run. '
'e.g. if this is set to 2, we will launch 2 aerospike '
'processes on the same VM. Flags such as '
'aerospike_num_keys and client threads will be applied '
'to each instance.',
)
AEROSPIKE_CLIENT_MACHINE_TYPE = flags.DEFINE_string(
'aerospike_client_machine_type',
None,
'Machine type to use for the aerospike client if different '
'from aerospike server machine type.',
)
AEROSPIKE_NAMESPACES = flags.DEFINE_list(
'aerospike_namespaces',
_DEFAULT_NAMESPACES,
'The Aerospike namespaces to test against.',
)
flags.DEFINE_boolean(
'aerospike_enable_strong_consistency',
False,
'Whether or not to enable strong consistency for the Aerospike namespaces.',
)
AEROSPIKE_TEST_WORKLOAD_TYPES = flags.DEFINE_string(
'aerospike_test_workload_types',
'RU, 75',
'The test workload types to generate. If there are multiple types, they'
' should be separated by semicolons.',
)
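# Workload strings use asbench's --workload syntax: e.g. 'RU,75' requests a
# read/update mix with 75% reads and 'I' is insert-only. Multiple workloads are
# separated by ';' (for example 'RU,75;RU,50').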
AEROSPIKE_TEST_WORKLOAD_EXTRA_ARGS = flags.DEFINE_list(
'aerospike_test_workload_extra_args',
None,
'The extra args to use in asbench commands.',
)
AEROSPIKE_SKIP_DB_PREPOPULATION = flags.DEFINE_boolean(
'aerospike_skip_db_prepopulation',
False,
'Whether or not to skip pre-populating the Aerospike DB.',
)
AEROSPIKE_TEST_WORKLOAD_OBJECT_SPEC = flags.DEFINE_string(
'aerospike_test_workload_object_spec',
'B1000',
'The object spec to generate for the test workload.',
)
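# Object specs are passed straight to asbench's --object-spec flag; the default
# 'B1000' asks asbench to write a 1000-byte binary value per record.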
_PUBLISH_PERCENTILE_TIME_SERIES = flags.DEFINE_boolean(
'aerospike_publish_percentile_time_series',
True,
(
'Whether or not to publish one sample per aggregation '
'window with percentiles to capture'
),
)
_PERCENTILES_TO_CAPTURE = flags.DEFINE_list(
'aerospike_percentiles_to_capture',
['50', '90', '99', '99.9', '99.99'],
(
'List of percentiles to capture if'
' aerospike_publish_percentile_time_series is set.'
),
)
AEROSPIKE_SERVER_MACHINE_TYPE = flags.DEFINE_string(
'aerospike_server_machine_type',
None,
(
'Machine type to use for the aerospike server if different '
'from aerospike client machine type.'
),
)
BENCHMARK_NAME = 'aerospike'
BENCHMARK_CONFIG = """
aerospike:
description: Runs Aerospike.
vm_groups:
workers:
vm_spec: *default_dual_core
disk_spec: *default_500_gb
vm_count: null
disk_count: 0
clients:
vm_spec: *default_dual_core
"""
def GetConfig(user_config: Dict[str, Any]) -> Dict[str, Any]:
"""Gets the Aerospike config."""
config = configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME)
if FLAGS.aerospike_storage_type == aerospike_server.DISK:
config['vm_groups']['workers']['disk_count'] = 1
if FLAGS.data_disk_type == disk.LOCAL:
# The maximum number of local disks is not known here; decide later.
if FLAGS.cloud == 'GCP':
config['vm_groups']['workers']['vm_spec']['GCP']['num_local_ssds'] = (
FLAGS.gce_num_local_ssds or FLAGS.server_gce_num_local_ssds
)
FLAGS['gce_num_local_ssds'].present = False
FLAGS.gce_num_local_ssds = 0
if FLAGS['server_gce_ssd_interface'].present:
config['vm_groups']['workers']['vm_spec']['GCP'][
'ssd_interface'
] = FLAGS.server_gce_ssd_interface
FLAGS['gce_ssd_interface'].present = False
FLAGS.gce_ssd_interface = FLAGS.server_gce_ssd_interface
config['vm_groups']['clients']['vm_spec']['GCP']['num_local_ssds'] = 0
if AEROSPIKE_SERVER_MACHINE_TYPE.value:
vm_spec = config['vm_groups']['workers']['vm_spec']
for cloud in vm_spec:
vm_spec[cloud]['machine_type'] = AEROSPIKE_SERVER_MACHINE_TYPE.value
if AEROSPIKE_CLIENT_MACHINE_TYPE.value:
vm_spec = config['vm_groups']['clients']['vm_spec']
for cloud in vm_spec:
vm_spec[cloud]['machine_type'] = AEROSPIKE_CLIENT_MACHINE_TYPE.value
if FLAGS['aerospike_vms'].present:
config['vm_groups']['workers']['vm_count'] = FLAGS.aerospike_vms
if FLAGS['aerospike_client_vms'].present:
config['vm_groups']['clients']['vm_count'] = AEROSPIKE_CLIENT_VMS.value
if FLAGS.aerospike_instances > 1 and FLAGS.aerospike_vms > 1:
raise errors.Setup.InvalidFlagConfigurationError(
'Only one of aerospike_instances and aerospike_vms can be set.'
)
return config
def Prepare(benchmark_spec: bm_spec.BenchmarkSpec) -> None:
"""Install Aerospike server and Aerospike tools on the other.
Args:
benchmark_spec: The benchmark specification. Contains all data that is
required to run the benchmark.
"""
clients = benchmark_spec.vm_groups['clients']
num_client_vms = len(clients)
servers = benchmark_spec.vm_groups['workers']
# VMs where the server is not up yet.
servers_not_up = [
server for server in servers if not aerospike_server.IsServerUp(server)
]
seed_ips = [str(vm.internal_ip) for vm in servers]
aerospike_install_fns = []
if servers_not_up:
# Prepare the VMs where the server isn't up yet.
aerospike_install_fns = [
functools.partial(
aerospike_server.ConfigureAndStart,
vm,
seed_node_ips=seed_ips,
)
for vm in servers
]
if FLAGS.aerospike_enable_strong_consistency:
for server in servers:
aerospike_server.EnableStrongConsistency(
server, AEROSPIKE_NAMESPACES.value
)
client_install_fns = [
functools.partial(vm.Install, 'aerospike_client') for vm in clients
]
background_tasks.RunThreaded(
lambda f: f(), aerospike_install_fns + client_install_fns
)
loader_counts = [
int(AEROSPIKE_NUM_KEYS.value) // len(clients)
+ (1 if i < (AEROSPIKE_NUM_KEYS.value % num_client_vms) else 0)
for i in range(num_client_vms)
]
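# Split the keyspace across client VMs: each VM loads
# aerospike_num_keys // num_client_vms keys, and the first
# (aerospike_num_keys % num_client_vms) VMs load one extra key so the per-VM
# counts sum to aerospike_num_keys.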
if AEROSPIKE_SKIP_DB_PREPOPULATION.value:
return
extra_args = FLAGS.aerospike_test_workload_extra_args or ['']
@vm_util.Retry(max_retries=3) # Retry if the server is not fully up yet.
def _Load(namespace, client_idx, process_idx):
ips = ','.join(seed_ips)
# Default to use the first extra arg.
extra_arg_str = f'{extra_args[0]}' if extra_args[0] else ''
load_command = (
'asbench '
f'--threads {AEROSPIKE_CLIENT_THREADS_FOR_LOAD_PHASE.value} '
f'--namespace {namespace} --workload I '
f'--object-spec {AEROSPIKE_TEST_WORKLOAD_OBJECT_SPEC.value} '
f'--keys {loader_counts[client_idx]} '
f'--start-key {sum(loader_counts[:client_idx])} '
f' -h {ips} -p {3 + process_idx}000 '
f'{extra_arg_str}'
)
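# Each aerospike_instances process is assumed to listen on its own port:
# instance 0 on 3000, instance 1 on 4000, and so on, which is what the
# '{3 + process_idx}000' expression (i.e. (3 + process_idx) * 1000) targets.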
clients[client_idx].RobustRemoteCommand(load_command)
run_params = []
for namespace in AEROSPIKE_NAMESPACES.value:
for client_idx in range(len(clients)):
for process_idx in range(FLAGS.aerospike_instances):
run_params.append(((namespace, client_idx, process_idx), {}))
background_tasks.RunThreaded(_Load, run_params)
def Run(benchmark_spec: bm_spec.BenchmarkSpec) -> List[sample.Sample]:
"""Runs a read/update load test on Aerospike.
Args:
benchmark_spec: The benchmark specification. Contains all data that is
required to run the benchmark.
Returns:
A list of sample.Sample objects.
"""
clients = benchmark_spec.vm_groups['clients']
num_client_vms = len(clients)
servers = benchmark_spec.vm_groups['workers']
samples = []
seed_ips = ','.join([str(vm.internal_ip) for vm in servers])
metadata = {}
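# Sweep the asbench client thread count from aerospike_min_client_threads to
# aerospike_max_client_threads (inclusive) in steps of
# aerospike_client_threads_step_size; every workload type is run against every
# namespace at each thread count.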
for threads in range(
AEROSPIKE_MIN_CLIENT_THREADS.value,
AEROSPIKE_MAX_CLIENT_THREADS.value + 1,
AEROSPIKE_CLIENT_THREADS_STEP_SIZE.value,
):
stdout_samples = []
def _Run(namespace, client_idx, process_idx, op, extra_arg):
extra_arg_str = f'{extra_arg} ' if extra_arg else ''
run_command = (
f'asbench '
f'--threads {threads} --namespace {namespace} ' # pylint: disable=cell-var-from-loop
f'--workload "{op}" '
f'{extra_arg_str} '
f'--object-spec {AEROSPIKE_TEST_WORKLOAD_OBJECT_SPEC.value} '
f'--keys {AEROSPIKE_NUM_KEYS.value} '
f'--hosts {seed_ips} --port {3 + process_idx}000 '
f'--duration {AEROSPIKE_BENCHMARK_DURATION.value} '
'--latency --percentiles 50,90,99,99.9,99.99 '
'--output-file '
f'result.{client_idx}.{process_idx}.{threads} '
)
stdout, _ = clients[client_idx].RobustRemoteCommand(run_command)
stdout_samples.extend(aerospike_client.ParseAsbenchStdout(stdout)) # pylint: disable=cell-var-from-loop
workload_types = AEROSPIKE_TEST_WORKLOAD_TYPES.value.split(';')
extra_args = (
AEROSPIKE_TEST_WORKLOAD_EXTRA_ARGS.value
if AEROSPIKE_TEST_WORKLOAD_EXTRA_ARGS.value
else [None] * len(workload_types)
)
if len(extra_args) != len(workload_types):
raise ValueError(
'aerospike_test_workload_extra_args must be the same length as '
'aerospike_test_workload_types'
)
for op, extra_arg in zip(workload_types, extra_args):
for namespace in AEROSPIKE_NAMESPACES.value:
run_params = []
for client_idx in range(len(clients)):
for process_idx in range(FLAGS.aerospike_instances):
run_params.append(
((namespace, client_idx, process_idx, op, extra_arg), {})
)
background_tasks.RunThreaded(_Run, run_params)
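# Log a cluster summary from each server for debugging; the asadm output is
# not parsed into samples.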
for server in servers:
server.RemoteCommand('sudo asadm -e summary')
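# With a single asbench process, the per-window stdout samples are used as-is;
# with multiple client VMs or processes they are first aggregated across
# processes by the aerospike_client helper.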
if num_client_vms * FLAGS.aerospike_instances == 1:
detailed_samples = stdout_samples
else:
detailed_samples = aerospike_client.AggregateAsbenchSamples(
stdout_samples
)
temp_samples = aerospike_client.CreateTimeSeriesSample(detailed_samples)
result_files = []
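# asbench wrote per-window latency output on each client VM to
# result.<client_idx>.<process_idx>.<threads> (via --output-file); pull those
# files into the run's temp dir so the histograms can be parsed below.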
for client_idx in range(len(clients)):
for process_idx in range(FLAGS.aerospike_instances):
filename = f'result.{client_idx}.{process_idx}.{threads}'
clients[client_idx].PullFile(vm_util.GetTempDir(), filename)
result_files.append(filename)
if (
FLAGS.aerospike_publish_detailed_samples
or _PUBLISH_PERCENTILE_TIME_SERIES.value
):
detailed_samples.extend(
aerospike_client.ParseAsbenchHistogram(result_files)
)
temp_samples.extend(detailed_samples)
metadata.update({
'num_clients_vms': AEROSPIKE_CLIENT_VMS.value,
'num_aerospike_vms': len(servers),
'num_aerospike_instances': FLAGS.aerospike_instances,
'storage_type': FLAGS.aerospike_storage_type,
'memory_size': int(servers[0].total_memory_kb * 0.8),
'service_threads': FLAGS.aerospike_service_threads,
'replication_factor': FLAGS.aerospike_replication_factor,
'client_threads': threads,
'read_percent': AEROSPIKE_READ_PERCENT.value,
'aerospike_edition': FLAGS.aerospike_edition.value,
'aerospike_enable_strong_consistency': (
FLAGS.aerospike_enable_strong_consistency
),
'aerospike_test_workload_types': AEROSPIKE_TEST_WORKLOAD_TYPES.value,
'aerospike_test_workload_extra_args': (
AEROSPIKE_TEST_WORKLOAD_EXTRA_ARGS.value
),
'aerospike_skip_db_prepopulation': (
AEROSPIKE_SKIP_DB_PREPOPULATION.value
),
'aerospike_test_workload_object_spec': (
AEROSPIKE_TEST_WORKLOAD_OBJECT_SPEC.value
),
})
if FLAGS.aerospike_edition == aerospike_server.AerospikeEdition.ENTERPRISE:
metadata.update({
'aerospike_version': FLAGS.aerospike_enterprise_version,
})
for s in temp_samples:
s.metadata.update(metadata)
samples.extend(temp_samples)
return samples
def Cleanup(benchmark_spec: bm_spec.BenchmarkSpec) -> None:
"""Cleanup Aerospike.
Args:
benchmark_spec: The benchmark specification. Contains all data that is
required to run the benchmark.
"""
servers = benchmark_spec.vm_groups['workers']
clients = benchmark_spec.vm_groups['clients']
def StopClient(client):
client.RemoteCommand('sudo rm -rf aerospike*')
background_tasks.RunThreaded(StopClient, clients)
def StopServer(server):
server.RemoteCommand(
'cd %s && nohup sudo make stop' % aerospike_server.AEROSPIKE_DIR
)
server.RemoteCommand('sudo rm -rf aerospike*')
background_tasks.RunThreaded(StopServer, servers)