ccmlib/node.py (1,733 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ccm node from __future__ import absolute_import, with_statement import errno import glob import locale import logging import os import psutil import re import shlex import shutil import signal import stat import subprocess import sys import time import warnings from collections import namedtuple from datetime import datetime from distutils.version import LooseVersion #pylint: disable=import-error, no-name-in-module import yaml from six import print_, string_types from ccmlib import common, extension from ccmlib.repository import setup from six.moves import xrange logger = logging.getLogger(__name__) NODE_WAIT_TIMEOUT_IN_SECS = 90 class Status(): UNINITIALIZED = "UNINITIALIZED" UP = "UP" DOWN = "DOWN" DECOMMISSIONED = "DECOMMISSIONED" class NodeError(Exception): def __init__(self, msg, process=None): Exception.__init__(self, msg) self.process = process class TimeoutError(Exception): def __init__(self, data): Exception.__init__(self, str(data)) @staticmethod def raise_if_passed(start, timeout, msg, node=None): if start + timeout < time.time(): raise TimeoutError.create(start, timeout, msg, node) @staticmethod def create(start, timeout, msg, node=None): tstamp = time.strftime("%d %b %Y %H:%M:%S", time.gmtime()) duration = round(time.time() - start, 2) node_s = " [{}]".format(node) if node else "" msg = "{tstamp}{nodes} after {d}/{t} seconds {msg}".format( tstamp=tstamp, nodes=node_s, msg=msg, d=duration, t=timeout) return TimeoutError(msg) class ToolError(Exception): def __init__(self, command, exit_status, stdout=None, stderr=None): self.command = command self.exit_status = exit_status self.stdout = stdout self.stderr = stderr message = "Subprocess {} exited with non-zero status; exit status: {}".format(command, exit_status) if stdout: message += "; \nstdout: " message += self.__decode(stdout) if stderr: message += "; \nstderr: " message += self.__decode(stderr) Exception.__init__(self, message) def __decode(self, value): if isinstance(value, bytes): return bytes.decode(value, locale.getpreferredencoding(False)) return value # Groups: 1 = cf, 2 = tmp or none, 3 = suffix (Compacted or Data.db) _sstable_regexp = re.compile('((?P<keyspace>[^\s-]+)-(?P<cf>[^\s-]+)-)?(?P<tmp>tmp(link)?-)?(?P<version>[^\s-]+)-(?P<number>\d+)-(?P<format>([a-z]+)-)?(?P<suffix>[a-zA-Z]+)\.[a-zA-Z0-9]+$') class Node(object): """ Provides interactions to a Cassandra node. """ @staticmethod def get_version_from_build(install_dir=None, node_path=None, cassandra=False): if install_dir is None and node_path is not None: install_dir = get_install_dir_from_cluster_conf(node_path) if install_dir is not None: # hack, nodes can be of a different type to the cluster node_class = extension.get_cluster_class(install_dir).getNodeClass() if node_class is not Node: return node_class.get_version_from_build(install_dir, node_path, cassandra) # Binary cassandra installs will have a 0.version.txt file version_file = os.path.join(install_dir, '0.version.txt') if os.path.exists(version_file): with open(version_file) as f: return LooseVersion(f.read().strip()) # Source cassandra installs we can read from build.xml build = os.path.join(install_dir, 'build.xml') if os.path.exists(build): with open(build) as f: for line in f: match = re.search('name="base\.version" value="([0-9.]+)[^"]*"', line) if match: return LooseVersion(match.group(1)) raise common.CCMError("Cannot find version") def __init__(self, name, cluster, auto_bootstrap, thrift_interface, storage_interface, jmx_port, remote_debug_port, initial_token, save=True, binary_interface=None, byteman_port='0', environment_variables=None, byteman_startup_script=None, derived_cassandra_version=None): """ Create a new Node. - name: the name for that node - cluster: the cluster this node is part of - auto_bootstrap: whether or not this node should be set for auto-bootstrap - thrift_interface: the (host, port) tuple for thrift - storage_interface: the (host, port) tuple for internal cluster communication - jmx_port: the port for JMX to bind to - remote_debug_port: the port for remote debugging - initial_token: the token for this node. If None, use Cassandra token auto-assignment - save: copy all data useful for this node to the right position. Leaving this true is almost always the right choice. """ self.name = name self.cluster = cluster self.status = Status.UNINITIALIZED self.auto_bootstrap = auto_bootstrap self.network_interfaces = {'thrift': common.normalize_interface(thrift_interface), 'storage': common.normalize_interface(storage_interface), 'binary': common.normalize_interface(binary_interface)} (self.ip_addr, _) = self.network_interfaces['thrift'] if thrift_interface else self.network_interfaces['binary'] self.jmx_port = jmx_port self.remote_debug_port = remote_debug_port self.byteman_port = byteman_port self.byteman_startup_script = byteman_startup_script self.initial_token = initial_token self.pid = None self.data_center = None self.workloads = [] self._dse_config_options = {} self.__config_options = {} self._topology = [('default', 'dc1')] self.__install_dir = None self.__global_log_level = None self.__classes_log_level = {} self.__environment_variables = environment_variables or {} self.__environment_variables = self.__environment_variables.copy() self.__original_java_home = None self.__original_path = None self.__conf_updated = False if derived_cassandra_version: self._cassandra_version = derived_cassandra_version else: try: self._cassandra_version = self.get_version_from_build(self.get_install_dir(), cassandra=True) # call get_base_cassandra_version() to validate _cassandra_version self.get_base_cassandra_version() except (common.CCMError, ValueError): self._cassandra_version = self.cluster.cassandra_version() if save: self.import_config_files() self.import_bin_files() if common.is_win(): self.__clean_bat() @staticmethod def load(path, name, cluster): """ Load a node from from the path on disk to the config files, the node name and the cluster the node is part of. """ node_path = os.path.join(path, name) filename = os.path.join(node_path, 'node.conf') with open(filename, 'r') as f: data = yaml.safe_load(f) try: itf = data['interfaces'] initial_token = None if 'initial_token' in data: initial_token = data['initial_token'] cassandra_version = None if 'cassandra_version' in data: cassandra_version = LooseVersion(data['cassandra_version']) remote_debug_port = 2000 if 'remote_debug_port' in data: remote_debug_port = data['remote_debug_port'] binary_interface = None if 'binary' in itf and itf['binary'] is not None: binary_interface = tuple(itf['binary']) thrift_interface = None if 'thrift' in itf and itf['thrift'] is not None: thrift_interface = tuple(itf['thrift']) node = cluster.create_node(data['name'], data['auto_bootstrap'], thrift_interface, tuple(itf['storage']), data['jmx_port'], remote_debug_port, initial_token, save=False, binary_interface=binary_interface, byteman_port=data['byteman_port'], derived_cassandra_version=cassandra_version) node.status = data['status'] if 'pid' in data: node.pid = int(data['pid']) if 'install_dir' in data: node.__install_dir = data['install_dir'] if 'config_options' in data: node.__config_options = data['config_options'] if 'dse_config_options' in data: node._dse_config_options = data['dse_config_options'] if 'environment_variables' in data: node.__environment_variables = data['environment_variables'] if 'data_center' in data: node.data_center = data['data_center'] if 'workloads' in data: node.workloads = data['workloads'] return node except KeyError as k: raise common.LoadError("Error Loading " + filename + ", missing property: " + str(k)) def get_path(self): """ Returns the path to this node top level directory (where config/data is stored) """ return os.path.join(self.cluster.get_path(), self.name) def get_bin_dir(self): """ Returns the path to the directory where Cassandra scripts are located """ return os.path.join(self.get_path(), 'bin') def get_tool(self, toolname): return common.join_bin(self.get_install_dir(), 'bin', toolname) def get_tool_args(self, toolname): return [common.join_bin(self.get_install_dir(), 'bin', toolname)] def get_env(self): update_conf = not self.__conf_updated if update_conf: self.__conf_updated = True env = common.make_cassandra_env(self.get_install_dir(), self.get_path(), update_conf) env = common.update_java_version(jvm_version=None, install_dir=self.get_install_dir(), cassandra_version=self.get_cassandra_version(), env=env, info_message=self.name) for (key, value) in self.__environment_variables.items(): env[key] = value return env def get_install_cassandra_root(self): return self.get_install_dir() def get_node_cassandra_root(self): return self.get_path() def get_conf_dir(self): """ Returns the path to the directory where Cassandra config are located """ return os.path.join(self.get_path(), 'conf') def get_conf_file(self): """ Returns the path to the configuration yaml file """ conf_name = self.cluster.configuration_yaml return os.path.join(self.get_conf_dir(), conf_name if conf_name is not None else common.CASSANDRA_CONF) def address_for_current_version_slashy(self): """ Returns the address formatted for the current version (InetAddress/InetAddressAndPort.toString) """ if self.get_cassandra_version() >= '4.0': return "/{}".format(str(self.address_and_port())) else: return "/{}".format(str(self.address())); def address_for_version(self, version): """ Returns the address formatted for the specified (InetAddress.getHostAddress or InetAddressAndPort.getHostAddressAndPort) """ if version >= LooseVersion('4.0'): return self.address_and_port() else: return "{}".format(str(self.address())); def address_for_current_version(self): """ Returns the address formatted for the current version. """ return self.address_for_version(self.get_cassandra_version()) def address(self): """ Returns the IP use by this node for internal communication """ return self.network_interfaces['storage'][0] def address_and_port(self): """ Returns the IP used for internal communication along with ports """ address = self.network_interfaces['storage'][0] port = self.network_interfaces['storage'][1] if address.find(":") != -1: return "[{}]:{}".format(address, port) else: return str(address) + ':' + str(port) def get_install_dir(self): """ Returns the path to the cassandra source directory used by this node. """ if self.__install_dir is None: return self.cluster.get_install_dir() else: common.validate_install_dir(self.__install_dir) return self.__install_dir def node_setup(self, version, verbose): dir, v = setup(version, verbose=verbose) return dir def set_install_dir(self, install_dir=None, version=None, verbose=False): """ Sets the path to the cassandra source directory for use by this node. """ if version is None: self.__install_dir = install_dir if install_dir is not None: common.validate_install_dir(install_dir) else: self.__install_dir = self.node_setup(version, verbose=verbose) self._cassandra_version = self.get_version_from_build(self.__install_dir, cassandra=True) if self.get_base_cassandra_version() >= 4.0: self.network_interfaces['thrift'] = None self.import_config_files() self.import_bin_files() self.__conf_updated = False if self.get_cassandra_version() >= '4': self.set_configuration_options(values={'start_rpc': None}, delete_empty=True, delete_always=True) self.set_configuration_options(values={'rpc_port': None}, delete_empty=True, delete_always=True) else: self.set_configuration_options(common.CCM_40_YAML_OPTIONS, delete_empty=True, delete_always=True) return self def set_workloads(self, workloads): raise common.ArgumentError("Cannot set workloads on a cassandra node") def get_cassandra_version(self): return self._cassandra_version def get_base_cassandra_version(self): version = self.get_cassandra_version() return float('.'.join(re.split('\.|-',version.vstring)[:2])) def set_configuration_options(self, values=None, delete_empty=False, delete_always=False): """ Set Cassandra configuration options. ex: node.set_configuration_options(values={ 'hinted_handoff_enabled' : True, 'concurrent_writes' : 64, }) """ if (not hasattr(self,'__config_options') and not hasattr(self,'_Node__config_options')) or self.__config_options is None: self.__config_options = {} if values is not None: self.__config_options = common.merge_configuration(self.__config_options, values, delete_empty=delete_empty, delete_always=delete_always) self.import_config_files() def set_environment_variable(self, key, value): self.__environment_variables[key] = value self.import_config_files() def set_batch_commitlog(self, enabled=False, use_batch_window=True): """ The batch_commitlog option gives an easier way to switch to batch commitlog (since it requires setting 2 options and unsetting one). """ if enabled: if use_batch_window: values = { "commitlog_sync": "batch", "commitlog_sync_batch_window_in_ms": 5, "commitlog_sync_period_in_ms": None } else: values = { "commitlog_sync": "batch", "commitlog_sync_period_in_ms": None } else: if use_batch_window: values = { "commitlog_sync": "periodic", "commitlog_sync_batch_window_in_ms": None, "commitlog_sync_period_in_ms": 10000 } else: values = { "commitlog_sync": "periodic", "commitlog_sync_period_in_ms": 10000 } self.set_configuration_options(values) def set_dse_configuration_options(self, values=None): pass def show(self, only_status=False, show_cluster=True): """ Print infos on this node configuration. """ self.__update_status() indent = ''.join([" " for i in xrange(0, len(self.name) + 2)]) print_("{}: {}".format(self.name, self.__get_status_string())) if not only_status: if show_cluster: print_("{}{}={}".format(indent, 'cluster', self.cluster.name)) print_("{}{}={}".format(indent, 'auto_bootstrap', self.auto_bootstrap)) if self.network_interfaces['thrift'] is not None: print_("{}{}={}".format(indent, 'thrift', self.network_interfaces['thrift'])) if self.network_interfaces['binary'] is not None: print_("{}{}={}".format(indent, 'binary', self.network_interfaces['binary'])) print_("{}{}={}".format(indent, 'storage', self.network_interfaces['storage'])) print_("{}{}={}".format(indent, 'jmx_port', self.jmx_port)) print_("{}{}={}".format(indent, 'remote_debug_port', self.remote_debug_port)) print_("{}{}={}".format(indent, 'byteman_port', self.byteman_port)) print_("{}{}={}".format(indent, 'initial_token', self.initial_token)) if self.pid: print_("{}{}={}".format(indent, 'pid', self.pid)) def is_running(self): """ Return true if the node is running """ self.__update_status() return self.status == Status.UP or self.status == Status.DECOMMISSIONED def is_live(self): """ Return true if the node is live (it's run and is not decommissioned). """ self.__update_status() return self.status == Status.UP def log_directory(self): return os.path.join(self.get_path(), 'logs') def logfilename(self): """ Return the path to the current Cassandra log of this node. """ return os.path.join(self.log_directory(), 'system.log') def debuglogfilename(self): return os.path.join(self.log_directory(), 'debug.log') def gclogfilename(self): return os.path.join(self.log_directory(), 'gc.log.0.current') def compactionlogfilename(self): return os.path.join(self.log_directory(), 'compaction.log') def envfilename(self): return os.path.join( self.get_conf_dir(), common.CASSANDRA_WIN_ENV if common.is_win() else common.CASSANDRA_ENV ) def grep_log(self, expr, filename='system.log', from_mark=None): """ Returns a list of lines matching the regular expression in parameter in the Cassandra log of this node """ matchings = [] pattern = re.compile(expr) with open(os.path.join(self.log_directory(), filename)) as f: if from_mark: f.seek(from_mark) for line in f: m = pattern.search(line) if m: matchings.append((line, m)) return matchings def grep_log_for_errors(self, filename='system.log'): """ Returns a list of errors with stack traces in the Cassandra log of this node """ return self.grep_log_for_errors_from(seek_start=getattr(self, 'error_mark', 0)) def grep_log_for_errors_from(self, filename='system.log', seek_start=0): with open(os.path.join(self.log_directory(), filename)) as f: f.seek(seek_start) return _grep_log_for_errors(f.read()) def mark_log_for_errors(self, filename='system.log'): """ Ignore errors behind this point when calling node.grep_log_for_errors() """ self.error_mark = self.mark_log(filename) def mark_log(self, filename='system.log'): """ Returns "a mark" to the current position of this node Cassandra log. This is for use with the from_mark parameter of watch_log_for_* methods, allowing to watch the log from the position when this method was called. """ log_file = os.path.join(self.log_directory(), filename) if not os.path.exists(log_file): return 0 with open(log_file) as f: f.seek(0, os.SEEK_END) return f.tell() def print_process_output(self, name, proc, verbose=False): # If stderr_file exists on the process, we opted to # store stderr in a separate temporary file, consume that. if hasattr(proc, 'stderr_file') and proc.stderr_file is not None: proc.stderr_file.seek(0) stderr = proc.stderr_file.read() else: try: stderr = proc.communicate()[1] except ValueError: stderr = '' if len(stderr) > 1: print_("[{} ERROR] {}".format(name, stderr.strip())) # This will return when exprs are found or it timeouts def watch_log_for(self, exprs, from_mark=None, timeout=600, process=None, verbose=False, filename='system.log', error_on_pid_terminated=False): """ Watch the log until one or more (regular) expressions are found or timeouts (a TimeoutError is then raised). On successful completion, a list of pair (line matched, match object) is returned. Will raise NodeError if error_on_pit_terminated is True and C* pid is not running. """ start = time.time() tofind = [exprs] if isinstance(exprs, string_types) else exprs tofind = [re.compile(e) for e in tofind] matchings = [] reads = "" if len(tofind) == 0: return None log_file = os.path.join(self.log_directory(), filename) output_read = False while not os.path.exists(log_file): time.sleep(.5) TimeoutError.raise_if_passed(start=start, timeout=timeout, node=self.name, msg="Timed out waiting for {} to be created.".format(log_file)) if process and not output_read: process.poll() if process.returncode is not None: self.print_process_output(self.name, process, verbose) output_read = True if process.returncode != 0: raise RuntimeError() # Shouldn't reuse RuntimeError but I'm lazy with open(log_file) as f: if from_mark: f.seek(from_mark) while True: # First, if we have a process to check, then check it. # Skip on Windows - stdout/stderr is cassandra.bat if not common.is_win() and not output_read: if process: process.poll() if process.returncode is not None: self.print_process_output(self.name, process, verbose) output_read = True if process.returncode != 0: raise RuntimeError() # Shouldn't reuse RuntimeError but I'm lazy line = f.readline() if line: reads = reads + line for e in tofind: m = e.search(line) if m: matchings.append((line, m)) tofind.remove(e) if len(tofind) == 0: return matchings[0] if isinstance(exprs, string_types) else matchings else: # wait for the situation to clarify, either stop or just a pause in log production time.sleep(1) if error_on_pid_terminated: self.raise_node_error_if_cassandra_process_is_terminated() TimeoutError.raise_if_passed(start=start, timeout=timeout, node=self.name, msg="Missing: {exprs} not found in {f}:\n Head: {head}\n Tail: {tail}" .format( exprs=[e.pattern for e in tofind], f=filename, head=reads[:50], tail="..."+reads[len(reads)-150:])) # Checking "process" is tricky, as it may be itself terminated e.g. after "verbose" # or if there is some race condition between log checking and start process finish # so if the "error_on_pid_terminated" is requested we will give it a chance # and will not check parent process termination if process and not error_on_pid_terminated: if common.is_win(): if not self.is_running(): return None else: process.poll() if process.returncode == 0: common.debug("{pid} or its child process terminated. watch_for_logs() for {l} will not continue.".format( pid=process.pid, l=[e.pattern for e in tofind])) return None def watch_log_for_no_errors(self, exprs, from_mark=None, timeout=600, process=None, verbose=False, filename='system.log'): """ Watch the log until one or more (regular) expressions are found or timeouts (a TimeoutError is then raised). On successful completion, a list of pair (line matched, match object) is returned. This method is different from watch_log_for as it will raise a AssertionError if the log contain an error; this assertion will contain the errors found in the log. """ start = time.time() tofind = [exprs] if isinstance(exprs, string_types) else exprs tofind = [re.compile(e) for e in tofind] seek_start=0 if from_mark: seek_start = from_mark while True: TimeoutError.raise_if_passed(start=start, timeout=timeout, node=self.name, msg="Missing: {exprs} not found in {f}".format( exprs=[e.pattern for e in tofind], f=filename)) try: # process is bin/cassandra so choosing to ignore this argument to avoid early termination return self.watch_log_for(tofind, from_mark=from_mark, timeout=5, verbose=verbose, filename=filename) except TimeoutError: logger.debug("waited 5s watching for '{}' but was not found; checking for errors".format(tofind)) # since the api doesn't return the mark read it isn't thread safe to use mark # as the length of the file can change between calls which may mean we skip over # errors; to avoid this keep reading the whole file over and over again... # unless a explicit mark is given to the method, will read from that offset errors = self.grep_log_for_errors_from(filename=filename, seek_start=seek_start) if errors: msg = "Errors were found in the logs while watching for '{}'; attempting to fail the test".format(tofind) logger.debug(msg) raise AssertionError("{}:\n".format(msg) + '\n\n'.join(['\n'.join(msg) for msg in errors])) def watch_log_for_death(self, nodes, from_mark=None, timeout=600, filename='system.log'): """ Watch the log of this node until it detects that the provided other nodes are marked dead. This method returns nothing but throw a TimeoutError if all the requested node have not been found to be marked dead before timeout sec. A mark as returned by mark_log() can be used as the from_mark parameter to start watching the log from a given position. Otherwise the log is watched from the beginning. """ tofind = nodes if isinstance(nodes, list) else [nodes] tofind = ["%s is now [dead|DOWN]" % node.address_for_version(self.get_cassandra_version()) for node in tofind] self.watch_log_for(tofind, from_mark=from_mark, timeout=timeout, filename=filename) def watch_log_for_alive(self, nodes, from_mark=None, timeout=120, filename='system.log'): """ Watch the log of this node until it detects that the provided other nodes are marked UP. This method works similarly to watch_log_for_death. """ tofind = nodes if isinstance(nodes, list) else [nodes] tofind = ["%s.* is now UP" % node.address_for_version(self.get_cassandra_version()) for node in tofind] self.watch_log_for(tofind, from_mark=from_mark, timeout=timeout, filename=filename) def raise_node_error_if_cassandra_process_is_terminated(self): if not self._is_pid_running(): msg = "C* process with {pid} is terminated".format(pid=self.pid) common.debug(msg) raise NodeError(msg) def wait_for_binary_interface(self, **kwargs): """ Waits for the Binary CQL interface to be listening. If > 1.2 will check log for 'Starting listening for CQL clients' before checking for the interface to be listening. Emits a warning if not listening after given timeout (default NODE_WAIT_TIMEOUT_IN_SECS) in seconds. Raises NodeError if C* process terminates during start. Raises TimeoutError if log message for "starting listening" is not found in logs in given timeout. """ timeout = kwargs.get('timeout', NODE_WAIT_TIMEOUT_IN_SECS) kwargs['timeout'] = timeout if self.pid: kwargs['error_on_pid_terminated'] = True if self.cluster.version() >= '1.2': self.watch_log_for("Starting listening for CQL clients", **kwargs) binary_itf = self.network_interfaces['binary'] if not common.check_socket_listening(binary_itf, timeout=timeout): warnings.warn("Binary interface %s:%s is not listening after %s seconds, node may have failed to start." % (binary_itf[0], binary_itf[1], timeout)) def wait_for_thrift_interface(self, **kwargs): """ Waits for the Thrift interface to be listening. Emits a warning if not listening after given timeout (default NODE_WAIT_TIMEOUT_IN_SECS) in seconds. """ if self.cluster.version() >= '4': return timeout = kwargs.get('timeout', NODE_WAIT_TIMEOUT_IN_SECS) kwargs['timeout'] = timeout self.watch_log_for("Listening for thrift clients...", **kwargs) thrift_itf = self.network_interfaces['thrift'] if not common.check_socket_listening(thrift_itf, timeout=timeout): warnings.warn( "Thrift interface {}:{} is not listening after {} seconds, node may have failed to start.".format( thrift_itf[0], thrift_itf[1], timeout)) def get_launch_bin(self): cdir = self.get_install_dir() launch_bin = common.join_bin(cdir, 'bin', 'cassandra') # Copy back the cassandra scripts since profiling may have modified it the previous time shutil.copy(launch_bin, self.get_bin_dir()) return common.join_bin(self.get_path(), 'bin', 'cassandra') def add_custom_launch_arguments(self, args): pass def start(self, join_ring=True, no_wait=False, verbose=False, update_pid=True, wait_other_notice=True, replace_token=None, replace_address=None, jvm_args=None, wait_for_binary_proto=False, profile_options=None, use_jna=False, quiet_start=False, allow_root=False, set_migration_task=True, jvm_version=None): """ Start the node. Options includes: - join_ring: if false, start the node with -Dcassandra.join_ring=False - no_wait: by default, this method returns when the node is started and listening to clients. If no_wait=True, the method returns sooner. - wait_other_notice: if truthy, this method returns only when all other live node of the cluster have marked this node UP. if an integer, sets the timeout for how long to wait - replace_token: start the node with the -Dcassandra.replace_token option. - replace_address: start the node with the -Dcassandra.replace_address option. """ if jvm_args is None: jvm_args = [] if set_migration_task and self.cluster.cassandra_version() >= '3.0.1': jvm_args += ['-Dcassandra.migration_task_wait_in_seconds={}'.format(len(self.cluster.nodes) * 2)] # Validate Windows env if common.is_modern_windows_install(self.cluster.version()) and not common.is_ps_unrestricted(): raise NodeError("PS Execution Policy must be unrestricted when running C* 2.1+") if not common.is_win() and quiet_start: common.warning("Tried to set Windows quiet start behavior, but we're not running on Windows.") if self.is_running(): raise NodeError("{} is already running".format(self.name)) for itf in list(self.network_interfaces.values()): if itf is not None and replace_address is None: common.assert_socket_available(itf) if wait_other_notice: marks = [(node, node.mark_log()) for node in list(self.cluster.nodes.values()) if node.is_live()] else: marks = [] self.mark = self.mark_log() launch_bin = self.get_launch_bin() # If Windows, change entries in .bat file to split conf from binaries if common.is_win(): self.__clean_bat() if profile_options is not None: config = common.get_config() if 'yourkit_agent' not in config: raise NodeError("Cannot enable profile. You need to set 'yourkit_agent' to the path of your agent in a ~/.ccm/config") cmd = '-agentpath:{}'.format(config['yourkit_agent']) if 'options' in profile_options: cmd = cmd + '=' + profile_options['options'] print_(cmd) # Yes, it's fragile as shit pattern = r'cassandra_parms="-Dlog4j.configuration=log4j-server.properties -Dlog4j.defaultInitOverride=true' common.replace_in_file(launch_bin, pattern, ' ' + pattern + ' ' + cmd + '"') os.chmod(launch_bin, os.stat(launch_bin).st_mode | stat.S_IEXEC) env = self.get_env() extension.append_to_server_env(self, env) if common.is_win(): self._clean_win_jmx() pidfile = os.path.join(self.get_path(), 'cassandra.pid') args = [launch_bin] self.add_custom_launch_arguments(args) args = args + ['-p', pidfile, '-Dcassandra.join_ring=%s' % str(join_ring)] args.append('-Dcassandra.logdir=%s' % os.path.join(self.get_path(), 'logs')) if replace_token is not None: args.append('-Dcassandra.replace_token=%s' % str(replace_token)) if replace_address is not None: args.append('-Dcassandra.replace_address=%s' % str(replace_address)) if use_jna is False: args.append('-Dcassandra.boot_without_jna=true') if allow_root: args.append('-R') env['JVM_EXTRA_OPTS'] = env.get('JVM_EXTRA_OPTS', "") + " " + " ".join(jvm_args) # Upgrade scenarios may want to test upgrades to a specific Java version. In case a prior C* version # requires a lower Java version, we would keep its JAVA_HOME in self.__environment_variables and therefore # prevent using the "intended" Java version for the C* version to upgrade to. if not self.__original_java_home: # Save the "original" JAVA_HOME + PATH to restore it. self.__original_java_home = os.environ['JAVA_HOME'] self.__original_path = os.environ['PATH'] logger.info("Saving original JAVA_HOME={} PATH={}".format(self.__original_java_home, self.__original_path)) else: # Restore the "original" JAVA_HOME + PATH to restore it. env['JAVA_HOME'] = self.__original_java_home env['PATH'] = self.__original_path logger.info("Restoring original JAVA_HOME={} PATH={}".format(self.__original_java_home, self.__original_path)) env = common.update_java_version(jvm_version=jvm_version, install_dir=self.get_install_dir(), cassandra_version=self.get_cassandra_version(), env=env, info_message=self.name) # Need to update the node's environment for nodetool and other tools. # (e.g. the host's JAVA_HOME points to Java 11, but the node's software is only for Java 8) for k in 'JAVA_HOME', 'PATH': self.__environment_variables[k] = env[k] common.info("Starting {} with JAVA_HOME={} java_version={} cassandra_version={}, install_dir={}" .format(self.name, env['JAVA_HOME'], common.get_jdk_version_int(env=env), self.get_cassandra_version(), self.get_install_dir())) # In case we are restarting a node # we risk reading the old cassandra.pid file self._delete_old_pid() # Always write the stdout+stderr of the launched process to log files to make finding startup issues easier. start_time = time.time() stdout_sink = open(os.path.join(self.log_directory(), 'startup-{}-stdout.log'.format(start_time)), "w+") stderr_sink = open(os.path.join(self.log_directory(), 'startup-{}-stderr.log'.format(start_time)), "w+") if common.is_win(): # clean up any old dirty_pid files from prior runs dirty_pid_path = os.path.join(self.get_path() + "dirty_pid.tmp") if (os.path.isfile(dirty_pid_path)): os.remove(dirty_pid_path) if quiet_start and self.cluster.version() >= '2.2.4': args.append('-q') process = subprocess.Popen(args, cwd=self.get_bin_dir(), env=env, stdout=stdout_sink, stderr=stderr_sink) else: process = subprocess.Popen(args, env=env, stdout=stdout_sink, stderr=stderr_sink) process.stderr_file = stderr_sink if verbose: common.debug("verbose mode: waiting for the start process out/err (and termination)") stdout, stderr = process.communicate() print_(str(stdout)) print_(str(stderr)) # Our modified batch file writes a dirty output with more than just the pid - clean it to get in parity # with *nix operation here. if common.is_win(): self.__clean_win_pid() self._update_pid(process) print_("Started: {0} with pid: {1}".format(self.name, self.pid), file=sys.stderr, flush=True) if update_pid or wait_for_binary_proto: # at this moment we should have PID and it should be running... if not self._wait_for_running(process, timeout_s=7): raise NodeError("Node {n} is not running".format(n=self.name), process) # if requested wait for other nodes to observe this one (via gossip) if common.is_int_not_bool(wait_other_notice): for node, mark in marks: node.watch_log_for_alive(self, from_mark=mark, timeout=wait_other_notice) elif wait_other_notice: for node, mark in marks: node.watch_log_for_alive(self, from_mark=mark) # if requested wait for binary protocol to start if common.is_int_not_bool(wait_for_binary_proto): self.wait_for_binary_interface(from_mark=self.mark, timeout=wait_for_binary_proto) elif wait_for_binary_proto: self.wait_for_binary_interface(from_mark=self.mark) return process def _wait_for_running(self, process, timeout_s): deadline = time.time() + timeout_s while time.time() < deadline: if self.is_running(): return True time.sleep(0.5) self._update_pid(process) return self.is_running() def stop(self, wait=True, wait_other_notice=False, signal_event=signal.SIGTERM, **kwargs): """ Stop the node. - wait: if True (the default), wait for the Cassandra process to be really dead. Otherwise return after having sent the kill signal. - wait_other_notice: return only when the other live nodes of the cluster have marked this node has dead. - signal_event: Signal event to send to Cassandra; default is to let Cassandra clean up and shut down properly (SIGTERM [15]) - Optional: + gently: Let Cassandra clean up and shut down properly; unless false perform a 'kill -9' which shuts down faster. """ if self.is_running(): if wait_other_notice: marks = [(node, node.mark_log()) for node in list(self.cluster.nodes.values()) if node.is_live() and node is not self] if common.is_win(): # Just taskkill the instance, don't bother trying to shut it down gracefully. # Node recovery should prevent data loss from hard shutdown. # We have recurring issues with nodes not stopping / releasing files in the CI # environment so it makes more sense just to murder it hard since there's # really little downside. # We want the node to flush its data before shutdown as some tests rely on small writes being present. # The default Periodic sync at 10 ms may not have flushed data yet, causing tests to fail. # This is not a hard requirement, however, so we swallow any exceptions this may throw and kill anyway. if signal_event is signal.SIGTERM: try: self.flush() except: common.warning("Failed to flush node: {0} on shutdown.".format(self.name)) pass os.system("taskkill /F /PID " + str(self.pid)) if self._find_pid_on_windows(): common.warning("Failed to terminate node: {0} with pid: {1}".format(self.name, self.pid)) else: # Determine if the signal event should be updated to keep API compatibility if 'gently' in kwargs and kwargs['gently'] is False: signal_event = signal.SIGKILL os.kill(self.pid, signal_event) if wait_other_notice: for node, mark in marks: node.watch_log_for_death(self, from_mark=mark) else: time.sleep(.1) still_running = self.is_running() if still_running and wait: wait_time_sec = 1 for i in xrange(0, 7): # we'll double the wait time each try and cassandra should # not take more than 1 minute to shutdown time.sleep(wait_time_sec) if not self.is_running(): return True wait_time_sec = wait_time_sec * 2 raise NodeError("Problem stopping node %s" % self.name) else: return True else: return False def wait_for_compactions(self, timeout=120): """ Wait for all compactions to finish on this node. """ pattern = re.compile("pending tasks:? +0") start = time.time() while time.time() - start < timeout: output, err, rc = self.nodetool("compactionstats") if pattern.search(output): return time.sleep(1) raise TimeoutError.create(start=start, timeout=timeout, msg="Compactions did not finish in {} seconds".format(timeout), node=self.name) def nodetool_process(self, cmd): env = self.get_env() nodetool = self.get_tool('nodetool') args = [nodetool, '-h', 'localhost', '-p', str(self.jmx_port)] args += shlex.split(cmd) return subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) def nodetool(self, cmd): p = self.nodetool_process(cmd) return handle_external_tool_process(p, ['nodetool', '-h', 'localhost', '-p', str(self.jmx_port)] + shlex.split(cmd)) def dsetool(self, cmd): raise common.ArgumentError('Cassandra nodes do not support dsetool') def dse(self, dse_options=None): raise common.ArgumentError('Cassandra nodes do not support dse') def hadoop(self, hadoop_options=None): raise common.ArgumentError('Cassandra nodes do not support hadoop') def hive(self, hive_options=None): raise common.ArgumentError('Cassandra nodes do not support hive') def pig(self, pig_options=None): raise common.ArgumentError('Cassandra nodes do not support pig') def sqoop(self, sqoop_options=None): raise common.ArgumentError('Cassandra nodes do not support sqoop') def bulkload_process(self, options): loader_bin = common.join_bin(self.get_path(), 'bin', 'sstableloader') env = self.get_env() extension.append_to_client_env(self, env) # CASSANDRA-8358 switched from thrift to binary port host, port = self.network_interfaces['thrift'] if self.get_cassandra_version() < '2.2' else self.network_interfaces['binary'] args = ['-d', host, '-p', str(port)] return subprocess.Popen([loader_bin] + args + options, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE) def bulkload(self, options): p = self.bulkload_process(options=options) return handle_external_tool_process(p, ['sstable bulkload'] + options) def scrub_process(self, options): scrub_bin = self.get_tool('sstablescrub') env = self.get_env() return subprocess.Popen([scrub_bin] + options, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE) def scrub(self, options): p = self.scrub_process(options=options) return handle_external_tool_process(p, ['sstablescrub'] + options) def verify_process(self, options): verify_bin = self.get_tool('sstableverify') env = self.get_env() return subprocess.Popen([verify_bin] + options, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE) def verify(self, options): p = self.verify_process(options=options) return handle_external_tool_process(p, ['sstableverify'] + options) def enable_aoss(self, thrift_port=10000, web_ui_port=9077): pass def run_cqlsh_process(self, cmds=None, cqlsh_options=None, terminator=''): if cqlsh_options is None: cqlsh_options = [] cqlsh = self.get_tool('cqlsh') env = self.get_env() extension.append_to_client_env(self, env) if self.get_base_cassandra_version() >= 2.1: host, port = self.network_interfaces['binary'] else: host, port = self.network_interfaces['thrift'] args = [] args += cqlsh_options extension.append_to_cqlsh_args(self, env, args) args += [host, str(port)] sys.stdout.flush() if cmds is None: if common.is_win(): subprocess.Popen([cqlsh] + args, env=env, creationflags=subprocess.CREATE_NEW_CONSOLE) else: os.execve(cqlsh, [common.platform_binary('cqlsh')] + args, env) else: p = subprocess.Popen([cqlsh] + args, env=env, stdin=subprocess.PIPE, stderr=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=True) if cmds is not None: for cmd in cmds.split(';'): cmd = cmd.strip() if cmd: p.stdin.write(cmd + terminator) p.stdin.write("quit;\n") return p def run_cqlsh(self, cmds=None, cqlsh_options=None, terminator=';\n'): p = self.run_cqlsh_process(cmds, cqlsh_options, terminator) return handle_external_tool_process(p, ['cqlsh', cmds, cqlsh_options]) def set_log_level(self, new_level, class_name=None): known_level = ['TRACE', 'DEBUG', 'INFO', 'WARN', 'ERROR', 'OFF'] if new_level not in known_level: raise common.ArgumentError("Unknown log level %s (use one of %s)" % (new_level, " ".join(known_level))) if class_name: self.__classes_log_level[class_name] = new_level else: self.__global_log_level = new_level # loggers changed > 2.1 if self.get_base_cassandra_version() < 2.1: self._update_log4j() else: self.__update_logback() return self # # Update log4j config: copy new log4j-server.properties into # ~/.ccm/name-of-cluster/nodeX/conf/log4j-server.properties # def update_log4j(self, new_log4j_config): cassandra_conf_dir = os.path.join(self.get_conf_dir(), 'log4j-server.properties') common.copy_file(new_log4j_config, cassandra_conf_dir) # # Update logback config: copy new logback.xml into # ~/.ccm/name-of-cluster/nodeX/conf/logback.xml # def update_logback(self, new_logback_config): cassandra_conf_dir = os.path.join(self.get_conf_dir(), 'logback.xml') common.copy_file(new_logback_config, cassandra_conf_dir) def update_startup_byteman_script(self, byteman_startup_script): """ Update the byteman startup script, i.e., rule injected before the node starts. :param byteman_startup_script: the relative path to the script :raise common.LoadError: if the node does not have byteman installed """ if self.byteman_port == '0': raise common.LoadError('Byteman is not installed') self.byteman_startup_script = byteman_startup_script self.import_config_files() def clear(self, clear_all=False, only_data=False): data_dirs = ['data{0}'.format(x) for x in xrange(0, self.cluster.data_dir_count)] data_dirs.append("commitlogs") if clear_all: data_dirs.extend(['saved_caches', 'logs']) for d in data_dirs: full_dir = os.path.join(self.get_path(), d) if only_data and d != "commitlogs": for dir in os.listdir(full_dir): keyspace_dir = os.path.join(full_dir, dir) if os.path.isdir(keyspace_dir) and dir != "system" and dir != "system_cluster_metadata": for f in os.listdir(keyspace_dir): table_dir = os.path.join(keyspace_dir, f) shutil.rmtree(table_dir) os.mkdir(table_dir) else: common.rmdirs(full_dir) os.mkdir(full_dir) # Needed for any subdirs stored underneath a data directory. # Common for hints post CASSANDRA-6230 for dir in self._get_directories(): if not os.path.exists(dir): os.mkdir(dir) def run_sstable2json(self, out_file=None, keyspace=None, datafiles=None, column_families=None, keys=None, enumerate_keys=False): if out_file is None: out_file = sys.stdout sstable2json = self._find_cmd('sstable2json') env = self.get_env() sstablefiles = self.__gather_sstables(datafiles, keyspace, column_families) print_(sstablefiles) for sstablefile in sstablefiles: print_("-- {0} -----".format(os.path.basename(sstablefile))) args = [sstable2json, sstablefile] if enumerate_keys: args = args + ["-e"] if keys is not None: for key in keys: args = args + ["-k", key] subprocess.call(args, env=env, stdout=out_file) print_("") def run_json2sstable(self, in_file, ks, cf, keyspace=None, datafiles=None, column_families=None, enumerate_keys=False): json2sstable = self._find_cmd('json2sstable') env = self.get_env() sstablefiles = self.__gather_sstables(datafiles, keyspace, column_families) for sstablefile in sstablefiles: in_file_name = os.path.abspath(in_file.name) args = [json2sstable, "-s", "-K", ks, "-c", cf, in_file_name, sstablefile] subprocess.call(args, env=env) def run_sstablesplit_process(self, datafiles=None, size=None, keyspace=None, column_families=None, no_snapshot=False, debug=False): sstablesplit = self._find_cmd('sstablesplit') env = self.get_env() sstablefiles = self.__gather_sstables(datafiles, keyspace, column_families) processes = [] def do_split(f): print_("-- {0}-----".format(os.path.basename(f))) cmd = [sstablesplit] if size is not None: cmd += ['-s', str(size)] if no_snapshot: cmd.append('--no-snapshot') if debug: cmd.append('--debug') cmd.append(f) p = subprocess.Popen(cmd, cwd=os.path.join(self.get_install_dir(), 'bin'), env=env, stderr=subprocess.PIPE, stdout=subprocess.PIPE) processes.append(p) for sstablefile in sstablefiles: do_split(sstablefile) return processes def run_sstablesplit(self, datafiles=None, size=None, keyspace=None, column_families=None, no_snapshot=False, debug=False): processes = self.run_sstablesplit_process(datafiles, size, keyspace, column_families, no_snapshot, debug) results = [] for p in processes: results.append(handle_external_tool_process(p, "sstablesplit")) return results def run_sstablemetadata_process(self, datafiles=None, keyspace=None, column_families=None): cdir = self.get_install_dir() sstablemetadata = common.join_bin(cdir, os.path.join('tools', 'bin'), 'sstablemetadata') env = self.get_env() sstablefiles = self.__gather_sstables(datafiles=datafiles, keyspace=keyspace, columnfamilies=column_families) cmd = [sstablemetadata] cmd.extend(sstablefiles) return subprocess.Popen(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE, env=env) def run_sstablemetadata(self, datafiles=None, keyspace=None, column_families=None): p = self.run_sstablemetadata_process(datafiles, keyspace, column_families) return handle_external_tool_process(p, "sstablemetadata on keyspace: {}, column_family: {}".format(keyspace, column_families)) def run_sstabledump_process(self, datafiles=None, keyspace=None, column_families=None, keys=None, enumerate_keys=False, command=False): sstabledump = self._find_cmd('sstabledump') env = self.get_env() sstablefiles = self.__gather_sstables(datafiles=datafiles, keyspace=keyspace, columnfamilies=column_families) processes = [] def do_dump(sstable): if command: print_("-- {0} -----".format(os.path.basename(sstable))) cmd = [sstabledump, sstable] if enumerate_keys: cmd.append('-e') if keys is not None: for key in keys: cmd = cmd + ["-k", key] p = subprocess.Popen(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE, env=env) if command: out, err, rc = handle_external_tool_process(p, "sstabledump") print_(out) print_('\n') else: processes.append(p) for sstable in sstablefiles: do_dump(sstable) return processes def run_sstabledump(self, datafiles=None, keyspace=None, column_families=None, keys=None, enumerate_keys=False, command=False): processes = self.run_sstabledump_process(datafiles, keyspace, column_families, keys, enumerate_keys, command) results = [] for p in processes: results.append(handle_external_tool_process(p, "sstabledump")) return results def run_sstableexpiredblockers_process(self, keyspace=None, column_family=None): cdir = self.get_install_dir() sstableexpiredblockers = common.join_bin(cdir, os.path.join('tools', 'bin'), 'sstableexpiredblockers') env = self.get_env() cmd = [sstableexpiredblockers, keyspace, column_family] return subprocess.Popen(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE, env=env) def run_sstableexpiredblockers(self, keyspace=None, column_family=None): p = self.run_sstableexpiredblockers_process(keyspace=keyspace, column_family=column_family) if p is not None: return handle_external_tool_process(p, ['sstableexpiredblockers', keyspace, column_family]) else: return None, None, None def run_sstableupgrade_process(self, keyspace=None, column_family=None): cdir = self.get_install_dir() sstableupgrade = self.get_tool('sstableupgrade') env = self.get_env() cmd = [sstableupgrade, keyspace, column_family] p = subprocess.Popen(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE, env=env) return p def run_sstableupgrade(self, keyspace=None, column_family=None): p = self.run_sstableupgrade_process(keyspace, column_family) if p is not None: return handle_external_tool_process(p, "sstableupgrade on {} : {}".format(keyspace, column_family)) else: return None, None, None def get_sstablespath(self, datafiles=None, keyspace=None, tables=None, **kawrgs): sstablefiles = self.__gather_sstables(datafiles=datafiles, keyspace=keyspace, columnfamilies=tables) return sstablefiles def run_sstablerepairedset_process(self, set_repaired=True, datafiles=None, keyspace=None, column_families=None): cdir = self.get_install_dir() sstablerepairedset = common.join_bin(cdir, os.path.join('tools', 'bin'), 'sstablerepairedset') env = self.get_env() sstablefiles = self.__gather_sstables(datafiles, keyspace, column_families) processes = [] for sstable in sstablefiles: if set_repaired: cmd = [sstablerepairedset, "--really-set", "--is-repaired", sstable] else: cmd = [sstablerepairedset, "--really-set", "--is-unrepaired", sstable] p = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) processes.append(p) return processes def run_sstablerepairedset(self, set_repaired=True, datafiles=None, keyspace=None, column_families=None): processes = self.run_sstablerepairedset_process(set_repaired=set_repaired, datafiles=datafiles, keyspace=keyspace, column_families=column_families) results = [] for p in processes: results.append(handle_external_tool_process(p, "sstablerepairedset on {} : {}".format(keyspace, column_families))) return results def run_sstablelevelreset_process(self, keyspace, cf): cdir = self.get_install_dir() sstablelevelreset = common.join_bin(cdir, os.path.join('tools', 'bin'), 'sstablelevelreset') env = self.get_env() cmd = [sstablelevelreset, "--really-reset", keyspace, cf] return subprocess.Popen(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE, env=env) def run_sstablelevelreset(self, keyspace, cf): p = self.run_sstablelevelreset_process(keyspace, cf) return handle_external_tool_process(p, "sstablelevelreset on {} : {}".format(keyspace, cf)) def run_sstableofflinerelevel_process(self, keyspace, cf, dry_run=False): cdir = self.get_install_dir() sstableofflinerelevel = common.join_bin(cdir, os.path.join('tools', 'bin'), 'sstableofflinerelevel') env = self.get_env() if dry_run: cmd = [sstableofflinerelevel, "--dry-run", keyspace, cf] else: cmd = [sstableofflinerelevel, keyspace, cf] return subprocess.Popen(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE, env=env) def run_sstableofflinerelevel(self, keyspace, cf, dry_run=False): p = self.run_sstableofflinerelevel_process(keyspace, cf, dry_run=dry_run) return handle_external_tool_process(p, "sstableoflinerelevel on {} : {}".format(keyspace, cf)) def run_sstableverify_process(self, keyspace, cf, options=None): cdir = self.get_install_dir() sstableverify = common.join_bin(cdir, 'bin', 'sstableverify') env = self.get_env() cmd = [sstableverify, keyspace, cf] if options is not None: cmd[1:1] = options return subprocess.Popen(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE, env=env) def run_sstableverify(self, keyspace, cf, options=None): p = self.run_sstableverify_process(keyspace, cf, options=options) return handle_external_tool_process(p, "sstableverify on {} : {} with options: {}".format(keyspace, cf, options)) def _find_cmd(self, cmd): """ Locates command under cassandra root and fixes permissions if needed """ cdir = self.get_install_cassandra_root() if self.get_base_cassandra_version() >= 2.1: fcmd = common.join_bin(cdir, os.path.join('tools', 'bin'), cmd) else: fcmd = common.join_bin(cdir, 'bin', cmd) try: if os.path.exists(fcmd): os.chmod(fcmd, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH) except: common.warning("Couldn't change permissions to use {0}.".format(cmd)) common.warning("If it didn't work, you will have to do so manually.") return fcmd def has_cmd(self, cmd): """ Indicates if specified command can be found under cassandra root """ return os.path.exists(self._find_cmd(cmd)) def list_keyspaces(self): keyspaces = os.listdir(os.path.join(self.get_path(), 'data0')) keyspaces.remove('system') return keyspaces def get_sstables_per_data_directory(self, keyspace, column_family): keyspace_dirs = [os.path.join(self.get_path(), "data{0}".format(x), keyspace) for x in xrange(0, self.cluster.data_dir_count)] cf_glob = '*' if column_family: # account for changes in data dir layout from CASSANDRA-5202 if self.get_base_cassandra_version() < 2.1: cf_glob = column_family else: cf_glob = column_family + '-*' for keyspace_dir in keyspace_dirs: if not os.path.exists(keyspace_dir): raise common.ArgumentError("Unknown keyspace {0}".format(keyspace)) # data directory layout is changed from 1.1 if self.get_base_cassandra_version() < 1.1: files = [glob.glob(os.path.join(keyspace_dir, "{0}*-Data.db".format(column_family))) for keyspace_dir in keyspace_dirs] elif self.get_base_cassandra_version() < 2.2: files = [glob.glob(os.path.join(keyspace_dir, cf_glob, "%s-%s*-Data.db" % (keyspace, column_family))) for keyspace_dir in keyspace_dirs] else: files = [glob.glob(os.path.join(keyspace_dir, cf_glob, "*-Data.db")) for keyspace_dir in keyspace_dirs] for d in files: for f in d: if os.path.exists(f.replace('Data.db', 'Compacted')): files.remove(f) return files def get_sstables(self, keyspace, column_family): return [f for sublist in self.get_sstables_per_data_directory(keyspace, column_family) for f in sublist] def get_sstables_via_sstableutil(self, keyspace, table, sstabletype='all', oplogs=False, cleanup=False, match='-Data.db'): env = common.make_cassandra_env(self.get_install_cassandra_root(), self.get_node_cassandra_root()) tool_bin = self.get_tool('sstableutil') args = [tool_bin, '--type', sstabletype] if oplogs: args.extend(['--oplog']) if cleanup: args.extend(['--cleanup']) args.extend([keyspace, table]) p = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = p.communicate() if p.returncode != 0: common.error("""Error invoking sstableutil; returned {code}; args={args}; env={env} stdout: {stdout} stderr: {stderr} """.format(code=p.returncode, args=args, env=env, stdout=stdout, stderr=stderr)) raise Exception("Error invoking sstableutil; returned {code}".format(code=p.returncode)) return sorted(filter(lambda s: match in s, stdout.splitlines())) def stress_process(self, stress_options=None, whitelist=False): if stress_options is None: stress_options = [] else: stress_options = stress_options[:] stress = common.get_stress_bin(self.get_install_dir()) if self.cluster.cassandra_version() <= '2.1': stress_options.append('-d') stress_options.append(self.address()) else: stress_options.append('-node') if whitelist: stress_options.append("whitelist") stress_options.append(self.address()) # specify used jmx port if not already set if not [opt for opt in stress_options if opt.startswith('jmx=')]: stress_options.extend(['-port', 'jmx=' + self.jmx_port]) args = [stress] + stress_options try: p = subprocess.Popen(args, cwd=common.parse_path(stress), stdout=subprocess.PIPE, stderr=subprocess.PIPE) return p except KeyboardInterrupt: pass def stress(self, stress_options=None, whitelist=False): p = self.stress_process(stress_options=stress_options, whitelist=whitelist) try: return handle_external_tool_process(p, ['stress'] + stress_options) except KeyboardInterrupt: pass def shuffle(self, cmd): cdir = self.get_install_dir() shuffle = common.join_bin(cdir, 'bin', 'cassandra-shuffle') host = self.address() args = [shuffle, '-h', host, '-p', str(self.jmx_port)] + [cmd] try: subprocess.call(args) except KeyboardInterrupt: pass def data_size(self, live_data=None): """Uses `nodetool info` to get the size of a node's data in KB.""" if live_data is not None: warnings.warn("The 'live_data' keyword argument is deprecated.", DeprecationWarning) output = self.nodetool('info')[0] return _get_load_from_info_output(output) def flush(self, options=None): if options is None: options = [] args = ["flush"] + options cmd = ' '.join(args) self.nodetool(cmd) def compact(self, options=None): if options is None: options = [] args = ["compact"] + options cmd = ' '.join(args) self.nodetool(cmd) def drain(self, block_on_log=False): mark = self.mark_log() self.nodetool("drain") if block_on_log: self.watch_log_for("DRAINED", from_mark=mark) def repair(self, options=None): if options is None: options = [] args = ["repair"] + options cmd = ' '.join(args) return self.nodetool(cmd) def move(self, new_token): self.nodetool("move " + str(new_token)) def cleanup(self, options=None): if options is None: options = [] args = ["cleanup"] + options cmd = ' '.join(args) self.nodetool(cmd) def decommission(self, force=False): cmd = 'decommission' if force: cmd += " --force" self.nodetool(cmd) self.status = Status.DECOMMISSIONED self._update_config() def removeToken(self, token): self.nodetool("removeToken " + str(token)) def import_config_files(self): self._update_config() self.copy_config_files() self._update_yaml() self._update_topology_file() # loggers changed > 2.1 if self.get_base_cassandra_version() < 2.1: self._update_log4j() else: self.__update_logback() self.__update_envfile() def import_dse_config_files(self): raise common.ArgumentError('Cannot import DSE configuration files on a Cassandra node') def copy_config_files(self): conf_dir = os.path.join(self.get_install_dir(), 'conf') for name in os.listdir(conf_dir): filename = os.path.join(conf_dir, name) if os.path.isfile(filename): shutil.copy(filename, self.get_conf_dir()) def import_bin_files(self): bin_dir = os.path.join(self.get_install_dir(), 'bin') for name in os.listdir(bin_dir): filename = os.path.join(bin_dir, name) if os.path.isfile(filename): shutil.copy(filename, self.get_bin_dir()) common.add_exec_permission(bin_dir, name) def __clean_bat(self): # While the Windows specific changes to the batch files to get them to run are # fairly extensive and thus pretty brittle, all the changes are very unique to # the needs of ccm and shouldn't be pushed into the main repo. # Change the nodes to separate jmx ports bin_dir = os.path.join(self.get_path(), 'bin') jmx_port_pattern = "-Dcom.sun.management.jmxremote.port=" bat_file = os.path.join(bin_dir, "cassandra.bat") common.replace_in_file(bat_file, jmx_port_pattern, " " + jmx_port_pattern + self.jmx_port + "^") # Split binaries from conf home_pattern = "if NOT DEFINED CASSANDRA_HOME set CASSANDRA_HOME=%CD%" common.replace_in_file(bat_file, home_pattern, "set CASSANDRA_HOME=" + self.get_install_dir()) classpath_pattern = "set CLASSPATH=\\\"%CASSANDRA_HOME%\\\\conf\\\"" common.replace_in_file(bat_file, classpath_pattern, "set CCM_DIR=\"" + self.get_path() + "\"\nset CLASSPATH=\"%CCM_DIR%\\conf\"") # escape the double quotes in name of the lib files in the classpath jar_file_pattern = "do call :append \"%%i\"" for_statement = "for %%i in (\"%CASSANDRA_HOME%\lib\*.jar\")" common.replace_in_file(bat_file, jar_file_pattern, for_statement + " do call :append \\\"%%i\\\"") # escape double quotes in java agent path class_dir_pattern = "-javaagent:" common.replace_in_file(bat_file, class_dir_pattern, " -javaagent:\\\"%CASSANDRA_HOME%\\lib\\jamm-0.2.5.jar\\\"^") # escape the double quotes in name of the class directories class_dir_pattern = "set CASSANDRA_CLASSPATH=" main_classes = "\\\"%CASSANDRA_HOME%\\build\\classes\\main\\\";" thrift_classes = "\\\"%CASSANDRA_HOME%\\build\\classes\\thrift\\\"" common.replace_in_file(bat_file, class_dir_pattern, "set CASSANDRA_CLASSPATH=%CLASSPATH%;" + main_classes + thrift_classes) # background the server process and grab the pid run_text = "\\\"%JAVA_HOME%\\bin\\java\\\" %JAVA_OPTS% %CASSANDRA_PARAMS% -cp %CASSANDRA_CLASSPATH% \\\"%CASSANDRA_MAIN%\\\"" run_pattern = ".*-cp.*" common.replace_in_file(bat_file, run_pattern, "wmic process call create \"" + run_text + "\" > \"" + self.get_path() + "/dirty_pid.tmp\"\n") # On Windows, remove the VerifyPorts check from cassandra.ps1 if self.cluster.version() >= '2.1': common.replace_in_file(os.path.join(self.get_path(), 'bin', 'cassandra.ps1'), ' VerifyPortsAreAvailable', '') # Specifically call the .ps1 file in our node's folder common.replace_in_file(bat_file, 'powershell /file .*', 'powershell /file "' + os.path.join(self.get_path(), 'bin', 'cassandra.ps1" %*')) def _save(self): self._update_yaml() # loggers changed > 2.1 if self.get_base_cassandra_version() < 2.1: self._update_log4j() else: self.__update_logback() self.__update_envfile() self._update_config() def _update_config(self): dir_name = self.get_path() if not os.path.exists(dir_name): os.mkdir(dir_name) for dir in self._get_directories(): os.mkdir(dir) filename = os.path.join(dir_name, 'node.conf') values = { 'name': self.name, 'status': self.status, 'auto_bootstrap': self.auto_bootstrap, 'interfaces': self.network_interfaces, 'jmx_port': self.jmx_port, 'config_options': self.__config_options, 'dse_config_options': self._dse_config_options, 'environment_variables': self.__environment_variables, 'cassandra_version': str(self.get_cassandra_version()) } if self.pid: values['pid'] = self.pid if self.initial_token: values['initial_token'] = self.initial_token if self.__install_dir is not None: values['install_dir'] = self.__install_dir if self.remote_debug_port: values['remote_debug_port'] = self.remote_debug_port if self.byteman_port: values['byteman_port'] = self.byteman_port if self.data_center: values['data_center'] = self.data_center if self.workloads is not None: values['workloads'] = self.workloads with open(filename, 'w') as f: yaml.safe_dump(values, f) def _update_yaml(self): conf_file = self.get_conf_file() with open(conf_file, 'r') as f: data = yaml.safe_load(f) with open(conf_file, 'r') as f: yaml_text = f.read() data['cluster_name'] = self.cluster.name data['auto_bootstrap'] = self.auto_bootstrap data['initial_token'] = self.initial_token if not self.cluster.use_vnodes and self.get_base_cassandra_version() >= 1.2: data['num_tokens'] = 1 if 'seeds' in data: # cassandra 0.7 data['seeds'] = self.cluster.get_seeds() else: # cassandra 0.8 data['seed_provider'][0]['parameters'][0]['seeds'] = ','.join(self.cluster.get_seeds()) data['listen_address'], data['storage_port'] = self.network_interfaces['storage'] if self.network_interfaces['thrift'] is not None and self.get_base_cassandra_version() < 4: data['rpc_address'], data['rpc_port'] = self.network_interfaces['thrift'] if self.network_interfaces['binary'] is not None and self.get_base_cassandra_version() >= 1.2: data['rpc_address'], data['native_transport_port'] = self.network_interfaces['binary'] data['data_file_directories'] = [os.path.join(self.get_path(), 'data{0}'.format(x)) for x in xrange(0, self.cluster.data_dir_count)] data['commitlog_directory'] = os.path.join(self.get_path(), 'commitlogs') data['saved_caches_directory'] = os.path.join(self.get_path(), 'saved_caches') if 'metadata_directory' in data: data['metadata_directory'] = os.path.join(self.get_path(), 'metadata') if self.get_cassandra_version() > '3.0' and 'hints_directory' in yaml_text: data['hints_directory'] = os.path.join(self.get_path(), 'hints') if self.get_cassandra_version() >= '3.8': data['cdc_raw_directory'] = os.path.join(self.get_path(), 'cdc_raw') if self.cluster.partitioner: data['partitioner'] = self.cluster.partitioner # Get a map of combined cluster and node configuration with the node # configuration taking precedence. full_options = common.merge_configuration( self.cluster._config_options, self.__config_options, delete_empty=False) if 'endpoint_snitch' in full_options and full_options['endpoint_snitch'] == 'org.apache.cassandra.locator.PropertyFileSnitch': # multi dc cluster, needs to read cassandra-topology.properties - if cassandra.yaml is modern, we use TFLP and unset the endpoint_snitch if 'initial_location_provider' in data: data['initial_location_provider'] = 'org.apache.cassandra.locator.TopologyFileLocationProvider' full_options.pop('endpoint_snitch', None) else: # test might set endpoint_snitch: GPFS for example, in this case we need to keep that and unset ILP (or other way round) if 'initial_location_provider' in full_options: data.pop('endpoint_snitch', None) elif 'endpoint_snitch' in full_options: data.pop('initial_location_provider', None) data.pop('node_proximity', None) # Merge options with original yaml data. data = common.merge_configuration(data, full_options) conf_dest = os.path.join(self.get_conf_dir(), common.CASSANDRA_CONF) with open(conf_dest, 'w') as f: yaml.safe_dump(data, f, default_flow_style=False, sort_keys=False) def _update_log4j(self): append_pattern = 'log4j.appender.R.File=' conf_file = os.path.join(self.get_conf_dir(), common.LOG4J_CONF) log_file = os.path.join(self.log_directory(), 'system.log') # log4j isn't partial to Windows \. I can't imagine why not. if common.is_win(): log_file = re.sub("\\\\", "/", log_file) common.replace_in_file(conf_file, append_pattern, append_pattern + log_file) # Setting the right log level # Replace the global log level if self.__global_log_level is not None: append_pattern = 'log4j.rootLogger=' common.replace_in_file(conf_file, append_pattern, append_pattern + self.__global_log_level + ',stdout,R') # Class specific log levels for class_name in self.__classes_log_level: logger_pattern = 'log4j.logger' full_logger_pattern = logger_pattern + '.' + class_name + '=' common.replace_or_add_into_file_tail(conf_file, full_logger_pattern, full_logger_pattern + self.__classes_log_level[class_name]) def __update_logback(self): conf_file = os.path.join(self.get_conf_dir(), common.LOGBACK_CONF) self.__update_logback_loglevel(conf_file) tools_conf_file = os.path.join(self.get_conf_dir(), common.LOGBACK_TOOLS_CONF) self.__update_logback_loglevel(tools_conf_file) def __update_logback_loglevel(self, conf_file): # Setting the right log level - 2.2.2 introduced new debug log if self.get_cassandra_version() >= '2.2.2' and self.__global_log_level: if self.__global_log_level in ['DEBUG', 'TRACE']: root_log_level = self.__global_log_level cassandra_log_level = self.__global_log_level elif self.__global_log_level == 'INFO': root_log_level = self.__global_log_level cassandra_log_level = 'DEBUG' elif self.__global_log_level in ['WARN', 'ERROR']: root_log_level = 'INFO' cassandra_log_level = 'DEBUG' system_log_filter_pattern = '<level>.*</level>' common.replace_in_file(conf_file, system_log_filter_pattern, ' <level>' + self.__global_log_level + '</level>') elif self.__global_log_level == 'OFF': root_log_level = self.__global_log_level cassandra_log_level = self.__global_log_level cassandra_append_pattern = '<logger name="org.apache.cassandra" level=".*"/>' common.replace_in_file(conf_file, cassandra_append_pattern, ' <logger name="org.apache.cassandra" level="' + cassandra_log_level + '"/>') else: root_log_level = self.__global_log_level # Replace the global log level and org.apache.cassandra log level if self.__global_log_level is not None: root_append_pattern = '<root level=".*">' common.replace_in_file(conf_file, root_append_pattern, '<root level="' + root_log_level + '">') # Class specific log levels for class_name in self.__classes_log_level: logger_pattern = '\t<logger name="' full_logger_pattern = logger_pattern + class_name + '" level=".*"/>' common.replace_or_add_into_file_tail(conf_file, full_logger_pattern, logger_pattern + class_name + '" level="' + self.__classes_log_level[class_name] + '"/>') def __update_envfile(self): agentlib_setting = '-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address={}'.format(str(self.remote_debug_port)) remote_debug_options = agentlib_setting # The cassandra-env.ps1 file has been introduced in 2.1 if common.is_modern_windows_install(self.get_base_cassandra_version()): conf_file = os.path.join(self.get_conf_dir(), common.CASSANDRA_WIN_ENV) jmx_port_pattern = '^\s+\$JMX_PORT=' jmx_port_setting = ' $JMX_PORT="' + self.jmx_port + '"' if self.get_cassandra_version() < '3.2': remote_debug_options = ' $env:JVM_OPTS="$env:JVM_OPTS {}"'.format(agentlib_setting) else: conf_file = os.path.join(self.get_conf_dir(), common.CASSANDRA_ENV) jmx_port_pattern = 'JMX_PORT=' jmx_port_setting = 'JMX_PORT="' + self.jmx_port + '"' if self.get_cassandra_version() < '3.2': remote_debug_options = 'JVM_OPTS="$JVM_OPTS {}"'.format(agentlib_setting) common.replace_in_file(conf_file, jmx_port_pattern, jmx_port_setting) if common.is_modern_windows_install(self.get_version_from_build(node_path=self.get_path())): dst = os.path.join(self.get_conf_dir(), common.CASSANDRA_WIN_ENV) replacements = [ ('env:CASSANDRA_HOME =', ' $env:CASSANDRA_HOME="%s"' % self.get_install_dir()), ('env:CASSANDRA_CONF =', ' $env:CCM_DIR="' + self.get_path() + '\\conf"\n $env:CASSANDRA_CONF="$env:CCM_DIR"'), ('cp = ".*?env:CASSANDRA_HOME.conf', ' $cp = """$env:CASSANDRA_CONF"""') ] common.replaces_in_file(dst, replacements) if self.remote_debug_port != '0': remote_debug_port_pattern = '((-Xrunjdwp:)|(-agentlib:jdwp=))transport=dt_socket,server=y,suspend=n,address=' if self.get_cassandra_version() < '3.2': common.replace_in_file(conf_file, remote_debug_port_pattern, remote_debug_options) else: for f in glob.glob(os.path.join(self.get_conf_dir(), common.JVM_OPTS_PATTERN)): if os.path.isfile(f): common.replace_in_file(f, remote_debug_port_pattern, remote_debug_options) if self.byteman_port != '0': byteman_jar = glob.glob(os.path.join(self.get_install_dir(), 'build', 'lib', 'jars', 'byteman-[0-9]*.jar'))[0] agent_string = "-javaagent:{}=listener:true,boot:{},port:{}".format(byteman_jar, byteman_jar, str(self.byteman_port)) if self.byteman_startup_script is not None: agent_string = agent_string + ",script:{}".format(self.byteman_startup_script) if common.is_modern_windows_install(self.get_base_cassandra_version()): with open(conf_file, "r+") as conf_rewrite: conf_lines = conf_rewrite.readlines() # Remove trailing brace, will be replaced conf_lines = conf_lines[:-1] conf_lines.append(" $env:JVM_OPTS=\"$env:JVM_OPTS {}\"\n}}\n".format(agent_string)) conf_rewrite.seek(0) conf_rewrite.truncate() conf_rewrite.writelines(conf_lines) else: common.replaces_or_add_into_file_tail(conf_file, [('.*byteman.*', "JVM_OPTS=\"$JVM_OPTS {}\"".format(agent_string))], add_config_close=False) if self.get_cassandra_version() < '2.0.1': common.replace_in_file(conf_file, "-Xss", ' JVM_OPTS="$JVM_OPTS -Xss228k"') # gc.log was turned on by default in 2.2.5/3.0.3/3.3 if self.get_cassandra_version() >= '2.2.5': gc_log_pattern = "-Xloggc" gc_log_path = os.path.join(self.log_directory(), 'gc.log') if common.is_win(): gc_log_setting = ' $env:JVM_OPTS="$env:JVM_OPTS -Xloggc:{}"'.format(gc_log_path) else: gc_log_setting = 'JVM_OPTS="$JVM_OPTS -Xloggc:{}"'.format(gc_log_path) common.replace_in_file(conf_file, gc_log_pattern, gc_log_setting) # Java 9 gc_log_pattern = "-Xlog[:]gc=info" if common.is_win(): gc_log_setting = ' $env:JVM_OPTS="$env:JVM_OPTS -Xlog:gc=info,heap=trace,age=debug,safepoint=info,promotion=trace:file={}:time,uptime,pid,tid,level:filecount=10,filesize=10240"'.format(gc_log_path) else: gc_log_setting = 'JVM_OPTS="$JVM_OPTS -Xlog:gc=info,heap=trace,age=debug,safepoint=info,promotion=trace:file={}:time,uptime,pid,tid,level:filecount=10,filesize=10240"'.format(gc_log_path) common.replace_in_file(conf_file, gc_log_pattern, gc_log_setting) for itf in list(self.network_interfaces.values()): if itf is not None and common.interface_is_ipv6(itf): if self.get_cassandra_version() < '3.2': if common.is_win(): common.replace_in_file(conf_file, '-Djava.net.preferIPv4Stack=true', '\t$env:JVM_OPTS="$env:JVM_OPTS -Djava.net.preferIPv4Stack=false -Djava.net.preferIPv6Addresses=true"') else: common.replace_in_file(conf_file, '-Djava.net.preferIPv4Stack=true', 'JVM_OPTS="$JVM_OPTS -Djava.net.preferIPv4Stack=false -Djava.net.preferIPv6Addresses=true"') break else: for f in glob.glob(os.path.join(self.get_conf_dir(), common.JVM_OPTS_PATTERN)): if os.path.isfile(f): common.replace_in_file(f, '-Djava.net.preferIPv4Stack=true', '') break def update_topology(self, topology): self._topology = topology self._update_topology_file() def _update_topology_file(self): content = "" for k, v in self._topology: content = "%s%s=%s:r1\n" % (content, k, v) topology_file = os.path.join(self.get_conf_dir(), 'cassandra-topology.properties') with open(topology_file, 'w') as f: f.write(content) def _is_pid_running(self): if self.pid is None: return False if common.is_win(): return self._find_pid_on_windows() else: return self._find_pid_on_unix() def _find_pid_on_unix(self): try: os.kill(self.pid, 0) proc = psutil.Process(self.pid) if proc.status() == psutil.STATUS_ZOMBIE: time.sleep(2) raise OSError(errno.ESRCH, "process was zombie, ignoring") except OSError as err: if err.errno == errno.ESRCH: # not running return False elif err.errno == errno.EPERM: # no permission to signal this process return False else: # some other error raise except psutil.NoSuchProcess as err: return False else: return True def __update_status(self): if self.pid is None: if self.status in [Status.UP, Status.DECOMMISSIONED]: self.status = Status.DOWN return old_status = self.status pid_alive = self._is_pid_running() if pid_alive: if self.status in [Status.DOWN, Status.UNINITIALIZED]: self.status = Status.UP else: if self.status in [Status.UP, Status.DECOMMISSIONED]: self.status = Status.DOWN if not old_status == self.status: if old_status == Status.UP and self.status == Status.DOWN: self.pid = None self._update_config() def _find_pid_on_windows(self): found = False try: import psutil found = psutil.pid_exists(self.pid) except ImportError: common.warning("psutil not installed. Pid tracking functionality will suffer. See README for details.") cmd = 'tasklist /fi "PID eq ' + str(self.pid) + '"' proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE) for line in proc.stdout: if re.match("Image", str(line)): found = True return found def _get_directories(self): dirs = [] for i in ['commitlogs', 'saved_caches', 'logs', 'conf', 'bin', 'hints']: dirs.append(os.path.join(self.get_path(), i)) for x in xrange(0, self.cluster.data_dir_count): dirs.append(os.path.join(self.get_path(), 'data{0}'.format(x))) return dirs def __get_status_string(self): if self.status == Status.UNINITIALIZED: return "{} ({})".format(Status.DOWN, "Not initialized") else: return self.status def __clean_win_pid(self): start = common.now_ms() if self.get_base_cassandra_version() >= 2.1: # Spin for up to 15s waiting for .bat to write the pid file pidfile = self.get_path() + "/cassandra.pid" while (not os.path.isfile(pidfile)): now = common.now_ms() if (now - start > 15000): raise Exception('Timed out waiting for pid file.') else: time.sleep(.001) # Spin for up to 10s waiting for .bat to fill the pid file start = common.now_ms() while (os.stat(pidfile).st_size == 0): now = common.now_ms() if (now - start > 10000): raise Exception('Timed out waiting for pid file to be filled.') else: time.sleep(.001) else: try: # Spin for 500ms waiting for .bat to write the dirty_pid file while (not os.path.isfile(self.get_path() + "/dirty_pid.tmp")): now = common.now_ms() if (now - start > 500): raise Exception('Timed out waiting for dirty_pid file.') else: time.sleep(.001) with open(self.get_path() + "/dirty_pid.tmp", 'r') as f: found = False process_regex = re.compile('ProcessId') readStart = common.now_ms() readEnd = common.now_ms() while (found is False and readEnd - readStart < 500): line = f.read() if (line): m = process_regex.search(line) if (m): found = True linesub = line.split('=') pidchunk = linesub[1].split(';') win_pid = pidchunk[0].lstrip() with open(self.get_path() + "/cassandra.pid", 'w') as pidfile: found = True pidfile.write(win_pid) else: time.sleep(.001) readEnd = common.now_ms() if not found: raise Exception('Node: %s Failed to find pid in ' + self.get_path() + '/dirty_pid.tmp. Manually kill it and check logs - ccm will be out of sync.') except Exception as e: common.error("Problem starting " + self.name + " (" + str(e) + ")") raise Exception('Error while parsing <node>/dirty_pid.tmp in path: ' + self.get_path()) def _delete_old_pid(self): pidfile = os.path.join(self.get_path(), 'cassandra.pid') if os.path.isfile(pidfile): os.remove(pidfile) def _update_pid(self, process): """ Reads pid from cassandra.pid file and stores in the self.pid After setting up pid updates status (UP, DOWN, etc) and node.conf """ pidfile = os.path.join(self.get_path(), 'cassandra.pid') start = time.time() while not (os.path.isfile(pidfile) and os.stat(pidfile).st_size > 0): if (time.time() - start > 30.0): common.error("Timed out waiting for pidfile to be filled (current time is {}, file exists {})".format(datetime.now(), os.path.isfile(pidfile))) break else: time.sleep(0.1) try: with open(pidfile, 'rb') as f: if common.is_modern_windows_install(self.get_base_cassandra_version()): self.pid = int(f.readline().strip().decode('utf-16').strip()) else: self.pid = int(f.readline().strip()) except IOError as e: raise NodeError('Problem starting node %s due to %s' % (self.name, e), process) self.__update_status() def __gather_sstables(self, datafiles=None, keyspace=None, columnfamilies=None): files = [] if keyspace is None: for k in self.list_keyspaces(): files = files + self.get_sstables(k, "") elif datafiles is None: if columnfamilies is None: files = files + self.get_sstables(keyspace, "") else: for cf in columnfamilies: files = files + self.get_sstables(keyspace, cf) else: if not columnfamilies or len(columnfamilies) > 1: raise common.ArgumentError("Exactly one column family must be specified with datafiles") for x in xrange(0, self.cluster.data_dir_count): cf_dir = os.path.join(os.path.realpath(self.get_path()), 'data{0}'.format(x), keyspace, columnfamilies[0]) sstables = set() for datafile in datafiles: if not os.path.isabs(datafile): datafile = os.path.join(os.getcwd(), datafile) if not datafile.startswith(cf_dir + '-') and not datafile.startswith(cf_dir + os.sep): raise NodeError("File doesn't appear to belong to the specified keyspace and column familily: " + datafile) sstable = _sstable_regexp.match(os.path.basename(datafile)) if not sstable: raise NodeError("File doesn't seem to be a valid sstable filename: " + datafile) sstable = sstable.groupdict() if not sstable['tmp'] and sstable['number'] not in sstables: if not os.path.exists(datafile): raise IOError("File doesn't exist: " + datafile) sstables.add(sstable['number']) files.append(datafile) return files def _clean_win_jmx(self): if self.get_base_cassandra_version() >= 2.1: sh_file = os.path.join(common.CASSANDRA_CONF_DIR, common.CASSANDRA_WIN_ENV) dst = os.path.join(self.get_path(), sh_file) common.replace_in_file(dst, "^\s+\$JMX_PORT=", " $JMX_PORT=\"" + self.jmx_port + "\"") # properly use single and double quotes to count for single quotes in the CASSANDRA_CONF path common.replace_in_file( dst, 'CASSANDRA_PARAMS=', ' $env:CASSANDRA_PARAMS=\'-Dcassandra' + # -Dcassandra ' -Dlogback.configurationFile=/"\' + "$env:CASSANDRA_CONF" + \'/logback.xml"\'' + # -Dlogback.configurationFile=/"$env:CASSANDRA_CONF/logback.xml" ' + \' -Dcassandra.config=file:"\' + "///$env:CASSANDRA_CONF" + \'/cassandra.yaml"\'') # -Dcassandra.config=file:"///$env:CASSANDRA_CONF/cassandra.yaml" def get_conf_option(self, option): conf_file = self.get_conf_file() with open(conf_file, 'r') as f: data = yaml.safe_load(f) if option in data: return data[option] else: return None def pause(self): try: import psutil p = psutil.Process(self.pid) p.suspend() except ImportError: if common.is_win(): common.warning("psutil not installed. Pause functionality will not work properly on Windows.") else: os.kill(self.pid, signal.SIGSTOP) def resume(self): try: import psutil p = psutil.Process(self.pid) p.resume() except ImportError: if common.is_win(): common.warning("psutil not installed. Resume functionality will not work properly on Windows.") else: os.kill(self.pid, signal.SIGCONT) def jstack_process(self, opts=None): opts = [] if opts is None else opts jstack_location = os.path.abspath(os.path.join(os.environ['JAVA_HOME'], 'bin', 'jstack')) jstack_cmd = [jstack_location, '-J-d64'] + opts + [str(self.pid)] return subprocess.Popen(jstack_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) def jstack(self, opts=None): p = self.jstack_process(opts=opts) return handle_external_tool_process(p, ['jstack'] + opts) def byteman_submit_process(self, opts): cdir = self.get_install_dir() byteman_cmd = [] byteman_cmd.append(os.path.join(os.environ['JAVA_HOME'], 'bin', 'java')) byteman_cmd.append('-cp') byteman_cmd.append(glob.glob(os.path.join(cdir, 'build', 'lib', 'jars', 'byteman-submit-[0-9]*.jar'))[0]) byteman_cmd.append('org.jboss.byteman.agent.submit.Submit') byteman_cmd.append('-p') byteman_cmd.append(self.byteman_port) byteman_cmd += opts return subprocess.Popen(byteman_cmd) def byteman_submit(self, opts): p = self.byteman_submit_process(opts=opts) return handle_external_tool_process(p, ['byteman_submit'] + opts) def data_directories(self): return [os.path.join(self.get_path(), 'data{0}'.format(x)) for x in xrange(0, self.cluster.data_dir_count)] def get_sstable_data_files_process(self, ks, table): env = self.get_env() args = [self.get_tool('sstableutil'), '--type', 'final', ks, table] p = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) return p def get_sstable_data_files(self, ks, table): """ Read sstable data files by using sstableutil, so we ignore temporary files """ p = self.get_sstable_data_files_process(ks=ks, table=table) out, _, _ = handle_external_tool_process(p, ["sstableutil", '--type', 'final', ks, table]) return sorted(filter(lambda s: s.endswith('-Data.db'), out.splitlines())) def _get_load_from_info_output(info): load_lines = [s for s in info.split('\n') if s.startswith('Load')] if not len(load_lines) == 1: msg = ('Expected output from `nodetool info` to contain exactly 1 ' 'line starting with "Load". Found:\n') + info raise RuntimeError(msg) load_line = load_lines[0].split() # Don't have access to C* version here, so we need to support both prefix styles # See CASSANDRA-9692 on Apache JIRA unit_multipliers = {'KiB': 1, 'KB': 1, 'MiB': 1024, 'MB': 1024, 'GiB': 1024 * 1024, 'GB': 1024 * 1024, 'TiB': 1024 * 1024 * 1024, 'TB': 1024 * 1024 * 1024} load_num, load_units = load_line[2], load_line[3] try: load_mult = unit_multipliers[load_units] except KeyError: expected = ', '.join(list(unit_multipliers)) msg = ('Expected `nodetool info` to report load in one of the ' 'following units:\n' ' {expected}\n' 'Found:\n' ' {found}').format(expected=expected, found=load_units) raise RuntimeError(msg) return float(load_num) * load_mult def _grep_log_for_errors(log): except_re = re.compile(r'[Ee]xception|AssertionError') log_cat_re = re.compile(r'(\W|^)(INFO|DEBUG|WARN|ERROR)\W') def log_line_category(line): match = log_cat_re.search(line) return match.group(2) if match else None matches = [] loglines = log.splitlines() for line_num, line in enumerate(loglines): found_exception = False line_category = log_line_category(line) if line_category == 'ERROR': matches.append([line]) found_exception = True elif line_category == 'WARN': match = except_re.search(line) if match is not None: matches.append([line]) found_exception = True if found_exception: for next_line_num in range(line_num + 1, len(loglines)): next_line = loglines[next_line_num] # if a log line can't be identified, assume continuation of an ERROR/WARN exception if log_line_category(next_line) is None: matches[-1].append(next_line) else: break return matches def handle_external_tool_process(process, cmd_args): out, err = process.communicate() if (out is not None) and isinstance(out, bytes): out = out.decode() if (err is not None) and isinstance(err, bytes): err = err.decode() rc = process.returncode if rc != 0: raise ToolError(cmd_args, rc, out, err) ret = namedtuple('Subprocess_Return', 'stdout stderr rc') return ret(stdout=out, stderr=err, rc=rc) def get_install_dir_from_cluster_conf(node_path): file = os.path.join(os.path.dirname(node_path), "cluster.conf") with open(file) as f: for line in f: match = re.search('install_dir: (.*?)$', line) if match: return match.group(1) return None