ccmlib/dse/dse_cluster.py (244 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# DataStax Enterprise (DSE) clusters
from __future__ import absolute_import
import os
import re
import shutil
import signal
import subprocess
import tarfile
import tempfile
from argparse import ArgumentError
from distutils.version import LooseVersion
from six.moves import urllib
from ccmlib import common, repository
from ccmlib.cluster import Cluster
from ccmlib.common import rmdirs
from ccmlib.common import ArgumentError
from ccmlib.dse.dse_node import DseNode
try:
import ConfigParser
except ImportError:
import configparser as ConfigParser
DSE_CASSANDRA_CONF_DIR = "resources/cassandra/conf"
OPSCENTER_CONF_DIR = "conf"
DSE_ARCHIVE = "https://downloads.datastax.com/enterprise/dse-%s-bin.tar.gz"
OPSC_ARCHIVE = "https://downloads.datastax.com/enterprise/opscenter-%s.tar.gz"
def isDse(install_dir, options=None):
if install_dir is None:
raise ArgumentError('Undefined installation directory')
bin_dir = os.path.join(install_dir, common.BIN_DIR)
if options and options.dse and './' != install_dir and not os.path.exists(bin_dir):
raise ArgumentError('Installation directory does not contain a bin directory: %s' % install_dir)
if options and options.dse:
return True
dse_script = os.path.join(bin_dir, 'dse')
if options and not options.dse and './' != install_dir and os.path.exists(dse_script):
raise ArgumentError('Installation directory is DSE but options did not specify `--dse`: %s' % install_dir)
return os.path.exists(dse_script)
def isOpscenter(install_dir, options=None):
if install_dir is None:
raise ArgumentError('Undefined installation directory')
bin_dir = os.path.join(install_dir, common.BIN_DIR)
if options and options.dse and './' != install_dir and not os.path.exists(bin_dir):
raise ArgumentError('Installation directory does not contain a bin directory')
opscenter_script = os.path.join(bin_dir, 'opscenter')
return os.path.exists(opscenter_script)
def isDseClusterType(install_dir, options=None):
if isDse(install_dir, options) or isOpscenter(install_dir, options):
return DseCluster
return None
class DseCluster(Cluster):
@staticmethod
def getConfDir(install_dir):
if isDse(install_dir):
return os.path.join(install_dir, DSE_CASSANDRA_CONF_DIR)
elif isOpscenter(install_dir):
return os.path.join(os.path.join(install_dir, OPSCENTER_CONF_DIR), common.CASSANDRA_CONF)
raise RuntimeError("illegal call to DseCluster.getConfDir() when not dse or opscenter")
@staticmethod
def getNodeClass():
return DseNode
def __init__(self, path, name, partitioner=None, install_dir=None, create_directory=True, version=None, verbose=False, derived_cassandra_version=None, options=None):
self.load_credentials_from_file(options.dse_credentials_file if options else None)
self.dse_username = options.dse_username if options else None
self.dse_password = options.dse_password if options else None
self.opscenter = options.opscenter if options else None
self._cassandra_version = None
self._cassandra_version = derived_cassandra_version
super(DseCluster, self).__init__(path, name, partitioner, install_dir, create_directory, version, verbose, options=options)
def load_from_repository(self, version, verbose):
if self.opscenter is not None:
odir = setup_opscenter(self.opscenter, self.dse_username, self.dse_password, verbose)
target_dir = os.path.join(self.get_path(), 'opscenter')
shutil.copytree(odir, target_dir)
return setup_dse(version, self.dse_username, self.dse_password, verbose)
def load_credentials_from_file(self, dse_credentials_file):
# Use .dse.ini if it exists in the default .ccm directory.
if dse_credentials_file is None:
creds_file = os.path.join(common.get_default_path(), '.dse.ini')
if os.path.isfile(creds_file):
dse_credentials_file = creds_file
if dse_credentials_file is not None:
parser = ConfigParser.RawConfigParser()
parser.read(dse_credentials_file)
if parser.has_section('dse_credentials'):
if parser.has_option('dse_credentials', 'dse_username'):
self.dse_username = parser.get('dse_credentials', 'dse_username')
if parser.has_option('dse_credentials', 'dse_password'):
self.dse_password = parser.get('dse_credentials', 'dse_password')
else:
common.warning("{} does not contain a 'dse_credentials' section.".format(dse_credentials_file))
def get_seeds(self):
return [s.network_interfaces['storage'][0] if isinstance(s, DseNode) else s for s in self.seeds]
def hasOpscenter(self):
return os.path.exists(os.path.join(self.get_path(), 'opscenter'))
def create_node(self, name, auto_bootstrap, thrift_interface, storage_interface, jmx_port, remote_debug_port, initial_token, save=True, binary_interface=None, byteman_port='0', environment_variables=None,derived_cassandra_version=None):
return DseNode(name, self, auto_bootstrap, thrift_interface, storage_interface, jmx_port, remote_debug_port, initial_token, save, binary_interface, byteman_port, environment_variables=environment_variables, derived_cassandra_version=derived_cassandra_version)
def can_generate_tokens(self):
return False
def start(self, no_wait=False, verbose=False, wait_for_binary_proto=False, wait_other_notice=True, jvm_args=None, profile_options=None, quiet_start=False, allow_root=False, jvm_version=None):
if jvm_args is None:
jvm_args = []
marks = {}
for node in self.nodelist():
marks[node] = node.mark_log()
started = super(DseCluster, self).start(no_wait, verbose, wait_for_binary_proto, wait_other_notice, jvm_args, profile_options, quiet_start=quiet_start, allow_root=allow_root, timeout=180, jvm_version=jvm_version)
self.start_opscenter()
if self._misc_config_options.get('enable_aoss', False):
self.wait_for_any_log('AlwaysOn SQL started', 600, marks=marks)
return started
def stop(self, wait=True, signal_event=signal.SIGTERM, **kwargs):
not_running = super(DseCluster, self).stop(wait=wait, signal_event=signal.SIGTERM, **kwargs)
self.stop_opscenter()
return not_running
def remove(self, node=None):
# We _must_ gracefully stop if aoss is enabled, otherwise we will leak the spark workers
super(DseCluster, self).remove(node=node, gently=self._misc_config_options.get('enable_aoss', False))
def cassandra_version(self):
if self._cassandra_version is None:
self._cassandra_version = get_dse_cassandra_version(self.get_install_dir())
return self._cassandra_version
def enable_aoss(self):
if self.version() < '6.0':
common.error("Cannot enable AOSS in DSE clusters before 6.0")
exit(1)
self._misc_config_options['enable_aoss'] = True
for node in self.nodelist():
port_offset = int(node.name[4:])
node.enable_aoss(thrift_port=10000 + port_offset, web_ui_port=9077 + port_offset)
self._update_config()
def set_dse_configuration_options(self, values=None):
if values is not None:
self._dse_config_options = common.merge_configuration(self._dse_config_options, values)
self._update_config()
for node in list(self.nodes.values()):
node.import_dse_config_files()
return self
def start_opscenter(self):
if self.hasOpscenter():
self.write_opscenter_cluster_config()
args = [os.path.join(self.get_path(), 'opscenter', 'bin', common.platform_binary('opscenter'))]
subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
def stop_opscenter(self):
pidfile = os.path.join(self.get_path(), 'opscenter', 'twistd.pid')
if os.path.exists(pidfile):
with open(pidfile, 'r') as f:
pid = int(f.readline().strip())
f.close()
if pid is not None:
try:
os.kill(pid, signal.SIGKILL)
except OSError:
pass
os.remove(pidfile)
def write_opscenter_cluster_config(self):
cluster_conf = os.path.join(self.get_path(), 'opscenter', 'conf', 'clusters')
if not os.path.exists(cluster_conf):
os.makedirs(cluster_conf)
if len(self.nodes) > 0:
node = list(self.nodes.values())[0]
(node_ip, node_port) = node.network_interfaces['thrift']
node_jmx = node.jmx_port
with open(os.path.join(cluster_conf, self.name + '.conf'), 'w+') as f:
f.write('[jmx]\n')
f.write('port = %s\n' % node_jmx)
f.write('[cassandra]\n')
f.write('seed_hosts = %s\n' % node_ip)
f.write('api_port = %s\n' % node_port)
f.close()
def setup_dse(version, username, password, verbose=False):
(cdir, version, fallback) = repository.__setup(version, verbose)
if cdir:
return (cdir, version)
cdir = repository.version_directory(version)
if cdir is None:
download_dse_version(version, username, password, verbose=verbose)
cdir = repository.version_directory(version)
return (cdir, version)
def setup_opscenter(opscenter, username, password, verbose=False):
ops_version = 'opsc' + opscenter
odir = repository.version_directory(ops_version)
if odir is None:
download_opscenter_version(opscenter, username, password, ops_version, verbose=verbose)
odir = repository.version_directory(ops_version)
return odir
def get_dse_cassandra_version(install_dir):
# for this to work, the current JAVA_HOME must already be appropriate
dse_cmd = os.path.join(install_dir, 'bin', 'dse')
(output, stderr) = subprocess.Popen([dse_cmd, "cassandra", '-v'], stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
# just take the last line to avoid any possible log lines
output = output.decode('utf-8').rstrip().split('\n')[-1]
match = re.search('^([0-9.]+)(?:-.*)?', str(output))
if match:
return LooseVersion(match.group(1))
raise ArgumentError("Unable to determine Cassandra version using `bin/dse cassandra -v` from output: %s.\n\tstdout: '%s'\n\tstderr: '%s'"
% (install_dir, output, stderr))
def download_dse_version(version, username, password, verbose=False):
url = DSE_ARCHIVE
if repository.CCM_CONFIG.has_option('repositories', 'dse'):
url = repository.CCM_CONFIG.get('repositories', 'dse')
url = url % version
_, target = tempfile.mkstemp(suffix=".tar.gz", prefix="ccm-")
try:
if username is None:
common.warning("No dse username detected, specify one using --dse-username or passing in a credentials file using --dse-credentials.")
if password is None:
common.warning("No dse password detected, specify one using --dse-password or passing in a credentials file using --dse-credentials.")
repository.__download(url, target, username=username, password=password, show_progress=verbose)
common.debug("Extracting {} as version {} ...".format(target, version))
tar = tarfile.open(target)
dir = tar.next().name.split("/")[0] # pylint: disable=all
tar.extractall(path=repository.__get_dir())
tar.close()
target_dir = os.path.join(repository.__get_dir(), version)
if os.path.exists(target_dir):
rmdirs(target_dir)
shutil.move(os.path.join(repository.__get_dir(), dir), target_dir)
except urllib.error.URLError as e:
msg = "Invalid version %s" % version if url is None else "Invalid url %s" % url
msg = msg + " (underlying error is: %s)" % str(e)
raise ArgumentError(msg)
except tarfile.ReadError as e:
raise ArgumentError("Unable to uncompress downloaded file: %s" % str(e))
def download_opscenter_version(version, username, password, target_version, verbose=False):
url = OPSC_ARCHIVE
if repository.CCM_CONFIG.has_option('repositories', 'opscenter'):
url = repository.CCM_CONFIG.get('repositories', 'opscenter')
url = url % version
_, target = tempfile.mkstemp(suffix=".tar.gz", prefix="ccm-")
try:
if username is None:
common.warning("No dse username detected, specify one using --dse-username or passing in a credentials file using --dse-credentials.")
if password is None:
common.warning("No dse password detected, specify one using --dse-password or passing in a credentials file using --dse-credentials.")
repository.__download(url, target, username=username, password=password, show_progress=verbose)
common.info("Extracting {} as version {} ...".format(target, target_version))
tar = tarfile.open(target)
dir = tar.next().name.split("/")[0] # pylint: disable=all
tar.extractall(path=repository.__get_dir())
tar.close()
target_dir = os.path.join(repository.__get_dir(), target_version)
if os.path.exists(target_dir):
rmdirs(target_dir)
shutil.move(os.path.join(repository.__get_dir(), dir), target_dir)
except urllib.error.URLError as e:
msg = "Invalid version {}".format(version) if url is None else "Invalid url {}".format(url)
msg = msg + " (underlying error is: {})".format(str(e))
raise ArgumentError(msg)
except tarfile.ReadError as e:
raise ArgumentError("Unable to uncompress downloaded file: {}".format(str(e)))