# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# DataStax Enterprise (DSE) nodes
from __future__ import absolute_import, with_statement
import os
import re
import shutil
import signal
import subprocess
import yaml
from distutils.version import LooseVersion
from ccmlib import common, extension, node
from ccmlib.node import Node, handle_external_tool_process


class DseNode(Node):
"""
Provides interactions to a DSE node.
"""
@staticmethod
def get_version_from_build(install_dir=None, node_path=None, cassandra=False):
if install_dir is None and node_path is not None:
install_dir = node.get_install_dir_from_cluster_conf(node_path)
if install_dir is not None:
# Binary cassandra installs will have a 0.version.txt file
version_file = os.path.join(install_dir, '0.version.txt')
if os.path.exists(version_file):
with open(version_file) as f:
return LooseVersion(f.read().strip())
# For DSE look for a dse*.jar and extract the version number
dse_version = get_dse_version(install_dir)
            if dse_version is not None:
if cassandra:
from ccmlib.dse.dse_cluster import get_dse_cassandra_version
return get_dse_cassandra_version(install_dir)
else:
return LooseVersion(dse_version)
raise common.CCMError("Cannot find version")
def __init__(self, name, cluster, auto_bootstrap, thrift_interface, storage_interface, jmx_port, remote_debug_port, initial_token, save=True, binary_interface=None, byteman_port='0', environment_variables=None, derived_cassandra_version=None):
super(DseNode, self).__init__(name, cluster, auto_bootstrap, thrift_interface, storage_interface, jmx_port, remote_debug_port, initial_token, save, binary_interface, byteman_port, environment_variables=environment_variables, derived_cassandra_version=derived_cassandra_version)
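        # Node-level dse.yaml overrides; merged with the cluster-level options whenever dse.yaml is rewritten.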
self._dse_config_options = {}
if self.cluster.hasOpscenter():
self._copy_agent()
def get_install_cassandra_root(self):
return os.path.join(self.get_install_dir(), 'resources', 'cassandra')
def get_node_cassandra_root(self):
return os.path.join(self.get_path(), 'resources', 'cassandra')
def get_conf_dir(self):
"""
        Returns the path to the directory where the Cassandra config files are located.
"""
return os.path.join(self.get_path(), 'resources', 'cassandra', 'conf')
def get_tool(self, toolname):
return common.join_bin(os.path.join(self.get_install_dir(), 'resources', 'cassandra'), 'bin', toolname)
def get_tool_args(self, toolname):
return [common.join_bin(os.path.join(self.get_install_dir(), 'resources', 'cassandra'), 'bin', 'dse'), toolname]
def get_env(self):
return self.make_dse_env(self.get_install_dir(), self.get_path(), self.ip_addr)
def node_setup(self, version, verbose):
from ccmlib.dse.dse_cluster import setup_dse
        install_dir, _ = setup_dse(version, self.cluster.dse_username, self.cluster.dse_password, verbose=verbose)
        return install_dir
def address_for_version(self, version):
return "{}".format(str(self.address()));
def set_workloads(self, workloads):
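        # Record the workloads, then apply workload-specific config (Tomcat server.xml for Solr,
        # gremlin remote.yaml for Graph, dse.yaml options for DSEFS/Spark).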
self.workloads = workloads
self._update_config()
if 'solr' in self.workloads:
self.__generate_server_xml()
if 'graph' in self.workloads:
self.__update_gremlin_config_yaml()
if 'dsefs' in self.workloads:
dsefs_options = {'dsefs_options': {'enabled': True,
'work_dir': os.path.join(self.get_path(), 'dsefs'),
'data_directories': [{'dir': os.path.join(self.get_path(), 'dsefs', 'data')}]}}
self.set_dse_configuration_options(dsefs_options)
if 'spark' in self.workloads:
dsefs_enabled = 'dsefs' in self.workloads
dse_options = {'dsefs_options': {'enabled': dsefs_enabled,
'work_dir': os.path.join(self.get_path(), 'dsefs'),
'data_directories': [{'dir': os.path.join(self.get_path(), 'dsefs', 'data')}]}}
if self.cluster.version() >= '6.0':
# Don't overwrite aoss options
if 'resource_manager_options' not in self._dse_config_options:
dse_options['resource_manager_options'] = {'worker_options': {'memory_total': '1g', 'cores_total': 2}}
self.set_dse_configuration_options(dse_options)
self._update_spark_env()
def enable_aoss(self, thrift_port=10000, web_ui_port=9077):
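        # AlwaysOn SQL needs its own options block plus a Spark resource manager workpool reserved for it.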
self._dse_config_options['alwayson_sql_options'] = {'enabled': True, 'thrift_port': thrift_port, 'web_ui_port': web_ui_port}
self._dse_config_options['resource_manager_options'] = {'worker_options':
{'cores_total': 2,
'memory_total': '2g',
'workpools': [{
'name': 'alwayson_sql',
'cores': 0.5,
'memory': 0.5
}]}}
self._update_config()
self._update_spark_env()
self._update_yaml()
def set_dse_configuration_options(self, values=None):
if values is not None:
self._dse_config_options = common.merge_configuration(self._dse_config_options, values)
self.import_dse_config_files()
def watch_log_for_alive(self, nodes, from_mark=None, timeout=720, filename='system.log'):
"""
Watch the log of this node until it detects that the provided other
nodes are marked UP. This method works similarly to watch_log_for_death.
We want to provide a higher default timeout when this is called on DSE.
"""
super(DseNode, self).watch_log_for_alive(nodes, from_mark=from_mark, timeout=timeout, filename=filename)
def get_launch_bin(self):
cdir = self.get_install_dir()
launch_bin = common.join_bin(cdir, 'bin', 'dse')
        # Copy back the dse script, since profiling may have modified it the previous time
shutil.copy(launch_bin, self.get_bin_dir())
return common.join_bin(self.get_path(), 'bin', 'dse')
def add_custom_launch_arguments(self, args):
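        # Translate the configured CCM workloads into 'dse cassandra' launcher switches.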
args.append('cassandra')
for workload in self.workloads:
if 'hadoop' in workload:
args.append('-t')
if 'solr' in workload:
args.append('-s')
if 'spark' in workload:
args.append('-k')
if 'cfs' in workload:
args.append('-c')
if 'graph' in workload:
args.append('-g')
def start(self,
join_ring=True,
no_wait=False,
verbose=False,
update_pid=True,
wait_other_notice=True,
replace_token=None,
replace_address=None,
jvm_args=None,
wait_for_binary_proto=False,
profile_options=None,
use_jna=False,
quiet_start=False,
allow_root=False,
set_migration_task=True,
jvm_version=None):
mark = self.mark_log()
process = super(DseNode, self).start(join_ring, no_wait, verbose, update_pid, wait_other_notice, replace_token,
replace_address, jvm_args, wait_for_binary_proto, profile_options, use_jna,
quiet_start, allow_root, set_migration_task, jvm_version)
        if self.cluster.hasOpscenter():
            self._start_agent()
        return process
def _start_agent(self):
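        # Write the agent's address.yaml and log4j config, then launch the DataStax agent that was copied next to the node.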
agent_dir = os.path.join(self.get_path(), 'datastax-agent')
if os.path.exists(agent_dir):
self._write_agent_address_yaml(agent_dir)
self._write_agent_log4j_properties(agent_dir)
args = [os.path.join(agent_dir, 'bin', common.platform_binary('datastax-agent'))]
subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
def stop(self, wait=True, wait_other_notice=False, signal_event=signal.SIGTERM, **kwargs):
if self.cluster.hasOpscenter():
self._stop_agent()
return super(DseNode, self).stop(wait=wait, wait_other_notice=wait_other_notice, signal_event=signal_event, **kwargs)
def _stop_agent(self):
agent_dir = os.path.join(self.get_path(), 'datastax-agent')
if os.path.exists(agent_dir):
pidfile = os.path.join(agent_dir, 'datastax-agent.pid')
if os.path.exists(pidfile):
                with open(pidfile, 'r') as f:
                    pid = int(f.readline().strip())
if pid is not None:
try:
os.kill(pid, signal.SIGKILL)
except OSError:
pass
os.remove(pidfile)
def nodetool(self, cmd, username=None, password=None, capture_output=True, wait=True):
if password is not None:
cmd = '-pw {} '.format(password) + cmd
if username is not None:
cmd = '-u {} '.format(username) + cmd
        return super(DseNode, self).nodetool(cmd, capture_output=capture_output, wait=wait)
def dsetool(self, cmd):
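        # Run the install's bin/dsetool against this node's native transport address and JMX port.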
env = self.get_env()
extension.append_to_client_env(self, env)
node_ip, binary_port = self.network_interfaces['binary']
dsetool = common.join_bin(self.get_install_dir(), 'bin', 'dsetool')
args = [dsetool, '-h', node_ip, '-j', str(self.jmx_port), '-c', str(binary_port)]
args += cmd.split()
p = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return handle_external_tool_process(p, args)
def dse(self, dse_options=None):
if dse_options is None:
dse_options = []
env = self.get_env()
extension.append_to_client_env(self, env)
env['JMX_PORT'] = self.jmx_port
dse = common.join_bin(self.get_install_dir(), 'bin', 'dse')
args = [dse]
args += dse_options
p = subprocess.Popen(args, env=env) #Don't redirect stdout/stderr, users need to interact with new process
return handle_external_tool_process(p, args)
def hadoop(self, hadoop_options=None):
if hadoop_options is None:
hadoop_options = []
env = self.get_env()
env['JMX_PORT'] = self.jmx_port
dse = common.join_bin(self.get_install_dir(), 'bin', 'dse')
args = [dse, 'hadoop']
args += hadoop_options
p = subprocess.Popen(args, env=env) #Don't redirect stdout/stderr, users need to interact with new process
return handle_external_tool_process(p, args)
def hive(self, hive_options=None):
if hive_options is None:
hive_options = []
env = self.get_env()
env['JMX_PORT'] = self.jmx_port
dse = common.join_bin(self.get_install_dir(), 'bin', 'dse')
args = [dse, 'hive']
args += hive_options
p = subprocess.Popen(args, env=env) #Don't redirect stdout/stderr, users need to interact with new process
return handle_external_tool_process(p, args)
def pig(self, pig_options=None):
if pig_options is None:
pig_options = []
env = self.get_env()
env['JMX_PORT'] = self.jmx_port
dse = common.join_bin(self.get_install_dir(), 'bin', 'dse')
args = [dse, 'pig']
args += pig_options
p = subprocess.Popen(args, env=env) #Don't redirect stdout/stderr, users need to interact with new process
return handle_external_tool_process(p, args)
def sqoop(self, sqoop_options=None):
if sqoop_options is None:
sqoop_options = []
env = self.get_env()
env['JMX_PORT'] = self.jmx_port
dse = common.join_bin(self.get_install_dir(), 'bin', 'dse')
args = [dse, 'sqoop']
args += sqoop_options
p = subprocess.Popen(args, env=env) #Don't redirect stdout/stderr, users need to interact with new process
return handle_external_tool_process(p, args)
def spark(self, spark_options=None):
if spark_options is None:
spark_options = []
env = self.get_env()
env['JMX_PORT'] = self.jmx_port
dse = common.join_bin(self.get_install_dir(), 'bin', 'dse')
args = [dse, 'spark']
args += spark_options
p = subprocess.Popen(args, env=env) #Don't redirect stdout/stderr, users need to interact with new process
return handle_external_tool_process(p, args)
def import_dse_config_files(self):
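        # Copy the stock DSE conf directory from the install into the node, then rewrite dse.yaml with the configured overrides.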
self._update_config()
if not os.path.isdir(os.path.join(self.get_path(), 'resources', 'dse', 'conf')):
os.makedirs(os.path.join(self.get_path(), 'resources', 'dse', 'conf'))
common.copy_directory(os.path.join(self.get_install_dir(), 'resources', 'dse', 'conf'), os.path.join(self.get_path(), 'resources', 'dse', 'conf'))
self._update_yaml()
def copy_config_files(self):
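        # Copy each bundled product's conf directory from the install into the node, plus the Solr web app,
        # Tomcat lib/webapps and gremlin-console extras where present.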
for product in ['dse', 'cassandra', 'hadoop', 'hadoop2-client', 'sqoop', 'hive', 'tomcat', 'spark', 'shark', 'mahout', 'pig', 'solr', 'graph']:
src_conf = os.path.join(self.get_install_dir(), 'resources', product, 'conf')
dst_conf = os.path.join(self.get_path(), 'resources', product, 'conf')
if not os.path.isdir(src_conf):
continue
if os.path.isdir(dst_conf):
common.rmdirs(dst_conf)
shutil.copytree(src_conf, dst_conf)
if product == 'solr':
src_web = os.path.join(self.get_install_dir(), 'resources', product, 'web')
dst_web = os.path.join(self.get_path(), 'resources', product, 'web')
if os.path.isdir(dst_web):
common.rmdirs(dst_web)
shutil.copytree(src_web, dst_web)
if product == 'tomcat':
src_lib = os.path.join(self.get_install_dir(), 'resources', product, 'lib')
dst_lib = os.path.join(self.get_path(), 'resources', product, 'lib')
if os.path.isdir(dst_lib):
common.rmdirs(dst_lib)
if os.path.exists(src_lib):
shutil.copytree(src_lib, dst_lib)
src_webapps = os.path.join(self.get_install_dir(), 'resources', product, 'webapps')
dst_webapps = os.path.join(self.get_path(), 'resources', product, 'webapps')
if os.path.isdir(dst_webapps):
common.rmdirs(dst_webapps)
shutil.copytree(src_webapps, dst_webapps)
src_lib = os.path.join(self.get_install_dir(), 'resources', product, 'gremlin-console', 'conf')
dst_lib = os.path.join(self.get_path(), 'resources', product, 'gremlin-console', 'conf')
if os.path.isdir(dst_lib):
common.rmdirs(dst_lib)
if os.path.exists(src_lib):
shutil.copytree(src_lib, dst_lib)
def import_bin_files(self):
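        # Copy the top-level and Cassandra bin/tools directories into the node, then point dse-env.sh at the install.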
common.copy_directory(os.path.join(self.get_install_dir(), 'bin'), self.get_bin_dir())
cassandra_bin_dir = os.path.join(self.get_path(), 'resources', 'cassandra', 'bin')
shutil.rmtree(cassandra_bin_dir, ignore_errors=True)
os.makedirs(cassandra_bin_dir)
common.copy_directory(os.path.join(self.get_install_dir(), 'resources', 'cassandra', 'bin'), cassandra_bin_dir)
if os.path.exists(os.path.join(self.get_install_dir(), 'resources', 'cassandra', 'tools')):
cassandra_tools_dir = os.path.join(self.get_path(), 'resources', 'cassandra', 'tools')
shutil.rmtree(cassandra_tools_dir, ignore_errors=True)
shutil.copytree(os.path.join(self.get_install_dir(), 'resources', 'cassandra', 'tools'), cassandra_tools_dir)
self.export_dse_home_in_dse_env_sh()
def export_dse_home_in_dse_env_sh(self):
        '''
        Due to the way CCM lays out files, separating the repository
        from the node(s) confs, the `dse-env.sh` script of each node
        needs to have its DSE_HOME var set and exported. Since DSE
        4.5.x, the stock `dse-env.sh` file includes a commented-out
        placeholder for exactly this, intended for installers.

        Basically: read the file in, write it back out, and append the
        two lines right after that placeholder.

        'sstableloader' is an example of a node script that depends on
        this when used in a CCM-built cluster.
        '''
with open(self.get_bin_dir() + "/dse-env.sh", "r") as dse_env_sh:
buf = dse_env_sh.readlines()
with open(self.get_bin_dir() + "/dse-env.sh", "w") as out_file:
for line in buf:
out_file.write(line)
if line == "# This is here so the installer can force set DSE_HOME\n":
out_file.write("DSE_HOME=" + self.get_install_dir() + "\nexport DSE_HOME\n")
def _update_log4j(self):
super(DseNode, self)._update_log4j()
conf_file = os.path.join(self.get_conf_dir(), common.LOG4J_CONF)
append_pattern = 'log4j.appender.V.File='
log_file = os.path.join(self.get_path(), 'logs', 'solrvalidation.log')
if common.is_win():
log_file = re.sub("\\\\", "/", log_file)
common.replace_in_file(conf_file, append_pattern, append_pattern + log_file)
append_pattern = 'log4j.appender.A.File='
log_file = os.path.join(self.get_path(), 'logs', 'audit.log')
if common.is_win():
log_file = re.sub("\\\\", "/", log_file)
common.replace_in_file(conf_file, append_pattern, append_pattern + log_file)
append_pattern = 'log4j.appender.B.File='
log_file = os.path.join(self.get_path(), 'logs', 'audit', 'dropped-events.log')
if common.is_win():
log_file = re.sub("\\\\", "/", log_file)
common.replace_in_file(conf_file, append_pattern, append_pattern + log_file)
def _update_yaml(self):
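        # Let the parent update cassandra.yaml, then merge cluster- and node-level options into dse.yaml (node options take precedence).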
super(DseNode, self)._update_yaml()
conf_file = os.path.join(self.get_path(), 'resources', 'dse', 'conf', 'dse.yaml')
with open(conf_file, 'r') as f:
data = yaml.safe_load(f)
data['system_key_directory'] = os.path.join(self.get_path(), 'keys')
# Get a map of combined cluster and node configuration with the node
# configuration taking precedence.
full_options = common.merge_configuration(
self.cluster._dse_config_options,
self._dse_config_options, delete_empty=False)
# Merge options with original yaml data.
data = common.merge_configuration(data, full_options)
with open(conf_file, 'w') as f:
yaml.safe_dump(data, f, default_flow_style=False)
def __generate_server_xml(self):
server_xml = os.path.join(self.get_path(), 'resources', 'tomcat', 'conf', 'server.xml')
if os.path.isfile(server_xml):
os.remove(server_xml)
with open(server_xml, 'w+') as f:
f.write('<Server port="8005" shutdown="SHUTDOWN">\n')
f.write(' <Service name="Solr">\n')
f.write(' <Connector port="8983" address="%s" protocol="HTTP/1.1" connectionTimeout="20000" maxThreads = "200" URIEncoding="UTF-8"/>\n' % self.ip_addr)
f.write(' <Engine name="Solr" defaultHost="localhost">\n')
f.write(' <Host name="localhost" appBase="../solr/web"\n')
f.write(' unpackWARs="true" autoDeploy="true"\n')
f.write(' xmlValidation="false" xmlNamespaceAware="false">\n')
f.write(' </Host>\n')
f.write(' </Engine>\n')
f.write(' </Service>\n')
            f.write('</Server>\n')
def __update_gremlin_config_yaml(self):
conf_file = os.path.join(self.get_path(), 'resources', 'graph', 'gremlin-console', 'conf', 'remote.yaml')
with open(conf_file, 'r') as f:
data = yaml.safe_load(f)
data['hosts'] = [self.ip_addr]
with open(conf_file, 'w') as f:
yaml.safe_dump(data, f, default_flow_style=False)
def _get_directories(self):
dirs = []
for i in ['data', 'commitlogs', 'saved_caches', 'logs', 'bin', 'keys', 'resources', os.path.join('data', 'hints')]:
dirs.append(os.path.join(self.get_path(), i))
return dirs
def _copy_agent(self):
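        # Copy the DataStax agent bundled with the install into the node directory, once.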
agent_source = os.path.join(self.get_install_dir(), 'datastax-agent')
agent_target = os.path.join(self.get_path(), 'datastax-agent')
if os.path.exists(agent_source) and not os.path.exists(agent_target):
shutil.copytree(agent_source, agent_target)
def _write_agent_address_yaml(self, agent_dir):
address_yaml = os.path.join(agent_dir, 'conf', 'address.yaml')
if not os.path.exists(address_yaml):
with open(address_yaml, 'w+') as f:
ip = self.ip_addr
jmx = self.jmx_port
f.write('stomp_interface: 127.0.0.1\n')
f.write('local_interface: %s\n' % ip)
f.write('agent_rpc_interface: %s\n' % ip)
f.write('agent_rpc_broadcast_address: %s\n' % ip)
f.write('cassandra_conf: %s\n' % os.path.join(self.get_path(), 'resources', 'cassandra', 'conf', 'cassandra.yaml'))
f.write('cassandra_install: %s\n' % self.get_path())
f.write('cassandra_logs: %s\n' % os.path.join(self.get_path(), 'logs'))
if 'thrift' in self.network_interfaces:
(_, port) = self.network_interfaces['thrift']
f.write('thrift_port: %s\n' % port)
                f.write('jmx_port: %s\n' % jmx)
def _write_agent_log4j_properties(self, agent_dir):
log4j_properties = os.path.join(agent_dir, 'conf', 'log4j.properties')
with open(log4j_properties, 'w+') as f:
f.write('log4j.rootLogger=INFO,R\n')
f.write('log4j.logger.org.apache.http=OFF\n')
f.write('log4j.logger.org.eclipse.jetty.util.log=WARN,R\n')
f.write('log4j.appender.R=org.apache.log4j.RollingFileAppender\n')
f.write('log4j.appender.R.maxFileSize=20MB\n')
f.write('log4j.appender.R.maxBackupIndex=5\n')
f.write('log4j.appender.R.layout=org.apache.log4j.PatternLayout\n')
f.write('log4j.appender.R.layout.ConversionPattern=%5p [%t] %d{ISO8601} %m%n\n')
            f.write('log4j.appender.R.File=./log/agent.log\n')
def _update_spark_env(self):
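        # Rewrite spark-env.sh so exported variables pick up this node's environment, then apply version-specific tweaks.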
try:
            node_num = re.search(r'node(\d+)', self.name).group(1)
except AttributeError:
node_num = 0
conf_file = os.path.join(self.get_path(), 'resources', 'spark', 'conf', 'spark-env.sh')
env = self.get_env()
content = []
with open(conf_file, 'r') as f:
for line in f.readlines():
for spark_var in env.keys():
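                    # Match both active 'export VAR=' lines and commented-out '# export VAR=' lines (offset 2),
                    # rewriting them with the node's value.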
if line.startswith('export %s=' % spark_var) or line.startswith('export %s=' % spark_var, 2):
line = 'export %s=%s\n' % (spark_var, env[spark_var])
break
content.append(line)
with open(conf_file, 'w') as f:
f.writelines(content)
# set unique spark.shuffle.service.port for each node; this is only needed for DSE 5.0.x;
# starting in 5.1 this setting is no longer needed
if self.cluster.version() > '5.0' and self.cluster.version() < '5.1':
defaults_file = os.path.join(self.get_path(), 'resources', 'spark', 'conf', 'spark-defaults.conf')
with open(defaults_file, 'a') as f:
port_num = 7737 + int(node_num)
f.write("\nspark.shuffle.service.port %s\n" % port_num)
# create Spark working dirs; starting with DSE 5.0.10/5.1.3 these are no longer automatically created
for e in ["SPARK_WORKER_DIR", "SPARK_LOCAL_DIRS", "SPARK_EXECUTOR_DIRS"]:
            dir_path = env[e]
            if not os.path.exists(dir_path):
                os.makedirs(dir_path)
def make_dse_env(self, install_dir, node_path, node_ip):
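        # Build the environment for launching DSE and its tools: heap sizes, DSE/Cassandra homes,
        # and per-product conf/log/work directories under the node path.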
version = DseNode.get_version_from_build(node_path=node_path)
env = os.environ.copy()
env['MAX_HEAP_SIZE'] = os.environ.get('CCM_MAX_HEAP_SIZE', '500M')
env['HEAP_NEWSIZE'] = os.environ.get('CCM_HEAP_NEWSIZE', '50M')
if version < '6.0':
env['SPARK_WORKER_MEMORY'] = os.environ.get('SPARK_WORKER_MEMORY', '1024M')
env['SPARK_WORKER_CORES'] = os.environ.get('SPARK_WORKER_CORES', '2')
else:
env['ALWAYSON_SQL_LOG_DIR'] = os.path.join(node_path, 'logs')
env['DSE_HOME'] = os.path.join(install_dir)
env['DSE_CONF'] = os.path.join(node_path, 'resources', 'dse', 'conf')
env['CASSANDRA_HOME'] = os.path.join(install_dir, 'resources', 'cassandra')
env['CASSANDRA_CONF'] = os.path.join(node_path, 'resources', 'cassandra', 'conf')
env['HIVE_CONF_DIR'] = os.path.join(node_path, 'resources', 'hive', 'conf')
env['SQOOP_CONF_DIR'] = os.path.join(node_path, 'resources', 'sqoop', 'conf')
env['TOMCAT_HOME'] = os.path.join(node_path, 'resources', 'tomcat')
env['TOMCAT_CONF_DIR'] = os.path.join(node_path, 'resources', 'tomcat', 'conf')
env['PIG_CONF_DIR'] = os.path.join(node_path, 'resources', 'pig', 'conf')
env['MAHOUT_CONF_DIR'] = os.path.join(node_path, 'resources', 'mahout', 'conf')
env['SPARK_CONF_DIR'] = os.path.join(node_path, 'resources', 'spark', 'conf')
env['SHARK_CONF_DIR'] = os.path.join(node_path, 'resources', 'shark', 'conf')
env['GREMLIN_CONSOLE_CONF_DIR'] = os.path.join(node_path, 'resources', 'graph', 'gremlin-console', 'conf')
env['SPARK_WORKER_DIR'] = os.path.join(node_path, 'spark', 'worker')
env['SPARK_LOCAL_DIRS'] = os.path.join(node_path, 'spark', '.local')
env['SPARK_EXECUTOR_DIRS'] = os.path.join(node_path, 'spark', 'rdd')
env['SPARK_WORKER_LOG_DIR'] = os.path.join(node_path, 'logs', 'spark', 'worker')
env['SPARK_MASTER_LOG_DIR'] = os.path.join(node_path, 'logs', 'spark', 'master')
env['DSE_LOG_ROOT'] = os.path.join(node_path, 'logs', 'dse')
env['CASSANDRA_LOG_DIR'] = os.path.join(node_path, 'logs')
        env['SPARK_LOCAL_IP'] = str(node_ip)
if version >= '5.0':
env['HADOOP1_CONF_DIR'] = os.path.join(node_path, 'resources', 'hadoop', 'conf')
env['HADOOP2_CONF_DIR'] = os.path.join(node_path, 'resources', 'hadoop2-client', 'conf')
else:
env['HADOOP_CONF_DIR'] = os.path.join(node_path, 'resources', 'hadoop', 'conf')
return env


def get_dse_version(install_dir):
""" look for a dse*.jar and extract the version number """
for root, dirs, files in os.walk(install_dir):
for file in files:
            match = re.search(r'^dse(?:-core)?-([0-9.]+)(?:-.*)?\.jar', file)
if match:
return match.group(1)
return None