perfkitbenchmarker/linux_packages/cassandra.py (231 lines of code) (raw):
# Copyright 2015 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Installs/Configures Cassandra.
See 'perfkitbenchmarker/data/cassandra/' for configuration files used.
Cassandra homepage: http://cassandra.apache.org
"""
import logging
import os
import posixpath
import re
import time
from absl import flags
from perfkitbenchmarker import background_tasks
from perfkitbenchmarker import data
from perfkitbenchmarker import errors
from perfkitbenchmarker import linux_packages
JNA_JAR_URL = (
    'https://maven.java.net/content/repositories/releases/'
    'net/java/dev/jna/jna/4.1.0/jna-4.1.0.jar'
)
# NOTE(review): not referenced in this file; 'REPRO' is presumably a typo for
# 'REPO', but the name is kept in case other modules import it.
CASSANDRA_GIT_REPRO = 'https://github.com/apache/cassandra.git'
# Data resources (relative to perfkitbenchmarker/data) rendered onto the VMs.
CASSANDRA_YAML_TEMPLATE = 'cassandra/cassandra.yaml.j2'
CASSANDRA_RACKDC_TEMPLATE = 'cassandra/cassandra-rackdc.properties.j2'
CASSANDRA_KEYSPACE_TEMPLATE = (
    'cassandra/create-keyspace-cassandra-stress.cql.j2'
)
CASSANDRA_ROW_CACHE_TEMPLATE = 'cassandra/enable-row-caching.cql.j2'
# Release name of the installed Cassandra: the name of the directory the
# release tarball extracts to under the install dir, and the tarball prefix.
CASSANDRA_VERSION = 'apache-cassandra-4.1.5'
CASSANDRA_DIR = posixpath.join(linux_packages.INSTALL_DIR, CASSANDRA_VERSION)
CASSANDRA_PID = posixpath.join(CASSANDRA_DIR, 'cassandra.pid')
CASSANDRA_OUT = posixpath.join(CASSANDRA_DIR, 'cassandra.out')
CASSANDRA_ERR = posixpath.join(CASSANDRA_DIR, 'cassandra.err')
# Number of times to attempt to start the cluster.
CLUSTER_START_TRIES = 10
# Time, in seconds, to wait for non-seed nodes to join after starting them.
CLUSTER_START_SLEEP = 30
# Time, in seconds, to sleep between node starts.
NODE_START_SLEEP = 30
# For setting a maven repo with --cassandra_maven_repo_url; {0} is the URL.
_MAVEN_REPO_PARAMS = """
artifact.remoteRepository.central: {0}
artifact.remoteRepository.apache: {0}
"""
FLAGS = flags.FLAGS

flags.DEFINE_integer('cassandra_replication_factor', 3, 'Num of replicas.')
CASSANDRA_CONCURRENT_READS = flags.DEFINE_integer(
    'cassandra_concurrent_reads',
    32,
    'Concurrent read requests each server accepts.',
)
# Left unset (None) by default; Configure then derives a value from the
# VM's CPU count.
CASSANDRA_CONCURRENT_WRITES = flags.DEFINE_integer(
    'cassandra_concurrent_writes',
    None,
    'Concurrent write requests each server accepts. Suggested number is'
    ' Number of CPUs in the VM * 8',
)
# Partial list of known mirrors:
# https://repo.maven.apache.org/maven2/.meta/repository-metadata.xml
# See instructions for setting up own mirror:
# https://maven.apache.org/guides/mini/guide-mirror-settings.html
# The value is a URL substituted into _MAVEN_REPO_PARAMS, so it must be a
# string flag (a boolean flag rejects URL values on the command line).
flags.DEFINE_string(
    'cassandra_maven_repo_url', None, 'Optional maven repo mirror to use.'
)
def CheckPrerequisites():
  """Checks that the Cassandra configuration templates are available.

  Raises:
    perfkitbenchmarker.data.ResourceNotFound: If a template resource is
      missing.
  """
  required_templates = (CASSANDRA_YAML_TEMPLATE, CASSANDRA_RACKDC_TEMPLATE)
  for template_name in required_templates:
    # Only called for its side effect: it raises when the resource is missing.
    data.ResourcePath(template_name)
def _Install(vm):
  """Installs Cassandra from the Apache binary release tarball.

  Downloads the tarball for CASSANDRA_VERSION from archive.apache.org and
  extracts it into linux_packages.INSTALL_DIR, which is where CASSANDRA_DIR
  points.

  Args:
    vm: VirtualMachine. The VM to install Cassandra on.
  """
  vm.Install('openjdk')
  vm.Install('curl')
  install_dir = linux_packages.INSTALL_DIR
  # CASSANDRA_VERSION has the form 'apache-cassandra-X.Y.Z'; the archive
  # groups releases by the bare 'X.Y.Z' version number.
  version_number = CASSANDRA_VERSION.rsplit('-', 1)[-1]
  tarball_url = (
      f'https://archive.apache.org/dist/cassandra/{version_number}/'
      f'{CASSANDRA_VERSION}-bin.tar.gz'
  )
  tarball_path = posixpath.join(install_dir, 'cassandra.tar.gz')
  # -f: fail on HTTP errors instead of saving the error page (which would
  # only surface later as a confusing tar failure); -L: follow redirects.
  vm.RemoteCommand(f'curl -fL -o {tarball_path} {tarball_url}')
  # Non-verbose extraction keeps thousands of file names out of the logs.
  vm.RemoteCommand(f'tar xzf {tarball_path} --directory {install_dir}')
def GetCassandraVersion(vm) -> str:
  """Returns the Cassandra version installed on the VM.

  Runs `cassandra -v` and strips surrounding whitespace (such as the trailing
  newline) from its stdout, so the value can be used directly as metadata.

  Args:
    vm: VirtualMachine. The VM to query.

  Returns:
    str. The version string printed by `cassandra -v`.
  """
  stdout, _ = vm.RemoteCommand(f'{GetCassandraPath()} -v')
  return stdout.strip()
def YumInstall(vm):
  """Installs Cassandra on the VM.

  Delegates to _Install, which does not depend on the package manager.

  Args:
    vm: VirtualMachine. The VM to install Cassandra on.
  """
  return _Install(vm)
def AptInstall(vm):
  """Installs Cassandra on the VM.

  Delegates to _Install, which does not depend on the package manager.

  Args:
    vm: VirtualMachine. The VM to install Cassandra on.
  """
  return _Install(vm)
def JujuInstall(vm, vm_group_name):
  """Deploys and configures the Cassandra charm, then prepares each unit.

  Args:
    vm: VirtualMachine used to deploy the charm; each entry in its `units`
      gets a conf directory created afterwards.
    vm_group_name: Name of the VM group passed to JujuDeploy.
  """
  vm.JujuDeploy('cs:trusty/cassandra', vm_group_name)
  # The charm defaults to Cassandra 2.2.x, which has deprecated
  # cassandra-cli. Override the install sources to downgrade to Cassandra
  # 2.1.x, matching what the cassandra benchmark(s) expect.
  install_sources = (
      'deb https://www.apache.org/dist/cassandra/debian 21x main',
      'ppa:openjdk-r/ppa',
      'ppa:stub/cassandra',
  )
  install_keys = ('F758CE318D77295D', 'null', 'null')
  quoted_sources = ', '.join(f"'{source}'" for source in install_sources)
  joined_keys = ', '.join(install_keys)
  charm_settings = [
      # Allow authentication from all units
      'authenticator=AllowAllAuthenticator',
      f'install_sources="[{quoted_sources}]"',
      f'install_keys="[{joined_keys}]"',
  ]
  vm.JujuSet('cassandra', charm_settings)
  # Block until the charm has installed and configured Cassandra.
  vm.JujuWait()
  # The manual installation into CASSANDRA_DIR is skipped on this path, so
  # the conf directory has to be created explicitly on every unit.
  conf_dir = posixpath.join(CASSANDRA_DIR, 'conf')
  for unit in vm.units:
    unit.RemoteCommand('mkdir -p %s' % conf_dir)
def Configure(vm, seed_vms):
  """Configure Cassandra on 'vm'.

  Renders the cassandra.yaml and cassandra-rackdc.properties templates,
  copies them into CASSANDRA_DIR/conf, and creates the Cassandra data
  directory on the VM's scratch disk.

  Args:
    vm: VirtualMachine. The VM to configure.
    seed_vms: List of VirtualMachine. The seed virtual machine(s).
  """
  num_cpus = vm.NumCpusForBenchmark()
  data_path = posixpath.join(vm.GetScratchDir(), 'cassandra')
  context = {
      'ip_address': vm.internal_ip,
      'data_path': data_path,
      # Each seed is listed as '<internal_ip>:7000'.
      'seeds': ','.join(f'{seed_vm.internal_ip}:7000' for seed_vm in seed_vms),
      'num_cpus': num_cpus,
      'cluster_name': 'Test cluster',
      'concurrent_reads': CASSANDRA_CONCURRENT_READS.value,
      # Fall back to 8 concurrent writes per CPU when the flag is unset.
      'concurrent_writes': (
          CASSANDRA_CONCURRENT_WRITES.value
          if CASSANDRA_CONCURRENT_WRITES.value
          else 8 * num_cpus
      ),
      'datacenter': 'datacenter',
      'rack': vm.zone,
      'row_cache_size': (
          f'{FLAGS.row_cache_size}MiB' if FLAGS.is_row_cache_enabled else '0MiB'
      ),
  }
  logging.info('cassandra yaml context: %s', context)
  cassandra_conf_path = posixpath.join(CASSANDRA_DIR, 'conf')
  for template in [CASSANDRA_YAML_TEMPLATE, CASSANDRA_RACKDC_TEMPLATE]:
    local_path = data.ResourcePath(template)
    # Render into the home directory (dropping the template extension), then
    # copy into the conf directory with sudo.
    remote_path = posixpath.join(
        '~', os.path.splitext(os.path.basename(template))[0]
    )
    vm.RenderTemplate(local_path, remote_path, context=context)
    vm.RemoteCommand(f'sudo cp {remote_path} {cassandra_conf_path}')
  # -p keeps re-configuration from failing when the directory already exists.
  vm.RemoteCommand(f'mkdir -p {data_path}')
  vm.RemoteCommand(f'sudo chmod -R 755 {data_path}')
def Start(vm):
  """Start Cassandra on a VM.

  Launches the `cassandra` script with `-p CASSANDRA_PID` so the server's
  process ID is written to the file that Stop reads. Without it, Stop has
  no PID to kill.

  Args:
    vm: The target vm. Should already be configured via 'Configure'.
  """
  vm.RemoteCommand(f'{GetCassandraPath()} -p {CASSANDRA_PID}')
def CreateKeyspace(vm, replication_factor):
  """Creates the benchmark keyspace on 'vm'.

  Also enables row caching when --is_row_cache_enabled is set.

  Args:
    vm: VirtualMachine. The VM to run the CQL on.
    replication_factor: Replication factor rendered into the CQL templates.
  """
  RunCql(vm, CASSANDRA_KEYSPACE_TEMPLATE, replication_factor)
  if not FLAGS.is_row_cache_enabled:
    return
  RunCql(vm, CASSANDRA_ROW_CACHE_TEMPLATE, replication_factor)
def RunCql(vm, template, replication_factor):
  """Run a CQL file on a VM.

  Renders the CQL template with keyspace 'keyspace1' and the given
  replication factor, copies it into CASSANDRA_DIR/conf, and executes it
  with cqlsh.

  Args:
    vm: VirtualMachine. The VM to run the CQL on.
    template: str. Data resource path of the CQL template (e.g. '*.cql.j2').
    replication_factor: Replication factor rendered into the template.
  """
  cassandra_conf_path = posixpath.join(CASSANDRA_DIR, 'conf')
  template_path = data.ResourcePath(template)
  # Drop the template extension to get the rendered file's name.
  file_name = os.path.basename(os.path.splitext(template_path)[0])
  # The destination is on the remote Linux VM, so always build a POSIX path
  # regardless of the OS running PKB (matching Configure).
  remote_path = posixpath.join('~', file_name)
  vm.RenderTemplate(
      template_path,
      remote_path,
      context={
          'keyspace': 'keyspace1',
          'replication_factor': replication_factor,
      },
  )
  vm.RemoteCommand(f'sudo cp {remote_path} {cassandra_conf_path}')
  vm.RemoteCommand(
      f'cd {CASSANDRA_DIR}/bin && sudo ./cqlsh -f'
      f' {cassandra_conf_path}/{file_name}'
  )
def Stop(vm):
  """Stops Cassandra on 'vm' by killing the PID stored in CASSANDRA_PID.

  Any failure (for example, a missing PID file) is ignored.
  """
  kill_command = f'kill $(cat {CASSANDRA_PID})'
  vm.RemoteCommand(kill_command, ignore_failure=True)
def IsRunning(vm):
  """Reports whether Cassandra is running on 'vm'.

  Treats `nodetool status` as healthy only when it succeeds and writes
  nothing to stderr.

  Args:
    vm: VirtualMachine. The VM to check.

  Returns:
    bool. True if Cassandra appears to be running, False otherwise.
  """
  try:
    _, stderr = vm.RemoteCommand(f'{GetNodetoolPath()} status')
  except errors.VirtualMachine.RemoteCommandError as ex:
    logging.warning('Exception: %s', ex)
    return False
  return not stderr
def GetCompactionStats(vm):
  """Returns the raw `nodetool compactionstats` output for the given VM.

  Args:
    vm: VirtualMachine. The VM to get compaction stats from.

  Returns:
    str. The command's stdout. Sample output:

      pending tasks: 5
      - keyspace1.standard1: 5
      id  compaction type keyspace  table     completed total       unit  progress
      c69 Compaction      keyspace1 standard1 437886277 20868111340 bytes 2.10%
      Active compaction remaining time : 0h05m33s
  """
  command = f'{GetNodetoolPath()} compactionstats'
  stats_output, _ = vm.RemoteCommand(command)
  return stats_output
def GetPendingTaskCountFromCompactionStats(cassandra_vms):
  """Parses the compaction stats for the given VMs and returns the pending task count.

  Args:
    cassandra_vms: List of VirtualMachine. The Cassandra VMs to get compaction
      stats from.

  Returns:
    List of int. The pending task count for each VM whose compaction stats
    contain a 'pending tasks: <N>' line; VMs without such a line are skipped.
  """
  compaction_stats = background_tasks.RunThreaded(
      GetCompactionStats, cassandra_vms
  )
  pending_tasks = []
  for stats in compaction_stats:
    # Require at least one digit so a malformed line is skipped instead of
    # crashing on int('').
    match = re.search(r'pending tasks: *(\d+)', stats)
    if match:
      pending_tasks.append(int(match.group(1)))
  return pending_tasks
def CleanNode(vm):
  """Deletes the Cassandra data directory from the VM's scratch disk.

  Args:
    vm: VirtualMachine. VM to clean.
  """
  cassandra_data_dir = posixpath.join(vm.GetScratchDir(), 'cassandra')
  vm.RemoteCommand(f'rm -rf {cassandra_data_dir}')
def _StartCassandraIfNotRunning(vm):
  """Starts Cassandra on 'vm' unless IsRunning reports it is already up."""
  if IsRunning(vm):
    return
  logging.info('Retrying starting cassandra on %s', vm)
  Start(vm)
def GetCassandraCliPath(_):
  """Returns the path of `cassandra-cli` under CASSANDRA_DIR/bin.

  The single argument is ignored.
  NOTE(review): cassandra-cli was deprecated in Cassandra 2.2; confirm it
  still ships with CASSANDRA_VERSION before relying on this path.
  """
  bin_dir = posixpath.join(CASSANDRA_DIR, 'bin')
  return posixpath.join(bin_dir, 'cassandra-cli')
def GetCassandraPath():
  """Returns the path of the `cassandra` launcher under CASSANDRA_DIR/bin."""
  bin_dir = posixpath.join(CASSANDRA_DIR, 'bin')
  return posixpath.join(bin_dir, 'cassandra')
def GetNodetoolPath():
  """Returns the path of the `nodetool` binary under CASSANDRA_DIR/bin."""
  bin_dir = posixpath.join(CASSANDRA_DIR, 'bin')
  return posixpath.join(bin_dir, 'nodetool')
def GetCassandraStressPath(_):
  """Returns the path of `cassandra-stress` under CASSANDRA_DIR/tools/bin.

  The single argument is ignored.
  """
  tools_bin_dir = posixpath.join(CASSANDRA_DIR, 'tools', 'bin')
  return posixpath.join(tools_bin_dir, 'cassandra-stress')
def GetNumberOfNodesUp(vm):
  """Gets the number of VMs which are up in a Cassandra cluster.

  Counts the lines of `nodetool status` output that start with "UN"
  (status Up, state Normal).

  Args:
    vm: VirtualMachine. The VM to use to check the cluster status.

  Returns:
    int. The number of VMs which are up in a Cassandra cluster.
  """
  # `grep -c` prints "0" but exits with status 1 when nothing matches, and
  # RemoteCommand treats a non-zero exit as a failure. `|| true` keeps the
  # printed count while forcing success, so "no nodes up yet" is reported as
  # 0 (letting StartCluster retry) rather than raised as an error.
  stdout, _ = vm.RemoteCommand(
      f'{GetNodetoolPath()} status | grep -c "^UN" || true'
  )
  return int(stdout.strip())
# Number of times to poll the seed for a running Cassandra after starting it.
_SEED_START_TRIES = 5


def _StartSeedAndWait(seed_vm):
  """Starts Cassandra on the seed VM and polls until IsRunning reports it up.

  Args:
    seed_vm: VirtualMachine. The seed node.

  Raises:
    ValueError: if Cassandra is still not running after _SEED_START_TRIES
      checks.
  """
  logging.info('Starting seed VM %s', seed_vm)
  Start(seed_vm)
  logging.info('Waiting %ds for seed to start', NODE_START_SLEEP)
  time.sleep(NODE_START_SLEEP)
  for i in range(_SEED_START_TRIES):
    if IsRunning(seed_vm):
      return
    logging.warning(
        'Seed %s: Cassandra not running yet (try %d). Waiting %ds.',
        seed_vm,
        i,
        NODE_START_SLEEP,
    )
    time.sleep(NODE_START_SLEEP)
  raise ValueError('Cassandra failed to start on seed.')


def _WaitForAllNodesUp(seed_vm, vms):
  """Polls the seed until every node is up, restarting stopped non-seeds.

  Args:
    seed_vm: VirtualMachine. The seed node, used to query cluster status.
    vms: list of VirtualMachines. The non-seed nodes.

  Raises:
    OSError: if not all nodes are up after CLUSTER_START_TRIES attempts.
  """
  vm_count = len(vms) + 1
  for i in range(CLUSTER_START_TRIES):
    vms_up = GetNumberOfNodesUp(seed_vm)
    if vms_up == vm_count:
      logging.info('All %d nodes up!', vm_count)
      return
    logging.warning(
        'Try %d: only %s of %s up. Restarting and sleeping %ds',
        i,
        vms_up,
        vm_count,
        NODE_START_SLEEP,
    )
    background_tasks.RunThreaded(_StartCassandraIfNotRunning, vms)
    time.sleep(NODE_START_SLEEP)
  raise OSError('Failed to start Cassandra cluster.')


def StartCluster(seed_vm, vms):
  """Starts a Cassandra cluster.

  Starts a Cassandra cluster, first starting 'seed_vm', then remaining VMs in
  'vms'.

  Args:
    seed_vm: VirtualMachine. Machine which will function as the sole seed. It
      will be started before all other VMs.
    vms: list of VirtualMachines. VMs *other than* seed_vm which should be
      started.

  Raises:
    ValueError: if Cassandra fails to start on the seed.
    OSError: if cluster startup fails.
  """
  _StartSeedAndWait(seed_vm)
  if vms:
    logging.info('Starting remaining %d nodes', len(vms))
    # Start the VMs with a small pause in between each, to allow the node to
    # join. Starting Cassandra nodes fails when multiple nodes attempt to
    # join the cluster concurrently.
    for i, vm in enumerate(vms):
      logging.info('Starting non-seed VM %d/%d.', i + 1, len(vms))
      Start(vm)
      time.sleep(NODE_START_SLEEP)
    logging.info('Waiting %ds for nodes to join', CLUSTER_START_SLEEP)
    time.sleep(CLUSTER_START_SLEEP)
  _WaitForAllNodesUp(seed_vm, vms)