perfkitbenchmarker/linux_benchmarks/cassandra_ycsb_benchmark.py (131 lines of code) (raw):
# Copyright 2015 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Runs YCSB against Cassandra.

This benchmark runs two workloads against Cassandra using YCSB (the Yahoo! Cloud
Serving Benchmark).
Cassandra is described in perfkitbenchmarker.linux_packages.cassandra.
YCSB and its workloads are described in perfkitbenchmarker.linux_packages.ycsb.
"""
import functools
import logging
import os
from absl import flags
from perfkitbenchmarker import background_tasks
from perfkitbenchmarker import configs
from perfkitbenchmarker import data
from perfkitbenchmarker.linux_packages import cassandra
from perfkitbenchmarker.linux_packages import ycsb
FLAGS = flags.FLAGS
BENCHMARK_NAME = 'cassandra_ycsb'
BENCHMARK_CONFIG = """
cassandra_ycsb:
description: >
Run YCSB against Cassandra. Specify the
Cassandra cluster size with --num_vms. Specify the number
of YCSB VMs with --ycsb_client_vms.
vm_groups:
workers:
vm_spec: *default_dual_core
disk_spec: *default_500_gb
clients:
os_type: ubuntu2204 # Python 2
vm_spec: *default_dual_core
"""
# TODO: Add flags.
REPLICATION_FACTOR = 3
WRITE_CONSISTENCY = 'QUORUM'
READ_CONSISTENCY = 'QUORUM'
KEYSPACE_NAME = 'usertable'
COLUMN_FAMILY = 'data'
CREATE_TABLE_SCRIPT = 'cassandra/create-ycsb-table.cql.j2'
def GetConfig(user_config):
  """Loads the benchmark config and sizes its VM groups from flags.

  Args:
    user_config: User-supplied overrides forwarded to configs.LoadConfig.

  Returns:
    The merged benchmark configuration. The 'workers' group is given
    --num_vms VMs when that flag was explicitly passed and 3 otherwise; the
    'clients' group is resized to --ycsb_client_vms only when that flag was
    explicitly passed.
  """
  config = configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME)
  vm_groups = config['vm_groups']

  worker_count = FLAGS.num_vms if FLAGS['num_vms'].present else 3
  vm_groups['workers']['vm_count'] = worker_count

  # Leave the config's own client count alone unless the user asked for one.
  if FLAGS['ycsb_client_vms'].present:
    vm_groups['clients']['vm_count'] = FLAGS.ycsb_client_vms

  return config
def CheckPrerequisites(benchmark_config):
  """Verifies that the required resources are present.

  Args:
    benchmark_config: Unused.

  Raises:
    perfkitbenchmarker.data.ResourceNotFound: On missing resource.
  """
  del benchmark_config  # Unused.
  for package_check in (cassandra.CheckPrerequisites, ycsb.CheckPrerequisites):
    package_check()
  # Resolving the template path fails if the table-creation script is missing.
  data.ResourcePath(CREATE_TABLE_SCRIPT)
def _InstallCassandra(vm, seed_vms):
  """Installs the Cassandra package on 'vm' and configures it.

  Starting the cluster is done separately (see cassandra.StartCluster in
  Prepare).

  Args:
    vm: The VM to set up as a Cassandra node.
    seed_vms: Seed VMs forwarded to cassandra.Configure.
  """
  package_name = 'cassandra'
  vm.Install(package_name)
  cassandra.Configure(vm, seed_vms=seed_vms)
def _CreateYCSBTable(
    vm,
    keyspace=KEYSPACE_NAME,
    column_family=COLUMN_FAMILY,
    replication_factor=REPLICATION_FACTOR,
):
  """Renders the YCSB table-creation script onto 'vm' and executes it.

  The template at CREATE_TABLE_SCRIPT is rendered into cassandra.CASSANDRA_DIR
  on the VM (with its last file extension dropped) and then fed to the
  Cassandra CLI, which connects to the VM's internal IP.

  Args:
    vm: The VM on which the script is rendered and run.
    keyspace: Keyspace name substituted into the template.
    column_family: Column family name substituted into the template.
    replication_factor: Replication factor substituted into the template.
  """
  local_template = data.ResourcePath(CREATE_TABLE_SCRIPT)
  # Strip the trailing template extension for the rendered copy's name.
  rendered_name = os.path.basename(os.path.splitext(local_template)[0])
  rendered_path = os.path.join(cassandra.CASSANDRA_DIR, rendered_name)

  template_vars = {
      'keyspace': keyspace,
      'column_family': column_family,
      'replication_factor': replication_factor,
  }
  vm.RenderTemplate(local_template, rendered_path, context=template_vars)

  cli = cassandra.GetCassandraCliPath(vm)
  vm.RemoteCommand(f'{cli} -f {rendered_path} -h {vm.internal_ip}')
def _GetVMsByRole(benchmark_spec):
  """Groups the benchmark's VMs by the role they play.

  Args:
    benchmark_spec: The benchmark specification.

  Returns:
    A dict with keys:
      'vms': every VM in the benchmark spec.
      'cassandra_vms': the VMs of the 'workers' group.
      'seed_vm': the first Cassandra VM, or None if there are none.
      'non_seed_cassandra_vms': the Cassandra VMs after the seed.
      'clients': the 'clients' group when --ycsb_client_vms is non-zero;
        otherwise the Cassandra VMs themselves, so YCSB runs on the servers.
  """
  cassandra_vms = benchmark_spec.vm_groups['workers']
  if FLAGS.ycsb_client_vms:
    clients = benchmark_spec.vm_groups['clients']
  else:
    clients = cassandra_vms
  return {
      'vms': benchmark_spec.vms,
      'cassandra_vms': cassandra_vms,
      # Guard the index so an empty 'workers' group reaches Prepare's explicit
      # "No Cassandra VMs" / "No seed VM" checks instead of raising an opaque
      # IndexError here.
      'seed_vm': cassandra_vms[0] if cassandra_vms else None,
      'non_seed_cassandra_vms': cassandra_vms[1:],
      'clients': clients,
  }
def Prepare(benchmark_spec):
  """Installs Cassandra and YCSB, starts the cluster and creates the table.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.
  """
  all_vms = benchmark_spec.vms
  roles = _GetVMsByRole(benchmark_spec)

  ycsb_vms = roles['clients']
  assert ycsb_vms, all_vms

  # Cassandra cluster.
  server_vms = roles['cassandra_vms']
  assert server_vms, 'No Cassandra VMs: {}'.format(roles)
  seed = roles['seed_vm']
  assert seed, 'No seed VM: {}'.format(roles)

  cassandra_install_fns = [
      functools.partial(_InstallCassandra, node, seed_vms=[seed])
      for node in server_vms
  ]
  ycsb_install_fns = [
      functools.partial(node.Install, 'ycsb') for node in ycsb_vms
  ]

  if FLAGS.ycsb_client_vms:
    # Dedicated client VMs: every install can proceed concurrently.
    install_phases = [cassandra_install_fns + ycsb_install_fns]
  else:
    # Server and client share VMs: install the packages one phase at a time
    # to avoid a race condition.
    install_phases = [cassandra_install_fns, ycsb_install_fns]
  for phase in install_phases:
    background_tasks.RunThreaded(lambda f: f(), phase)

  cassandra.StartCluster(seed, roles['non_seed_cassandra_vms'])
  _CreateYCSBTable(seed, replication_factor=FLAGS.cassandra_replication_factor)

  host_list = ','.join(node.internal_ip for node in server_vms)
  benchmark_spec.executor = ycsb.YCSBExecutor('cassandra2-cql', hosts=host_list)
def Run(benchmark_spec):
  """Spawns YCSB load and run phases and gathers the results.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.

  Returns:
    A list of sample.Sample instances, each annotated with cluster metadata
    (client VM count, Cassandra VM count, concurrent reads, replication
    factor, and whether YCSB ran on the Cassandra servers).
  """
  # Resolve the role mapping once instead of recomputing it per lookup.
  by_role = _GetVMsByRole(benchmark_spec)
  loaders = by_role['clients']
  cassandra_vms = by_role['cassandra_vms']
  logging.debug('Loaders: %s', loaders)

  # The same connection and consistency settings are used for both the load
  # and the run phase.
  # NOTE(review): KEYSPACE_NAME is not passed here; confirm the YCSB binding
  # targets the keyspace/table that _CreateYCSBTable creates.
  kwargs = {
      'hosts': ','.join(vm.internal_ip for vm in cassandra_vms),
      'columnfamily': COLUMN_FAMILY,
      'cassandra.readconsistencylevel': READ_CONSISTENCY,
      'cassandra.scanconsistencylevel': READ_CONSISTENCY,
      'cassandra.writeconsistencylevel': WRITE_CONSISTENCY,
      'cassandra.deleteconsistencylevel': WRITE_CONSISTENCY,
  }
  metadata = {
      'ycsb_client_vms': FLAGS.ycsb_client_vms,
      'num_vms': len(cassandra_vms),
      'concurrent_reads': FLAGS.cassandra_concurrent_reads,
      'replication_factor': FLAGS.cassandra_replication_factor,
  }
  if not FLAGS.ycsb_client_vms:
    metadata['ycsb_client_on_server'] = True

  samples = list(
      benchmark_spec.executor.LoadAndRun(
          loaders, load_kwargs=kwargs, run_kwargs=kwargs
      )
  )
  for sample in samples:
    sample.metadata.update(metadata)
  return samples
def Cleanup(benchmark_spec):
  """Stops Cassandra on every server VM, then cleans each node.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.
  """
  nodes = _GetVMsByRole(benchmark_spec)['cassandra_vms']
  # Stop all nodes (in parallel) before cleaning any of them.
  for step in (cassandra.Stop, cassandra.CleanNode):
    background_tasks.RunThreaded(step, nodes)