ccmlib/cluster.py (676 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ccm clusters
from __future__ import absolute_import
import itertools
import os
import random
import re
import shutil
import signal
import subprocess
import threading
import time
from collections import OrderedDict, defaultdict, namedtuple
from distutils.version import LooseVersion #pylint: disable=import-error, no-name-in-module
import yaml
from six import print_
from ccmlib import common, extension, repository
from ccmlib.node import Node, NodeError, TimeoutError
from six.moves import xrange
try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse
# Default timeout, in seconds, that Cluster.start() waits for each node's start-up
# log message. Read once at import time from CCM_CLUSTER_START_DEFAULT_TIMEOUT
# (falls back to 120 when the variable is not set).
DEFAULT_CLUSTER_WAIT_TIMEOUT_IN_SECS = int(os.environ.get('CCM_CLUSTER_START_DEFAULT_TIMEOUT', 120))
class Cluster(object):
@staticmethod
def getConfDir(install_dir):
    """Return the configuration directory located under ``install_dir``.

    The sub-directory name comes from ``common.CASSANDRA_CONF_DIR``.
    """
    conf_dir = os.path.join(install_dir, common.CASSANDRA_CONF_DIR)
    return conf_dir
@staticmethod
def getNodeClass():
    """Return the node class associated with this cluster type (``Node``)."""
    node_class = Node
    return node_class
def __init__(self, path, name, partitioner=None, install_dir=None, create_directory=True, version=None, verbose=False, derived_cassandra_version=None, options=None, **kwargs):
    """Create (or load) a cluster rooted at ``os.path.join(path, name)``.

    @path : parent directory that holds the cluster directory.
    @name : cluster name; the name 'current' (any case) is rejected with RuntimeError.
    @partitioner : partitioner stored on the cluster (may be None).
    @install_dir : installation directory, used only when no ``version`` is given.
    @create_directory : when true the cluster directory is created, the install
        directory is validated and cluster.conf is written; if anything fails the
        freshly created directory is removed again before the error propagates.
    @version : version handed to ``load_from_repository``; takes precedence over ``install_dir``.
    @verbose : forwarded to ``load_from_repository``.
    @derived_cassandra_version : version to use when the repository did not provide one;
        when also None the version is read from the build in the install directory.
    @options : optional object whose ``configuration_yaml`` attribute is copied onto the cluster.
    @kwargs : legacy keywords 'cassandra_version' and 'cassandra_dir' override
        ``version`` and ``install_dir`` respectively.
    """
    self.name = name
    self.nodes = {}
    self.seeds = []
    self.partitioner = partitioner
    self._config_options = {}
    self._dse_config_options = {}
    self._misc_config_options = {}
    self._environment_variables = {}
    self.__log_level = "INFO"
    self.__path = path
    self.__version = None
    self.use_vnodes = False
    self.configuration_yaml = options.configuration_yaml if options else None
    # Classes that are to follow the respective logging level
    self._debug = []
    self._trace = []
    self.data_dir_count = 1
    if self.name.lower() == "current":
        raise RuntimeError("Cannot name a cluster 'current'.")
    # This is incredibly important for
    # backwards compatibility.
    if 'cassandra_version' in kwargs:
        version = kwargs['cassandra_version']
    if 'cassandra_dir' in kwargs:
        install_dir = kwargs['cassandra_dir']
    if create_directory:
        # we create the dir before potentially downloading to throw an error sooner if need be
        os.mkdir(self.get_path())
    try:
        if version is None:
            # at this point, install_dir should always not be None, but
            # we keep this for backward compatibility (in loading old cluster)
            if install_dir is not None:
                # the path is kept verbatim on Windows and made absolute elsewhere
                if common.is_win():
                    self.__install_dir = install_dir
                else:
                    self.__install_dir = os.path.abspath(install_dir)
        else:
            repo_dir, v = self.load_from_repository(version, verbose)
            self.__install_dir = repo_dir
            self.__version = v
        # the repository may not report a version; fall back to the caller's hint, then to the build
        if self.__version is None:
            if derived_cassandra_version is not None:
                self.__version = derived_cassandra_version
            else:
                self.__version = self.__get_version_from_build()
        if create_directory:
            common.validate_install_dir(self.__install_dir)
            self._update_config()
    except:
        # undo the directory creation on any failure, then re-raise the original error
        if create_directory:
            common.rmdirs(self.get_path())
        raise
def load_from_repository(self, version, verbose):
    """Set up ``version`` through ``repository.setup`` and return its result.

    ``__init__`` unpacks the result as an ``(install_dir, version)`` pair.
    """
    setup_result = repository.setup(version, verbose)
    return setup_result
def set_partitioner(self, partitioner):
    """Store ``partitioner`` on the cluster, rewrite the cluster config and return self."""
    self.partitioner = partitioner
    self._update_config()
    return self
def set_datadir_count(self, n):
    """Set the number of data directories (coerced with ``int``), persist it and return self."""
    count = int(n)
    self.data_dir_count = count
    self._update_config()
    return self
def set_install_dir(self, install_dir=None, version=None, verbose=False):
    """Point the cluster (and all its nodes) at another installation.

    @install_dir : directory to use when ``version`` is None; it is validated
        and the version is then read from the build found there.
    @version : when given, the installation is obtained through ``repository.setup``
        and ``install_dir`` is ignored.
    @verbose : forwarded to ``repository.setup``.

    Every node gets the new version and re-imports its config files; topology
    files are refreshed when any node has a data center. Returns the cluster.
    """
    if version is not None:
        repo_dir, repo_version = repository.setup(version, verbose)
        self.__install_dir = repo_dir
        self.__version = repo_version if repo_version is not None else self.__get_version_from_build()
    else:
        self.__install_dir = install_dir
        common.validate_install_dir(install_dir)
        self.__version = self.__get_version_from_build()

    # comparisons elsewhere rely on the version being a LooseVersion
    if not isinstance(self.__version, LooseVersion):
        self.__version = LooseVersion(self.__version)
    self._update_config()

    for member in list(self.nodes.values()):
        member._cassandra_version = self.__version
        member.import_config_files()

    # if any nodes have a data center, let's update the topology
    if any(member.data_center for member in self.nodes.values()):
        self.__update_topology_files()

    if self.cassandra_version() >= '4':
        stale_options = {'start_rpc': None}
    else:
        stale_options = common.CCM_40_YAML_OPTIONS
    self.set_configuration_options(stale_options, delete_empty=True, delete_always=True)
    return self
def actively_watch_logs_for_error(self, on_error_call, interval=1):
    """
    Begins a thread that repeatedly scans system.log for new errors, every interval seconds.
    (The first pass covers the entire log contents written at that point,
    subsequent scans cover newly appended log messages).
    Reports new errors, by calling the provided callback with an OrderedDict
    mapping node name to a list of error lines.
    Returns the thread itself, which should be .join()'ed to wrap up execution,
    otherwise will run until the main thread exits.

    @on_error_call : callable invoked (from the watcher thread) with the OrderedDict
        described above; it is only called when a scan produced at least one entry.
    @interval : seconds slept between two scans.
    """
    class LogWatchingThread(threading.Thread):
        """
        This class is embedded here for now, because it is used only from
        within Cluster, and depends on cluster.nodelist().
        """
        def __init__(self, cluster):
            super(LogWatchingThread, self).__init__()
            self.cluster = cluster
            self.daemon = True # set so that thread will exit when main thread exits
            self.req_stop_event = threading.Event()
            self.done_event = threading.Event()
            # per-node log mark already scanned; a missing node starts at 0
            self.log_positions = defaultdict(int)
        def scan(self):
            """Scan every node's log from its last mark; return {node name: errors}."""
            errordata = OrderedDict()
            try:
                for node in self.cluster.nodelist():
                    scan_from_mark = self.log_positions[node.name]
                    next_time_scan_from_mark = node.mark_log()
                    if next_time_scan_from_mark == scan_from_mark:
                        # log hasn't advanced, nothing to do for this node
                        continue
                    else:
                        errors = node.grep_log_for_errors_from(seek_start=scan_from_mark)
                    self.log_positions[node.name] = next_time_scan_from_mark
                    if errors:
                        errordata[node.name] = errors
            except IOError as e:
                if 'No such file or directory' in str(e.strerror):
                    pass # most likely log file isn't yet written
                # in the case of unexpected error, report this thread to the callback
                else:
                    errordata['log_scanner'] = [[str(e)]]
            return errordata
        def scan_and_report(self):
            """Run one scan and hand any findings to the user-supplied callback."""
            errordata = self.scan()
            if errordata:
                on_error_call(errordata)
        def run(self):
            common.debug("Log-watching thread starting.")
            # run until stop gets requested by .join()
            while not self.req_stop_event.is_set():
                self.scan_and_report()
                time.sleep(interval)
            try:
                # do a final scan to make sure we got to the very end of the files
                self.scan_and_report()
            finally:
                common.debug("Log-watching thread exiting.")
                # done_event signals that the scan completed a final pass
                self.done_event.set()
        def join(self, timeout=None):
            """Request a stop, wait for the final scan, then join the thread.

            The wait on the final scan is bounded by ``interval * 2`` and does not
            use ``timeout``; ``timeout`` only applies to the underlying Thread.join.
            """
            # signals to the main run() loop that a stop is requested
            self.req_stop_event.set()
            # now wait for the main loop to get through a final log scan, and signal that it's done
            self.done_event.wait(timeout=interval * 2) # need to wait at least interval seconds before expecting thread to finish. 2x for safety.
            super(LogWatchingThread, self).join(timeout)
    log_watcher = LogWatchingThread(self)
    log_watcher.start()
    return log_watcher
def get_install_dir(self):
    """Validate the stored installation directory and return it.

    Validation is delegated to ``common.validate_install_dir`` on every call.
    """
    install_dir = self.__install_dir
    common.validate_install_dir(install_dir)
    return install_dir
def hasOpscenter(self):
    """Report whether this cluster has OpsCenter; always False for this class."""
    return False
def nodelist(self):
    """Return the cluster's nodes as a list ordered by node name (plain string sort)."""
    ordered_names = sorted(self.nodes)
    return [self.nodes[node_name] for node_name in ordered_names]
def version(self):
    """Return the version recorded for this cluster."""
    current_version = self.__version
    return current_version
def cassandra_version(self):
    """Return the Cassandra version of the cluster; same value as ``version()``."""
    reported = self.version()
    return reported
def address_regex(self):
    """Return the regex used to capture a node address.

    From version 4.0 on the pattern also expects the ``:7000`` port suffix.
    """
    if self.cassandra_version() >= '4.0':
        return "/([0-9.]+):7000"
    return "/([0-9.]+)"
def add(self, node, is_seed, data_center=None):
    """Register ``node`` with the cluster.

    @node : node to add; its ``name`` must not already be used in this cluster.
    @is_seed : when true the node is also appended to the seed list.
    @data_center : data center of the node. It is mandatory as soon as any
        existing node already has a data center.

    The node receives the cluster log level plus the per-class DEBUG/TRACE
    levels, topology files are refreshed when a data center is given, and the
    node is saved. Returns the cluster.

    Raises common.ArgumentError when the name is taken or the data center is
    missing. Both checks run before any state is touched, so a rejected node
    is neither registered in ``self.nodes``/``self.seeds`` nor written to
    cluster.conf.
    """
    if node.name in self.nodes:
        raise common.ArgumentError('Cannot create existing node %s' % node.name)
    if data_center is None and any(existing.data_center is not None for existing in self.nodelist()):
        raise common.ArgumentError('Please specify the DC this node should be added to')

    self.nodes[node.name] = node
    if is_seed:
        self.seeds.append(node)
    self._update_config()
    node.data_center = data_center

    node.set_log_level(self.__log_level)
    for debug_class in self._debug:
        node.set_log_level("DEBUG", debug_class)
    for trace_class in self._trace:
        node.set_log_level("TRACE", trace_class)

    if data_center is not None:
        self.__update_topology_files()
    node._save()
    return self
def populate(self, nodes, debug=False, tokens=None, use_vnodes=None, ipprefix='127.0.0.', ipformat=None, install_byteman=False, use_single_interface=False):
    """Populate a cluster with nodes

    @nodes : total node count (int), or a list of per-datacenter node counts
        (datacenters are then named dc1, dc2, ... and the PropertyFileSnitch is configured).
    @debug : give each node a remote debug port (2000 + 100 * index) instead of '0'.
    @tokens : initial tokens, one entry per node; computed here when None.
    @use_vnodes : force vnodes on or off; when None it is derived from ``tokens``
        and from the configured num_tokens.
    @ipprefix : address prefix used to build ``ipformat`` when that is not given.
    @ipformat : '%d'-style format turning a node index into its address.
    @install_byteman : give each node a byteman port (4000 + 100 * index) instead of '0'.
    @use_single_interface : Populate the cluster with nodes that all share a single network interface.

    Nodes are named node1..nodeN and are all added as seeds. Returns the cluster.
    Raises common.ArgumentError for an unsupported option, a node count below 1,
    or when one of the node names to create already exists in the cluster.
    """
    if self.cassandra_version() < '4' and use_single_interface:
        raise common.ArgumentError('use_single_interface is not supported in versions < 4.0')

    node_count = nodes
    dcs = []

    if use_vnodes is None:
        self.use_vnodes = len(tokens or []) > 1 or self._more_than_one_token_configured()
    else:
        self.use_vnodes = use_vnodes

    if isinstance(nodes, list):
        # We set PFS here as a "marker" that we need to read cassandra-topology.properties for this cluster
        # This is then checked in node.py::_update_yaml where we check if initial_location_provider is set in
        # the yaml (indicating that modern config is supported) and we set TopologyFileLocationProvider if so
        self.set_configuration_options(values={'endpoint_snitch': 'org.apache.cassandra.locator.PropertyFileSnitch'})
        node_count = 0
        for dc_index, dc_size in enumerate(nodes, 1):
            node_count = node_count + dc_size
            dcs.extend(['dc%d' % dc_index] * dc_size)

    if node_count < 1:
        raise common.ArgumentError('invalid node count %s' % nodes)

    # self.nodes is keyed by node name, so membership must be tested against the
    # keys; testing against the Node values could never match a name string.
    for i in range(1, node_count + 1):
        if 'node%s' % i in self.nodes:
            raise common.ArgumentError('Cannot create existing node node%s' % i)

    if tokens is None:
        if self.use_vnodes:
            # from 4.0 tokens can be pre-generated via the `allocate_tokens_for_local_replication_factor: 3` strategy
            # this saves time, as allocating tokens during first start is slow and non-concurrent
            if self.can_generate_tokens() and 'CASSANDRA_TOKEN_PREGENERATION_DISABLED' not in self._environment_variables:
                if len(dcs) <= 1:
                    dcs.extend(['dc1'] * node_count)
                tokens = self.generated_tokens(dcs)
        else:
            common.debug("using balanced tokens for non-vnode cluster")
            if len(dcs) <= 1:
                tokens = self.balanced_tokens(node_count)
            else:
                tokens = self.balanced_tokens_across_dcs(dcs)

    if not ipformat:
        ipformat = ipprefix + "%d"

    for i in range(1, node_count + 1):
        tk = None
        if tokens is not None and i - 1 < len(tokens):
            tk = tokens[i - 1]
        dc = dcs[i - 1] if i - 1 < len(dcs) else None

        binary = None
        if self.cassandra_version() >= '1.2':
            if use_single_interface:
                #Always leave 9042 and 9043 clear, in case someone defaults to adding
                # a node with those ports
                binary = (ipformat % 1, 9042 + 2 + (i * 2))
            else:
                binary = (ipformat % i, 9042)

        thrift = None
        if self.cassandra_version() < '4':
            thrift = (ipformat % i, 9160)

        storage_interface = ((ipformat % i), 7000)
        if use_single_interface:
            #Always leave 7000 and 7001 in case someone defaults to adding
            #with those port numbers
            storage_interface = (ipformat % 1, 7000 + 2 + (i * 2))

        node = self.create_node(name='node%s' % i,
                                auto_bootstrap=False,
                                thrift_interface=thrift,
                                storage_interface=storage_interface,
                                jmx_port=str(7000 + i * 100),
                                remote_debug_port=str(2000 + i * 100) if debug else str(0),
                                byteman_port=str(4000 + i * 100) if install_byteman else str(0),
                                initial_token=tk,
                                binary_interface=binary,
                                environment_variables=self._environment_variables)
        self.add(node, True, dc)
        self._update_config()
    return self
def create_node(self, name, auto_bootstrap, thrift_interface, storage_interface, jmx_port, remote_debug_port, initial_token, save=True, binary_interface=None, byteman_port='0', environment_variables=None, derived_cassandra_version=None):
    """Build a ``Node`` that belongs to this cluster and return it.

    All arguments are forwarded to the ``Node`` constructor unchanged, with
    this cluster passed as its second positional argument.
    """
    new_node = Node(
        name, self, auto_bootstrap, thrift_interface, storage_interface,
        jmx_port, remote_debug_port, initial_token, save, binary_interface,
        byteman_port, environment_variables,
        derived_cassandra_version=derived_cassandra_version)
    return new_node
def balanced_tokens(self, node_count):
    """Return ``node_count`` evenly spaced initial tokens.

    For version >= 1.2 with no partitioner set, or a Murmur3 partitioner, the
    tokens split the signed 64-bit range starting at -2**63; otherwise they
    split the range [0, 2**127).
    """
    uses_murmur3 = not self.partitioner or 'Murmur3' in self.partitioner
    if self.cassandra_version() >= '1.2' and uses_murmur3:
        step = 2 ** 64 // node_count
        return [int(index * step - 2 ** 63) for index in range(0, node_count)]
    step = 2 ** 127 // node_count
    return [int(index * step) for index in range(0, node_count)]
def balanced_tokens_across_dcs(self, dcs):
    """Return balanced tokens for nodes spread over several data centers.

    @dcs : one data center name per node. Each run of consecutive equal names
        is treated as one data center (a name that reappears after another one
        counts as a new run).

    Every run gets its own ``balanced_tokens`` ring, shifted by 100 times the
    zero-based run index so tokens of different data centers do not collide.
    """
    tokens = []
    run_dc = dcs[0]
    run_length = 0
    run_index = 0
    for dc in dcs:
        if dc != run_dc:
            # the previous run is complete: emit its tokens and start a new run
            tokens.extend(tk + (run_index * 100) for tk in self.balanced_tokens(run_length))
            run_dc = dc
            run_length = 0
            run_index += 1
        run_length += 1
    tokens.extend(tk + (run_index * 100) for tk in self.balanced_tokens(run_length))
    return tokens
def _more_than_one_token_configured(self):
    """Tell whether the cluster config sets ``num_tokens`` to a value above 1.

    A missing ``num_tokens`` option counts as 1.
    """
    configured = self._config_options.get('num_tokens', '1')
    return int(configured) > 1
def can_generate_tokens(self):
    """Tell whether initial tokens can be pre-generated for this cluster.

    All of the following must hold:
    - the version is >= 4,
    - no partitioner is set, or it is a Murmur3 or Random partitioner,
    - ``num_tokens`` is configured above 1,
    - the CASSANDRA_TOKEN_PREGENERATION_DISABLED environment variable is not
      set on the cluster.
    """
    if self.cassandra_version() < '4':
        return False
    partitioner = self.partitioner
    if partitioner is not None and not ('Murmur3' in partitioner or 'Random' in partitioner):
        return False
    # This must be a call: the bare bound method is always truthy, which made
    # this condition a no-op and let single-token clusters through.
    if not self._more_than_one_token_configured():
        return False
    return 'CASSANDRA_TOKEN_PREGENERATION_DISABLED' not in self._environment_variables
def generated_tokens(self, dcs):
    """Pre-generate initial tokens for every node, one data center at a time.

    @dcs : one data center name per node; each run of consecutive equal names
        is handled as one data center.

    ``generate_dc_tokens`` is called once per run with the run's node count and
    appends to the list that is returned.
    """
    tokens = []
    # all nodes are in rack1
    run_dc = dcs[0]
    run_length = 0
    for dc in dcs:
        if dc != run_dc:
            # the previous run is complete: generate its tokens and start a new run
            self.generate_dc_tokens(run_length, tokens)
            run_dc = dc
            run_length = 0
        run_length += 1
    self.generate_dc_tokens(run_length, tokens)
    return tokens
def generate_dc_tokens(self, node_count, tokens):
    """Run the ``generatetokens`` tool for one data center and collect its tokens.

    @node_count : number of nodes in the data center.
    @tokens : list that receives one comma-separated token string per node
        (appended in place).

    The replication factor handed to the tool is ``min(3, node_count)``.
    Raises common.ArgumentError when the version is below 4, the partitioner is
    neither Murmur3 nor Random, or ``num_tokens`` is not configured above 1.
    """
    if self.cassandra_version() < '4' or (self.partitioner and not ('Murmur3' in self.partitioner or 'Random' in self.partitioner)):
        raise common.ArgumentError("generate-tokens script only for >=4.0 and Murmur3 or Random")
    if not self._more_than_one_token_configured():
        raise common.ArgumentError("Cannot use generate-tokens script without num_tokens > 1")

    partitioner = 'RandomPartitioner' if (self.partitioner and 'Random' in self.partitioner) else 'Murmur3Partitioner'
    generate_tokens = common.join_bin(self.get_install_dir(), os.path.join('tools', 'bin'), 'generatetokens')
    cmd_list = [generate_tokens, '-n', str(node_count), '-t', str(self._config_options.get("num_tokens")), '--rf', str(min(3, node_count)), '-p', partitioner]
    process = subprocess.Popen(cmd_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=os.environ.copy())
    # communicate() drains both pipes, closes them and reaps the child. Reading
    # stdout line by line left the process un-waited and could block forever
    # once the tool filled the never-read stderr pipe.
    stdout_data, _ = process.communicate()
    output_lines = stdout_data.split(b'\n')
    # the first line is "Generating tokens for X nodes with" and can be ignored
    for n in range(1, node_count + 1):
        # a missing line yields an empty token string, as an exhausted readline() did
        raw_line = output_lines[n].decode("utf-8") if n < len(output_lines) else ''
        stdout_output = re.sub(r'^.*?:', '', raw_line)
        node_tokens = stdout_output.replace('[', '').replace(' ', '').replace(']', '').replace('\n', '')
        tokens.append(node_tokens)
    assert 0 < len(tokens), "No tokens generated from invocation: {}".format(str(cmd_list))
    common.debug("pregenerated tokens from cmd_list: {} are {}".format(str(cmd_list), tokens))
def remove(self, node=None, gently=False):
    """Remove one node, or the whole cluster when ``node`` is None.

    @node : node to remove; a node that is not part of the cluster is ignored.
    @gently : forwarded to the ``stop`` call made before deleting files.

    The node (or every node) is stopped and its directory is deleted with
    ``remove_dir_with_retry``.
    """
    if node is None:
        self.stop(gently=gently)
        self.remove_dir_with_retry(self.get_path())
        return

    if node.name not in self.nodes:
        return

    # forget the node first so the rewritten config no longer lists it
    del self.nodes[node.name]
    if node in self.seeds:
        self.seeds.remove(node)
    self._update_config()
    node.stop(gently=gently)
    self.remove_dir_with_retry(node.get_path())
# We can race w/shutdown on Windows and get Access is denied attempting to delete node logs.
# see CASSANDRA-10075
def remove_dir_with_retry(self, path):
    """Delete ``path`` with ``common.rmdirs``, retrying on failure.

    After each failed attempt the method sleeps 0.1 s; the fifth failure is
    re-raised to the caller.
    """
    failures = 0
    while True:
        try:
            common.rmdirs(path)
            return
        except Exception as error:
            failures += 1
            time.sleep(.1)
            if failures == 5:
                raise error
def clear(self):
    """Stop the cluster, then call ``clear()`` on every node."""
    self.stop()
    members = list(self.nodes.values())
    for member in members:
        member.clear()
def get_path(self):
    """Return the cluster directory: the cluster name joined onto the base path."""
    cluster_dir = os.path.join(self.__path, self.name)
    return cluster_dir
def get_seeds(self):
    """Return the seed addresses of the cluster as a list of strings.

    Before 4.0 each entry is a bare host: the storage address of a Node seed,
    or the seed itself when it is not a Node.

    From 4.0 on each entry is ``host:port`` (``[host]:port`` for hosts that
    contain ':'). Node seeds come first, followed by the non-Node seeds. A
    ``storage_port`` set in the cluster config options replaces the port of
    every seed; without it Node seeds use their own storage port and other
    seeds are returned unchanged.
    """
    if self.cassandra_version() >= '4.0':
        #They might be overriding the storage port config now
        storage_port = self._config_options.get("storage_port")
        # The option may be stored as an int (e.g. parsed from YAML); normalise it
        # once so the string concatenation below cannot raise TypeError.
        port_override = None if storage_port is None else str(storage_port)

        def host_with_port(host, port):
            # hosts containing ':' are bracketed so the port separator stays unambiguous
            if ":" in host:
                return "[" + host + "]:" + port
            return host + ":" + port

        seeds = []
        #Convert node storage interfaces to IP strings and maybe replace the port
        for seed in self.seeds:
            if isinstance(seed, Node):
                storage_interface = seed.network_interfaces['storage']
                port = port_override if port_override is not None else str(storage_interface[1])
                seeds.append(host_with_port(storage_interface[0], port))
        #For seeds that are strings need to update the port in the string
        for seed in self.seeds:
            if not isinstance(seed, Node):
                if port_override is None:
                    seeds.append(seed)
                else:
                    # urlparse strips the brackets of an IPv6 host; host_with_port restores them
                    hostname = urlparse("http://" + seed).hostname
                    seeds.append(host_with_port(hostname, port_override))
        return seeds
    else:
        return [s.network_interfaces['storage'][0] if isinstance(s, Node) else s for s in self.seeds]
def show(self, verbose):
    """Print a summary of the cluster and its nodes.

    @verbose : when true each node prints its full description followed by a
        blank line; otherwise only the node status is printed.
    """
    title = "Cluster: '{}'".format(self.name)
    print_(title)
    print_('-' * len(title))

    members = list(self.nodes.values())
    if not members:
        print_("No node in this cluster yet")
        return

    for member in members:
        if not verbose:
            member.show(only_status=True)
            continue
        member.show(show_cluster=False)
        print_("")
def start(self, no_wait=False, verbose=False, wait_for_binary_proto=True,
          wait_other_notice=True, jvm_args=None, profile_options=None,
          quiet_start=False, allow_root=False, jvm_version=None, **kwargs):
    """Start every node of the cluster that is not already running.

    @no_wait : when true, sleep 2 seconds after launching instead of waiting
        for each node's start-up log message.
    @verbose, @jvm_args, @profile_options, @quiet_start, @allow_root, @jvm_version :
        forwarded to each ``node.start`` call (``verbose`` also to the log waits).
    @wait_other_notice : unless ``no_wait``, wait until every started node has
        logged every other started node as alive.
    @wait_for_binary_proto : unless ``no_wait``, wait for each started node's
        binary interface.
    @kwargs : 'timeout' overrides the default start-up log timeout; the
        CCM_CLUSTER_START_TIMEOUT_OVERRIDE environment variable overrides both.

    Returns the list of ``(node, process, log_mark)`` tuples for the nodes
    started by this call, or None when waiting for a start-up message raised
    RuntimeError. Raises NodeError when a node's process terminates early or a
    node is not running once start-up completed.
    """
    if jvm_args is None:
        jvm_args = []
    extension.pre_cluster_start(self)
    # check whether all loopback aliases are available before starting any nodes
    for node in list(self.nodes.values()):
        if not node.is_running():
            for itf in node.network_interfaces.values():
                if itf is not None:
                    common.assert_socket_available(itf)
    started = []
    for node in list(self.nodes.values()):
        if not node.is_running():
            # remember where the existing log ends so later waits only see new lines
            mark = 0
            if os.path.exists(node.logfilename()):
                mark = node.mark_log()
            # if the node is going to allocate_strategy_ tokens during start, then wait_for_binary_proto=True
            node_wait_for_binary_proto = (self.can_generate_tokens() and self.use_vnodes and node.initial_token is None)
            p = node.start(update_pid=False, jvm_args=jvm_args, jvm_version=jvm_version,
                           profile_options=profile_options, verbose=verbose, quiet_start=quiet_start,
                           allow_root=allow_root, wait_for_binary_proto=node_wait_for_binary_proto)
            # Prior to JDK8, starting every node at once could lead to a
            # nanotime collision where the RNG that generates a node's tokens
            # gives identical tokens to several nodes. Thus, we stagger
            # the node starts
            if common.get_jdk_version() < '1.8':
                time.sleep(1)
            started.append((node, p, mark))
    if no_wait:
        time.sleep(2) # waiting 2 seconds to check for early errors and for the pid to be set
    else:
        for node, p, mark in started:
            if not node._wait_for_running(p, timeout_s=7):
                raise NodeError("Node {} should be running before waiting for <started listening> log message, "
                                "but C* process is terminated.".format(node.name))
            try:
                # precedence: environment override, then the 'timeout' keyword, then the module default
                timeout=kwargs.get('timeout', DEFAULT_CLUSTER_WAIT_TIMEOUT_IN_SECS)
                timeout=int(os.environ.get('CCM_CLUSTER_START_TIMEOUT_OVERRIDE', timeout))
                start_message = "Listening for thrift clients..." if self.cassandra_version() < "2.2" else "Starting listening for CQL clients"
                node.watch_log_for(start_message, timeout=timeout, process=p, verbose=verbose, from_mark=mark,
                                   error_on_pid_terminated=True)
            except RuntimeError:
                # NOTE(review): the failure is swallowed and reported only through the
                # None return value; pids are not updated and post_cluster_start is skipped.
                return None
    self.__update_pids(started)
    for node, p, _ in started:
        if not node.is_running():
            raise NodeError("Error starting {0}.".format(node.name), p)
    if not no_wait:
        if wait_other_notice:
            # every ordered pair of started nodes: `node` must have seen `other_node`
            for (node, _, mark), (other_node, _, _) in itertools.permutations(started, 2):
                node.watch_log_for_alive(other_node, from_mark=mark)
        if wait_for_binary_proto:
            for node, p, mark in started:
                node.wait_for_binary_interface(process=p, verbose=verbose, from_mark=mark)
    extension.post_cluster_start(self)
    return started
def stop(self, wait=True, signal_event=signal.SIGTERM, **kwargs):
    """Stop every node and return the nodes whose ``stop`` call returned a falsy value.

    @wait, @signal_event, @kwargs : forwarded to each ``node.stop`` call.

    The ``pre_cluster_stop`` / ``post_cluster_stop`` extension hooks run
    before and after the nodes are stopped.
    """
    extension.pre_cluster_stop(self)
    not_running = [
        member for member in list(self.nodes.values())
        if not member.stop(wait=wait, signal_event=signal_event, **kwargs)
    ]
    extension.post_cluster_stop(self)
    return not_running
def set_log_level(self, new_level, class_names=None):
    """Change the log level of the cluster or of individual classes.

    @new_level : one of TRACE, DEBUG, INFO, WARN, ERROR, OFF.
    @class_names : classes the level applies to. When empty or None the level
        becomes the cluster-wide level and is written to the cluster config.

    The level is applied to every existing node. Classes set to DEBUG or TRACE
    are also remembered so nodes added later get the same per-class levels.

    Raises common.ArgumentError for an unknown level, or when a class is asked
    to be in DEBUG while already in TRACE (or the other way round).
    """
    class_names = class_names or []
    known_level = ['TRACE', 'DEBUG', 'INFO', 'WARN', 'ERROR', 'OFF']
    if new_level not in known_level:
        raise common.ArgumentError("Unknown log level %s (use one of %s)" % (new_level, " ".join(known_level)))

    if class_names:
        for class_name in class_names:
            if new_level == 'DEBUG':
                if class_name in self._trace:
                    raise common.ArgumentError("Class %s already in TRACE" % (class_name))
                # remember each class once, even when the level is set repeatedly
                if class_name not in self._debug:
                    self._debug.append(class_name)
            if new_level == 'TRACE':
                if class_name in self._debug:
                    raise common.ArgumentError("Class %s already in DEBUG" % (class_name))
                if class_name not in self._trace:
                    self._trace.append(class_name)
    else:
        self.__log_level = new_level
        self._update_config()

    for node in self.nodelist():
        if class_names:
            for class_name in class_names:
                node.set_log_level(new_level, class_name)
        else:
            # A cluster-wide change must reach the existing nodes too; looping
            # over the (empty) class list alone never called the node.
            node.set_log_level(new_level)
def wait_for_compactions(self, timeout=600):
    """
    Wait for all compactions to finish on all running nodes.

    @timeout : forwarded to each node's ``wait_for_compactions``.
    Returns the cluster.
    """
    running = (member for member in list(self.nodes.values()) if member.is_running())
    for member in running:
        member.wait_for_compactions(timeout)
    return self
def nodetool(self, nodetool_cmd):
    """Run ``nodetool_cmd`` through nodetool on every running node; return the cluster."""
    running = (member for member in list(self.nodes.values()) if member.is_running())
    for member in running:
        member.nodetool(nodetool_cmd)
    return self
def allNativePortsMatch(self):
    """Tell whether every node uses the same binary (native protocol) port.

    An empty cluster counts as matching.
    """
    expected_port = None
    for member in self.nodes.values():
        port = member.network_interfaces['binary'][1]
        if expected_port is None:
            # first port seen becomes the reference value
            expected_port = port
            continue
        if port != expected_port:
            return False
    return True
def stress(self, stress_options):
    """Run the stress tool against the live nodes of the cluster.

    @stress_options : list of command-line options for the stress binary. The
        live node addresses are added automatically unless the options already
        contain the node flag ('-d' for versions <= 2.1, '-node' otherwise;
        from 4.0 on each address also carries its port).

    Returns the exit code of the stress process, or None when there is no live
    node or the run was interrupted with Ctrl-C.
    """
    stress = common.get_stress_bin(self.get_install_dir())
    livenodes = [member.network_interfaces['binary'] for member in list(self.nodes.values()) if member.is_live()]
    if not livenodes:
        print_('No live node')
        return

    def live_node_ips_joined():
        return ','.join(interface[0] for interface in livenodes)

    nodes_options = []
    if self.cassandra_version() <= '2.1':
        # old stress: node list goes before the user options
        if '-d' not in stress_options:
            nodes_options = ['-d', live_node_ips_joined()]
        args = [stress] + nodes_options + stress_options
    else:
        if '-node' not in stress_options:
            if self.cassandra_version() >= '4.0':
                endpoints = ','.join([interface[0] + ':' + str(interface[1]) for interface in livenodes])
            else:
                endpoints = live_node_ips_joined()
            nodes_options = ['-node', endpoints]
        args = [stress] + stress_options + nodes_options

    rc = None
    try:
        # need to set working directory for env on Windows
        if common.is_win():
            rc = subprocess.call(args, cwd=common.parse_path(stress))
        else:
            rc = subprocess.call(args)
    except KeyboardInterrupt:
        pass
    return rc
def set_configuration_options(self, values=None, delete_empty=False, delete_always=False):
    """Merge ``values`` into the cluster configuration options and persist them.

    @values : options to merge; None leaves the options untouched.
    @delete_empty, @delete_always : forwarded to ``common.merge_configuration``.

    The config is persisted and topology files are refreshed in every case.
    Returns the cluster.
    """
    if values is not None:
        merged = common.merge_configuration(self._config_options, values, delete_empty=delete_empty, delete_always=delete_always)
        self._config_options = merged
    self._persist_config()
    self.__update_topology_files()
    return self
def set_configuration_yaml(self, configuration_yaml):
    """Store ``configuration_yaml`` on the cluster, persist the config,
    refresh the topology files and return the cluster."""
    self.configuration_yaml = configuration_yaml
    self._persist_config()
    self.__update_topology_files()
    return self
def set_batch_commitlog(self, enabled, use_batch_window=True):
    """Forward the batch commit log settings to every node of the cluster."""
    members = list(self.nodes.values())
    for member in members:
        member.set_batch_commitlog(enabled=enabled, use_batch_window=use_batch_window)
def set_dse_configuration_options(self, values=None):
    """Always raise common.ArgumentError: DSE options do not apply to a Cassandra cluster."""
    message = 'Cannot set DSE configuration options on a Cassandra cluster'
    raise common.ArgumentError(message)
def set_environment_variable(self, key, value):
    """Record an environment variable on the cluster and on every node, then persist the config."""
    self._environment_variables[key] = value
    members = list(self.nodes.values())
    for member in members:
        member.set_environment_variable(key, value)
    self._persist_config()
def _persist_config(self):
    """Rewrite the cluster config, then have every node re-import its config files."""
    self._update_config()
    members = list(self.nodes.values())
    for member in members:
        member.import_config_files()
def flush(self):
    """Run ``nodetool flush`` through the cluster-wide ``nodetool`` helper."""
    command = "flush"
    self.nodetool(command)
def compact(self):
    """Run ``nodetool compact`` through the cluster-wide ``nodetool`` helper."""
    command = "compact"
    self.nodetool(command)
def drain(self):
    """Run ``nodetool drain`` through the cluster-wide ``nodetool`` helper."""
    command = "drain"
    self.nodetool(command)
def repair(self):
    """Run ``nodetool repair`` through the cluster-wide ``nodetool`` helper."""
    command = "repair"
    self.nodetool(command)
def cleanup(self):
    """Run ``nodetool cleanup`` through the cluster-wide ``nodetool`` helper."""
    command = "cleanup"
    self.nodetool(command)
def decommission(self):
    """Call ``decommission()`` on every node that is currently running."""
    running = (member for member in list(self.nodes.values()) if member.is_running())
    for member in running:
        member.decommission()
def removeToken(self, token):
    """Run ``nodetool removeToken <token>`` through the cluster-wide ``nodetool`` helper."""
    command = "removeToken " + str(token)
    self.nodetool(command)
def bulkload(self, options):
    """Run ``bulkload(options)`` on one randomly chosen live node.

    Raises common.ArgumentError when no node is live.
    """
    candidates = [member for member in self.nodes.values() if member.is_live()]
    if len(candidates) == 0:
        raise common.ArgumentError("No live node")
    chosen = random.choice(candidates)
    chosen.bulkload(options)
def scrub(self, options):
    """Call ``scrub(options)`` on every node of the cluster."""
    members = list(self.nodes.values())
    for member in members:
        member.scrub(options)
def verify(self, options):
    """Call ``verify(options)`` on every node of the cluster."""
    members = list(self.nodes.values())
    for member in members:
        member.verify(options)
def enable_aoss(self):
    """Report that AOSS cannot be enabled on a C* cluster and exit with status 1."""
    # Imported locally so the block carries its own dependency.
    import sys
    common.error("Cannot enable AOSS in C* clusters")
    # sys.exit is always available; the bare `exit` helper is injected by the
    # `site` module and is missing when the interpreter runs without it.
    sys.exit(1)
def update_log4j(self, new_log4j_config):
    """Apply ``new_log4j_config`` to every node, in node-name order."""
    for member in self.nodelist():
        member.update_log4j(new_log4j_config)
def update_logback(self, new_logback_config):
    """Apply ``new_logback_config`` to every node, in node-name order."""
    for member in self.nodelist():
        member.update_logback(new_logback_config)
def __get_version_from_build(self):
    """Ask the cluster's node class for the version of the build in the install directory."""
    node_class = self.getNodeClass()
    return node_class.get_version_from_build(self.get_install_dir())
def _update_config(self):
    """Write the cluster state to ``cluster.conf`` in the cluster directory.

    The state is collected into a dict, passed to
    ``extension.append_to_cluster_config`` and then dumped as YAML,
    overwriting any previous file.
    """
    config_map = {
        'name': self.name,
        'nodes': [member.name for member in list(self.nodes.values())],
        'seeds': self.get_seeds(),
        'partitioner': self.partitioner,
        'install_dir': self.__install_dir,
        'config_options': self._config_options,
        'dse_config_options': self._dse_config_options,
        'misc_config_options': self._misc_config_options,
        'log_level': self.__log_level,
        'use_vnodes': self.use_vnodes,
        'configuration_yaml': self.configuration_yaml,
        'datadirs': self.data_dir_count,
        'environment_variables': self._environment_variables,
        'cassandra_version': str(self.cassandra_version())
    }
    extension.append_to_cluster_config(self, config_map)

    conf_path = os.path.join(self.__path, self.name, 'cluster.conf')
    with open(conf_path, 'w') as conf_file:
        yaml.safe_dump(config_map, conf_file)
def __update_pids(self, started):
    """Record the pid of every started node from its process.

    @started : iterable of ``(node, process, mark)`` tuples; the mark is ignored.
    """
    for entry in started:
        member, process = entry[0], entry[1]
        member._update_pid(process)
def __update_topology_files(self):
    """Push the current address-to-data-center mapping to every node.

    The mapping starts with ``('default', 'dc1')`` followed by one
    ``(address, data_center)`` pair per node that has a data center.
    """
    dcs = [('default', 'dc1')]
    dcs.extend(
        (member.address(), member.data_center)
        for member in self.nodelist()
        if member.data_center is not None
    )
    for member in self.nodelist():
        member.update_topology(dcs)
def enable_ssl(self, ssl_path, require_client_auth):
    """Enable client-to-node encryption using the key material in ``ssl_path``.

    @ssl_path : directory that holds 'keystore.jks' and 'cassandra.crt', and
        optionally 'truststore.jks'; the files are copied into the cluster directory.
    @require_client_auth : stored in the options only when a truststore is present.

    The result is stored as the 'client_encryption_options' config option and
    the cluster config is rewritten.
    """
    cluster_dir = self.get_path()
    for file_name in ('keystore.jks', 'cassandra.crt'):
        shutil.copyfile(os.path.join(ssl_path, file_name), os.path.join(cluster_dir, file_name))

    ssl_options = {
        'enabled': True,
        'keystore': os.path.join(cluster_dir, 'keystore.jks'),
        'keystore_password': 'cassandra',
    }

    # determine if truststore client encryption options should be enabled
    truststore_file = os.path.join(ssl_path, 'truststore.jks')
    if os.path.isfile(truststore_file):
        truststore_copy = os.path.join(cluster_dir, 'truststore.jks')
        shutil.copyfile(truststore_file, truststore_copy)
        ssl_options['require_client_auth'] = require_client_auth
        ssl_options['truststore'] = truststore_copy
        ssl_options['truststore_password'] = 'cassandra'

    self._config_options['client_encryption_options'] = ssl_options
    self._update_config()
def enable_internode_ssl(self, node_ssl_path):
    """Enable node-to-node encryption using the key material in ``node_ssl_path``.

    @node_ssl_path : directory that holds 'keystore.jks' and 'truststore.jks';
        both are copied into the cluster directory with an 'internode-' prefix.

    The result is stored as the 'server_encryption_options' config option
    (with 'enabled' set from version 4.0 on) and the cluster config is rewritten.
    """
    cluster_dir = self.get_path()
    keystore_copy = os.path.join(cluster_dir, 'internode-keystore.jks')
    truststore_copy = os.path.join(cluster_dir, 'internode-truststore.jks')
    shutil.copyfile(os.path.join(node_ssl_path, 'keystore.jks'), keystore_copy)
    shutil.copyfile(os.path.join(node_ssl_path, 'truststore.jks'), truststore_copy)

    node_ssl_options = {
        'internode_encryption': 'all',
        'keystore': keystore_copy,
        'keystore_password': 'cassandra',
        'truststore': truststore_copy,
        'truststore_password': 'cassandra',
    }
    if self.cassandra_version() >= '4.0':
        node_ssl_options['enabled'] = True

    self._config_options['server_encryption_options'] = node_ssl_options
    self._update_config()
def enable_pwd_auth(self):
    """Set the 'authenticator' config option to PasswordAuthenticator and rewrite the cluster config."""
    self._config_options.update({'authenticator': 'PasswordAuthenticator'})
    self._update_config()
def timed_grep_nodes_for_patterns(self, versions_to_patterns, timeout_seconds, filename="system.log"):
    """
    Searches all nodes in the cluster for a specific regular expression based on the node's version.
    Params:
    @versions_to_patterns : an instance of LogPatternToVersionMap, specifying the different log patterns based on a node's version.
                            It is called with a node's version to obtain the pattern for that node.
    @timeout_seconds : the amount of time to spend searching the logs for.
    @filename : the name of the file to search for the patterns. Defaults to "system.log".

    Nodes are polled in node-name order, with a one second pause between rounds.
    Returns the first node where the pattern was found, along with the matching lines
    (a named tuple with fields ``node`` and ``matchings``).
    Raises a TimeoutError if the pattern is not found within the specified timeout period.
    """
    match_type = namedtuple('Node_Log_Matching', 'node matchings')
    began = time.time()
    while True:
        TimeoutError.raise_if_passed(start=began, timeout=timeout_seconds,
                                     msg="Unable to find: {x} in any node log within {t} s".format(
                                         x=versions_to_patterns.patterns, t=timeout_seconds))
        for member in self.nodelist():
            pattern = versions_to_patterns(member.get_cassandra_version())
            found = member.grep_log(pattern, filename)
            if found:
                return match_type(node=member, matchings=found)
        time.sleep(1)
def wait_for_any_log(self, pattern, timeout, filename='system.log', marks=None):
    """Delegate to ``common.wait_for_any_log`` over all nodes of the cluster.

    The nodes are passed in node-name order; every argument is forwarded
    unchanged and the helper's result is returned.
    """
    members = self.nodelist()
    return common.wait_for_any_log(members, pattern, timeout, filename=filename, marks=marks)
def show_logs(self, selected_nodes_names=None):
    """
    Returns the log file paths of nodes in this cluster, for the caller to display.
    Params:
    @selected_nodes_names : a list-like object that contains names of nodes to be shown. If empty, this will show all nodes in the cluster.

    Returns a list of log file names: ordered by node name when no selection is
    given, otherwise in the order of the selection. Prints a message and returns
    None when the cluster has no nodes.
    Raises ValueError when a selected name is not a node of this cluster.
    """
    if selected_nodes_names is None:
        selected_nodes_names = []

    if len(self.nodes) == 0:
        print_("There are no nodes in this cluster yet.")
        return

    nodes = sorted(self.nodes.values(), key=lambda node: node.name)
    nodes_names = [node.name for node in nodes]
    names_logs_dict = {node.name: node.logfilename() for node in nodes}

    if len(selected_nodes_names) == 0:  # Parameter selected_nodes_names is empty
        # Build the list from the sorted names: a plain dict view is not a list
        # and its order is not guaranteed on older interpreters.
        return [names_logs_dict[name] for name in nodes_names]

    if not set(selected_nodes_names).issubset(nodes_names):
        raise ValueError("nodes in this cluster are {}. But nodes in arguments are {}".format(
            nodes_names, selected_nodes_names
        ))
    return [names_logs_dict[name] for name in selected_nodes_names]