# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


# DataStax Enterprise (DSE) nodes

from __future__ import absolute_import, with_statement

import os
import re
import shutil
import signal
import subprocess

import yaml
from distutils.version import LooseVersion

from ccmlib import common, extension, node
from ccmlib.node import Node, handle_external_tool_process


class DseNode(Node):

    """
    Provides interactions to a DSE node.
    """

    @staticmethod
    def get_version_from_build(install_dir=None, node_path=None, cassandra=False):
        """
        Determine the version of the install backing a node.

        The install directory is ``install_dir`` or, when that is None and
        ``node_path`` is given, whatever
        ``node.get_install_dir_from_cluster_conf(node_path)`` returns.
        A ``0.version.txt`` file in the install directory takes precedence;
        otherwise the version is taken from the name of a dse*.jar. When
        ``cassandra`` is true and a DSE jar was found, the value returned by
        ``get_dse_cassandra_version(install_dir)`` is returned instead.

        Raises common.CCMError when no version can be determined.
        """
        if install_dir is None and node_path is not None:
            install_dir = node.get_install_dir_from_cluster_conf(node_path)
        if install_dir is None:
            raise common.CCMError("Cannot find version")

        # Binary cassandra installs will have a 0.version.txt file
        version_file = os.path.join(install_dir, '0.version.txt')
        if os.path.exists(version_file):
            with open(version_file) as version_fd:
                return LooseVersion(version_fd.read().strip())

        # For DSE look for a dse*.jar and extract the version number
        jar_version = get_dse_version(install_dir)
        if jar_version is None:
            raise common.CCMError("Cannot find version")
        if not cassandra:
            return LooseVersion(jar_version)

        from ccmlib.dse.dse_cluster import get_dse_cassandra_version
        return get_dse_cassandra_version(install_dir)


    def __init__(self, name, cluster, auto_bootstrap, thrift_interface, storage_interface, jmx_port, remote_debug_port, initial_token, save=True, binary_interface=None, byteman_port='0', environment_variables=None, derived_cassandra_version=None):
        """
        Create a DSE node.

        Every argument is forwarded to Node.__init__. Afterwards the
        node-level DSE configuration overrides start out empty and, when the
        cluster reports OpsCenter, the agent is copied into the node.
        """
        super(DseNode, self).__init__(
            name, cluster, auto_bootstrap, thrift_interface, storage_interface,
            jmx_port, remote_debug_port, initial_token, save, binary_interface,
            byteman_port,
            environment_variables=environment_variables,
            derived_cassandra_version=derived_cassandra_version)

        # Node-specific options, merged over the cluster-wide ones in _update_yaml.
        self._dse_config_options = {}
        opscenter_enabled = self.cluster.hasOpscenter()
        if opscenter_enabled:
            self._copy_agent()

    def get_install_cassandra_root(self):
        return os.path.join(self.get_install_dir(), 'resources', 'cassandra')

    def get_node_cassandra_root(self):
        return os.path.join(self.get_path(), 'resources', 'cassandra')

    def get_conf_dir(self):
        """
        Returns the path to the directory where Cassandra config are located
        """
        return os.path.join(self.get_path(), 'resources', 'cassandra', 'conf')

    def get_tool(self, toolname):
        """
        Returns the path of `toolname` in the install's 'resources/cassandra/bin',
        as built by common.join_bin
        """
        cassandra_root = os.path.join(self.get_install_dir(), 'resources', 'cassandra')
        return common.join_bin(cassandra_root, 'bin', toolname)

    def get_tool_args(self, toolname):
        """
        Returns a two element argument list: the 'dse' script path built from
        the install's 'resources/cassandra/bin', followed by `toolname`
        """
        cassandra_root = os.path.join(self.get_install_dir(), 'resources', 'cassandra')
        dse_script = common.join_bin(cassandra_root, 'bin', 'dse')
        return [dse_script, toolname]

    def get_env(self):
        return self.make_dse_env(self.get_install_dir(), self.get_path(), self.ip_addr)

    def node_setup(self, version, verbose):
        """
        Runs setup_dse for `version` with the cluster's DSE credentials and
        returns the first element of its result (the directory).
        """
        from ccmlib.dse.dse_cluster import setup_dse
        credentials = (self.cluster.dse_username, self.cluster.dse_password)
        install_dir, _unused_version = setup_dse(version, credentials[0], credentials[1], verbose=verbose)
        return install_dir

    def address_for_version(self, version):
        return "{}".format(str(self.address()));

    def set_workloads(self, workloads):
        self.workloads = workloads
        self._update_config()
        if 'solr' in self.workloads:
            self.__generate_server_xml()
        if 'graph' in self.workloads:
            self.__update_gremlin_config_yaml()
        if 'dsefs' in self.workloads:
            dsefs_options = {'dsefs_options': {'enabled': True,
                                               'work_dir': os.path.join(self.get_path(), 'dsefs'),
                                               'data_directories': [{'dir': os.path.join(self.get_path(), 'dsefs', 'data')}]}}
            self.set_dse_configuration_options(dsefs_options)
        if 'spark' in self.workloads:
            dsefs_enabled = 'dsefs' in self.workloads
            dse_options = {'dsefs_options': {'enabled': dsefs_enabled,
                                             'work_dir': os.path.join(self.get_path(), 'dsefs'),
                                             'data_directories': [{'dir': os.path.join(self.get_path(), 'dsefs', 'data')}]}}
            if self.cluster.version() >= '6.0':
                # Don't overwrite aoss options
                if 'resource_manager_options' not in self._dse_config_options:
                    dse_options['resource_manager_options'] = {'worker_options': {'memory_total': '1g', 'cores_total': 2}}

            self.set_dse_configuration_options(dse_options)
            self._update_spark_env()

    def enable_aoss(self, thrift_port=10000, web_ui_port=9077):
        self._dse_config_options['alwayson_sql_options'] = {'enabled': True, 'thrift_port': thrift_port, 'web_ui_port': web_ui_port}
        self._dse_config_options['resource_manager_options'] = {'worker_options':
                                                                {'cores_total': 2,
                                                                 'memory_total': '2g',
                                                                 'workpools': [{
                                                                     'name': 'alwayson_sql',
                                                                     'cores': 0.5,
                                                                     'memory': 0.5
                                                                 }]}}
        self._update_config()
        self._update_spark_env()
        self._update_yaml()

    def set_dse_configuration_options(self, values=None):
        if values is not None:
            self._dse_config_options = common.merge_configuration(self._dse_config_options, values)
        self.import_dse_config_files()

    def watch_log_for_alive(self, nodes, from_mark=None, timeout=720, filename='system.log'):
        """
        Watch this node's log until the given other nodes are reported UP.

        Delegates to Node.watch_log_for_alive; the only difference is the
        higher default timeout used for DSE.
        """
        parent = super(DseNode, self)
        parent.watch_log_for_alive(nodes, from_mark=from_mark, timeout=timeout, filename=filename)

    def get_launch_bin(self):
        """
        Refresh the node's copy of the 'dse' launch script from the install
        directory and return the path of the node-local script.
        """
        pristine_script = common.join_bin(self.get_install_dir(), 'bin', 'dse')
        # Copy back the dse scripts since profiling may have modified it the previous time
        shutil.copy(pristine_script, self.get_bin_dir())
        node_script = common.join_bin(self.get_path(), 'bin', 'dse')
        return node_script

    def add_custom_launch_arguments(self, args):
        args.append('cassandra')
        for workload in self.workloads:
            if 'hadoop' in workload:
                args.append('-t')
            if 'solr' in workload:
                args.append('-s')
            if 'spark' in workload:
                args.append('-k')
            if 'cfs' in workload:
                args.append('-c')
            if 'graph' in workload:
                args.append('-g')

    def start(self,
              join_ring=True,
              no_wait=False,
              verbose=False,
              update_pid=True,
              wait_other_notice=True,
              replace_token=None,
              replace_address=None,
              jvm_args=None,
              wait_for_binary_proto=False,
              profile_options=None,
              use_jna=False,
              quiet_start=False,
              allow_root=False,
              set_migration_task=True,
              jvm_version=None):
        """
        Start the node and, when the cluster uses OpsCenter, its agent.

        All arguments are forwarded positionally, in declaration order, to
        Node.start.

        Returns whatever Node.start returns, so callers get the same result
        as they would from a plain Node.
        """
        process = super(DseNode, self).start(join_ring, no_wait, verbose, update_pid, wait_other_notice, replace_token,
                                             replace_address, jvm_args, wait_for_binary_proto, profile_options, use_jna,
                                             quiet_start, allow_root, set_migration_task, jvm_version)
        if self.cluster.hasOpscenter():
            self._start_agent()
        return process
    def _start_agent(self):
        """
        Launch the datastax-agent found in the node directory, after writing
        its address.yaml and log4j.properties. Does nothing when the node has
        no 'datastax-agent' directory.
        """
        agent_dir = os.path.join(self.get_path(), 'datastax-agent')
        if not os.path.exists(agent_dir):
            return
        self._write_agent_address_yaml(agent_dir)
        self._write_agent_log4j_properties(agent_dir)
        launcher = os.path.join(agent_dir, 'bin', common.platform_binary('datastax-agent'))
        subprocess.Popen([launcher], stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    def stop(self, wait=True, wait_other_notice=False, signal_event=signal.SIGTERM, **kwargs):
        """
        Stop the agent first when the cluster uses OpsCenter, then stop the
        node through Node.stop and return its result.
        """
        opscenter_enabled = self.cluster.hasOpscenter()
        if opscenter_enabled:
            self._stop_agent()
        parent = super(DseNode, self)
        return parent.stop(wait=wait, wait_other_notice=wait_other_notice, signal_event=signal_event, **kwargs)

    def _stop_agent(self):
        agent_dir = os.path.join(self.get_path(), 'datastax-agent')
        if os.path.exists(agent_dir):
            pidfile = os.path.join(agent_dir, 'datastax-agent.pid')
            if os.path.exists(pidfile):
                with open(pidfile, 'r') as f:
                    pid = int(f.readline().strip())
                    f.close()
                if pid is not None:
                    try:
                        os.kill(pid, signal.SIGKILL)
                    except OSError:
                        pass
                os.remove(pidfile)

    def nodetool(self, cmd, username=None, password=None, capture_output=True, wait=True):
        """
        Run nodetool through Node.nodetool, optionally with credentials.

        When given, `password` is prepended to `cmd` as '-pw <password>' and
        `username` as '-u <username>' (username ends up first).
        `capture_output` and `wait` are passed on to Node.nodetool.
        NOTE(review): assumes Node.nodetool accepts `capture_output` and
        `wait` keyword arguments - confirm against ccmlib.node.

        Returns whatever Node.nodetool returns.
        """
        if password is not None:
            cmd = '-pw {} '.format(password) + cmd
        if username is not None:
            cmd = '-u {} '.format(username) + cmd

        return super(DseNode, self).nodetool(cmd, capture_output=capture_output, wait=wait)

    def dsetool(self, cmd):
        """
        Run the install's 'dsetool' against this node with `cmd` split on
        whitespace as its arguments, capturing stdout and stderr.

        Returns the result of handle_external_tool_process.
        """
        env = self.get_env()
        extension.append_to_client_env(self, env)
        node_ip, binary_port = self.network_interfaces['binary']
        executable = common.join_bin(self.get_install_dir(), 'bin', 'dsetool')
        connection_args = ['-h', node_ip, '-j', str(self.jmx_port), '-c', str(binary_port)]
        args = [executable] + connection_args + cmd.split()
        proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        return handle_external_tool_process(proc, args)

    def dse(self, dse_options=None):
        """
        Run the install's 'dse' script with `dse_options` as arguments, in the
        node's environment extended by extension.append_to_client_env and
        with JMX_PORT set.

        Returns the result of handle_external_tool_process.
        """
        extra_args = [] if dse_options is None else dse_options
        env = self.get_env()
        extension.append_to_client_env(self, env)
        env['JMX_PORT'] = self.jmx_port
        launcher = common.join_bin(self.get_install_dir(), 'bin', 'dse')
        args = [launcher]
        args.extend(extra_args)
        # Don't redirect stdout/stderr, users need to interact with new process
        proc = subprocess.Popen(args, env=env)
        return handle_external_tool_process(proc, args)

    def hadoop(self, hadoop_options=None):
        """
        Run 'dse hadoop' with `hadoop_options` as extra arguments, in the
        node's environment with JMX_PORT set.

        Returns the result of handle_external_tool_process.
        """
        extra_args = [] if hadoop_options is None else hadoop_options
        env = self.get_env()
        env['JMX_PORT'] = self.jmx_port
        launcher = common.join_bin(self.get_install_dir(), 'bin', 'dse')
        args = [launcher, 'hadoop']
        args.extend(extra_args)
        # Don't redirect stdout/stderr, users need to interact with new process
        proc = subprocess.Popen(args, env=env)
        return handle_external_tool_process(proc, args)

    def hive(self, hive_options=None):
        """
        Run 'dse hive' with `hive_options` as extra arguments, in the node's
        environment with JMX_PORT set.

        Returns the result of handle_external_tool_process.
        """
        extra_args = [] if hive_options is None else hive_options
        env = self.get_env()
        env['JMX_PORT'] = self.jmx_port
        launcher = common.join_bin(self.get_install_dir(), 'bin', 'dse')
        args = [launcher, 'hive']
        args.extend(extra_args)
        # Don't redirect stdout/stderr, users need to interact with new process
        proc = subprocess.Popen(args, env=env)
        return handle_external_tool_process(proc, args)

    def pig(self, pig_options=None):
        """
        Run 'dse pig' with `pig_options` as extra arguments, in the node's
        environment with JMX_PORT set.

        Returns the result of handle_external_tool_process.
        """
        extra_args = [] if pig_options is None else pig_options
        env = self.get_env()
        env['JMX_PORT'] = self.jmx_port
        launcher = common.join_bin(self.get_install_dir(), 'bin', 'dse')
        args = [launcher, 'pig']
        args.extend(extra_args)
        # Don't redirect stdout/stderr, users need to interact with new process
        proc = subprocess.Popen(args, env=env)
        return handle_external_tool_process(proc, args)

    def sqoop(self, sqoop_options=None):
        """
        Run 'dse sqoop' with `sqoop_options` as extra arguments, in the node's
        environment with JMX_PORT set.

        Returns the result of handle_external_tool_process.
        """
        extra_args = [] if sqoop_options is None else sqoop_options
        env = self.get_env()
        env['JMX_PORT'] = self.jmx_port
        launcher = common.join_bin(self.get_install_dir(), 'bin', 'dse')
        args = [launcher, 'sqoop']
        args.extend(extra_args)
        # Don't redirect stdout/stderr, users need to interact with new process
        proc = subprocess.Popen(args, env=env)
        return handle_external_tool_process(proc, args)

    def spark(self, spark_options=None):
        """
        Run 'dse spark' with `spark_options` as extra arguments, in the node's
        environment with JMX_PORT set.

        Returns the result of handle_external_tool_process.
        """
        extra_args = [] if spark_options is None else spark_options
        env = self.get_env()
        env['JMX_PORT'] = self.jmx_port
        launcher = common.join_bin(self.get_install_dir(), 'bin', 'dse')
        args = [launcher, 'spark']
        args.extend(extra_args)
        # Don't redirect stdout/stderr, users need to interact with new process
        proc = subprocess.Popen(args, env=env)
        return handle_external_tool_process(proc, args)

    def import_dse_config_files(self):
        """
        Update the node configuration, copy the install's
        'resources/dse/conf' into the node (creating the target directory
        when needed) and rewrite the yaml configuration.
        """
        self._update_config()
        node_dse_conf = os.path.join(self.get_path(), 'resources', 'dse', 'conf')
        if not os.path.isdir(node_dse_conf):
            os.makedirs(node_dse_conf)
        install_dse_conf = os.path.join(self.get_install_dir(), 'resources', 'dse', 'conf')
        common.copy_directory(install_dse_conf, node_dse_conf)
        self._update_yaml()

    def copy_config_files(self):
        for product in ['dse', 'cassandra', 'hadoop', 'hadoop2-client', 'sqoop', 'hive', 'tomcat', 'spark', 'shark', 'mahout', 'pig', 'solr', 'graph']:
            src_conf = os.path.join(self.get_install_dir(), 'resources', product, 'conf')
            dst_conf = os.path.join(self.get_path(), 'resources', product, 'conf')
            if not os.path.isdir(src_conf):
                continue
            if os.path.isdir(dst_conf):
                common.rmdirs(dst_conf)
            shutil.copytree(src_conf, dst_conf)
            if product == 'solr':
                src_web = os.path.join(self.get_install_dir(), 'resources', product, 'web')
                dst_web = os.path.join(self.get_path(), 'resources', product, 'web')
                if os.path.isdir(dst_web):
                    common.rmdirs(dst_web)
                shutil.copytree(src_web, dst_web)
            if product == 'tomcat':
                src_lib = os.path.join(self.get_install_dir(), 'resources', product, 'lib')
                dst_lib = os.path.join(self.get_path(), 'resources', product, 'lib')
                if os.path.isdir(dst_lib):
                    common.rmdirs(dst_lib)
                if os.path.exists(src_lib):
                    shutil.copytree(src_lib, dst_lib)
                src_webapps = os.path.join(self.get_install_dir(), 'resources', product, 'webapps')
                dst_webapps = os.path.join(self.get_path(), 'resources', product, 'webapps')
                if os.path.isdir(dst_webapps):
                    common.rmdirs(dst_webapps)
                shutil.copytree(src_webapps, dst_webapps)
        src_lib = os.path.join(self.get_install_dir(), 'resources', product, 'gremlin-console', 'conf')
        dst_lib = os.path.join(self.get_path(), 'resources', product, 'gremlin-console', 'conf')
        if os.path.isdir(dst_lib):
            common.rmdirs(dst_lib)
        if os.path.exists(src_lib):
            shutil.copytree(src_lib, dst_lib)

    def import_bin_files(self):
        """
        Copy the install's 'bin' into the node's bin directory, recreate the
        node's 'resources/cassandra/bin' from the install, mirror
        'resources/cassandra/tools' when the install has it, and finally
        patch DSE_HOME into the node's dse-env.sh.
        """
        install_cassandra = os.path.join(self.get_install_dir(), 'resources', 'cassandra')
        node_cassandra = os.path.join(self.get_path(), 'resources', 'cassandra')

        common.copy_directory(os.path.join(self.get_install_dir(), 'bin'), self.get_bin_dir())

        # Start from an empty cassandra bin directory on every import.
        node_bin = os.path.join(node_cassandra, 'bin')
        shutil.rmtree(node_bin, ignore_errors=True)
        os.makedirs(node_bin)
        common.copy_directory(os.path.join(install_cassandra, 'bin'), node_bin)

        install_tools = os.path.join(install_cassandra, 'tools')
        if os.path.exists(install_tools):
            node_tools = os.path.join(node_cassandra, 'tools')
            shutil.rmtree(node_tools, ignore_errors=True)
            shutil.copytree(install_tools, node_tools)

        self.export_dse_home_in_dse_env_sh()

    def export_dse_home_in_dse_env_sh(self):
        '''
        Due to the way CCM lays out files, separating the repository
        from the node(s) confs, the `dse-env.sh` script of each node
        needs to have its DSE_HOME var set and exported. Since DSE
        4.5.x, the stock `dse-env.sh` file includes a commented-out
        place to do exactly this, intended for installers.
        Basically: read in the file, write it back out and add the two
        lines.
        'sstableloader' is an example of a node script that depends on
        this, when used in a CCM-built cluster.
        '''
        with open(self.get_bin_dir() + "/dse-env.sh", "r") as dse_env_sh:
            buf = dse_env_sh.readlines()

        with open(self.get_bin_dir() + "/dse-env.sh", "w") as out_file:
            for line in buf:
                out_file.write(line)
                if line == "# This is here so the installer can force set DSE_HOME\n":
                    out_file.write("DSE_HOME=" + self.get_install_dir() + "\nexport DSE_HOME\n")

    def _update_log4j(self):
        """
        After the base class update, point the V, A and B log4j file
        appenders at files below this node's 'logs' directory.
        """
        super(DseNode, self)._update_log4j()

        conf_file = os.path.join(self.get_conf_dir(), common.LOG4J_CONF)
        # (appender setting, path of its log file relative to '<node>/logs')
        appender_targets = (
            ('log4j.appender.V.File=', ('solrvalidation.log',)),
            ('log4j.appender.A.File=', ('audit.log',)),
            ('log4j.appender.B.File=', ('audit', 'dropped-events.log')),
        )
        for append_pattern, relative_parts in appender_targets:
            log_file = os.path.join(self.get_path(), 'logs', *relative_parts)
            if common.is_win():
                # Backslashes are replaced with forward slashes on Windows.
                log_file = re.sub("\\\\", "/", log_file)
            common.replace_in_file(conf_file, append_pattern, append_pattern + log_file)

    def _update_yaml(self):
        """
        After the base class update, rewrite the node's dse.yaml: set
        'system_key_directory' to '<node>/keys' and merge in the cluster-level
        and node-level DSE options.
        """
        super(DseNode, self)._update_yaml()

        dse_yaml = os.path.join(self.get_path(), 'resources', 'dse', 'conf', 'dse.yaml')
        with open(dse_yaml, 'r') as yaml_in:
            data = yaml.safe_load(yaml_in)

        data['system_key_directory'] = os.path.join(self.get_path(), 'keys')

        # Cluster options first, node options second, so that the node
        # configuration takes precedence in the combined map.
        cluster_options = self.cluster._dse_config_options
        node_options = self._dse_config_options
        full_options = common.merge_configuration(cluster_options, node_options, delete_empty=False)

        # Merge options with original yaml data.
        data = common.merge_configuration(data, full_options)

        with open(dse_yaml, 'w') as yaml_out:
            yaml.safe_dump(data, yaml_out, default_flow_style=False)

    def __generate_server_xml(self):
        server_xml = os.path.join(self.get_path(), 'resources', 'tomcat', 'conf', 'server.xml')
        if os.path.isfile(server_xml):
            os.remove(server_xml)
        with open(server_xml, 'w+') as f:
            f.write('<Server port="8005" shutdown="SHUTDOWN">\n')
            f.write('  <Service name="Solr">\n')
            f.write('    <Connector port="8983" address="%s" protocol="HTTP/1.1" connectionTimeout="20000" maxThreads = "200" URIEncoding="UTF-8"/>\n' % self.ip_addr)
            f.write('    <Engine name="Solr" defaultHost="localhost">\n')
            f.write('      <Host name="localhost"  appBase="../solr/web"\n')
            f.write('            unpackWARs="true" autoDeploy="true"\n')
            f.write('            xmlValidation="false" xmlNamespaceAware="false">\n')
            f.write('      </Host>\n')
            f.write('    </Engine>\n')
            f.write('  </Service>\n')
            f.write('</Server>\n')
            f.close()

    def __update_gremlin_config_yaml(self):
        """
        Set 'hosts' in the node's gremlin-console 'remote.yaml' to a list
        holding only this node's IP address.
        """
        remote_yaml = os.path.join(self.get_path(), 'resources', 'graph', 'gremlin-console', 'conf', 'remote.yaml')
        with open(remote_yaml, 'r') as yaml_in:
            settings = yaml.safe_load(yaml_in)

        settings['hosts'] = [self.ip_addr]

        with open(remote_yaml, 'w') as yaml_out:
            yaml.safe_dump(settings, yaml_out, default_flow_style=False)

    def _get_directories(self):
        dirs = []
        for i in ['data', 'commitlogs', 'saved_caches', 'logs', 'bin', 'keys', 'resources', os.path.join('data', 'hints')]:
            dirs.append(os.path.join(self.get_path(), i))
        return dirs

    def _copy_agent(self):
        agent_source = os.path.join(self.get_install_dir(), 'datastax-agent')
        agent_target = os.path.join(self.get_path(), 'datastax-agent')
        if os.path.exists(agent_source) and not os.path.exists(agent_target):
            shutil.copytree(agent_source, agent_target)

    def _write_agent_address_yaml(self, agent_dir):
        address_yaml = os.path.join(agent_dir, 'conf', 'address.yaml')
        if not os.path.exists(address_yaml):
            with open(address_yaml, 'w+') as f:
                ip = self.ip_addr
                jmx = self.jmx_port
                f.write('stomp_interface: 127.0.0.1\n')
                f.write('local_interface: %s\n' % ip)
                f.write('agent_rpc_interface: %s\n' % ip)
                f.write('agent_rpc_broadcast_address: %s\n' % ip)
                f.write('cassandra_conf: %s\n' % os.path.join(self.get_path(), 'resources', 'cassandra', 'conf', 'cassandra.yaml'))
                f.write('cassandra_install: %s\n' % self.get_path())
                f.write('cassandra_logs: %s\n' % os.path.join(self.get_path(), 'logs'))
                if 'thrift' in self.network_interfaces: 
                    (_, port) = self.network_interfaces['thrift']
                    f.write('thrift_port: %s\n' % port)

                f.write('jmx_port: %s\n' % jmx)
                f.close()

    def _write_agent_log4j_properties(self, agent_dir):
        log4j_properties = os.path.join(agent_dir, 'conf', 'log4j.properties')
        with open(log4j_properties, 'w+') as f:
            f.write('log4j.rootLogger=INFO,R\n')
            f.write('log4j.logger.org.apache.http=OFF\n')
            f.write('log4j.logger.org.eclipse.jetty.util.log=WARN,R\n')
            f.write('log4j.appender.R=org.apache.log4j.RollingFileAppender\n')
            f.write('log4j.appender.R.maxFileSize=20MB\n')
            f.write('log4j.appender.R.maxBackupIndex=5\n')
            f.write('log4j.appender.R.layout=org.apache.log4j.PatternLayout\n')
            f.write('log4j.appender.R.layout.ConversionPattern=%5p [%t] %d{ISO8601} %m%n\n')
            f.write('log4j.appender.R.File=./log/agent.log\n')
            f.close()

    def _update_spark_env(self):
        try:
            node_num = re.search(u'node(\d+)', self.name).group(1)
        except AttributeError:
            node_num = 0
        conf_file = os.path.join(self.get_path(), 'resources', 'spark', 'conf', 'spark-env.sh')
        env = self.get_env()
        content = []
        with open(conf_file, 'r') as f:
            for line in f.readlines():
                for spark_var in env.keys():
                    if line.startswith('export %s=' % spark_var) or line.startswith('export %s=' % spark_var, 2):
                        line = 'export %s=%s\n' % (spark_var, env[spark_var])
                        break
                content.append(line)

        with open(conf_file, 'w') as f:
            f.writelines(content)

        # set unique spark.shuffle.service.port for each node; this is only needed for DSE 5.0.x;
        # starting in 5.1 this setting is no longer needed
        if self.cluster.version() > '5.0' and self.cluster.version() < '5.1':
            defaults_file = os.path.join(self.get_path(), 'resources', 'spark', 'conf', 'spark-defaults.conf')
            with open(defaults_file, 'a') as f:
                port_num = 7737 + int(node_num)
                f.write("\nspark.shuffle.service.port %s\n" % port_num)

        # create Spark working dirs; starting with DSE 5.0.10/5.1.3 these are no longer automatically created
        for e in ["SPARK_WORKER_DIR", "SPARK_LOCAL_DIRS", "SPARK_EXECUTOR_DIRS"]:
            dir = env[e]
            if not os.path.exists(dir):
                os.makedirs(dir)

    def make_dse_env(self, install_dir, node_path, node_ip):
        """
        Build the environment used to run DSE tools for a node.

        Starts from a copy of os.environ and adds heap sizes (overridable
        through CCM_MAX_HEAP_SIZE / CCM_HEAP_NEWSIZE), the DSE and Cassandra
        homes inside `install_dir`, and the per-product configuration, work
        and log directories inside `node_path`. Some entries depend on the
        version found for `node_path` (Spark worker settings below '6.0',
        ALWAYSON_SQL_LOG_DIR otherwise; split Hadoop conf dirs from '5.0').

        Returns the environment dictionary.
        """
        def node_conf(product):
            # '<node>/resources/<product>/conf'
            return os.path.join(node_path, 'resources', product, 'conf')

        version = DseNode.get_version_from_build(node_path=node_path)
        env = os.environ.copy()
        env['MAX_HEAP_SIZE'] = os.environ.get('CCM_MAX_HEAP_SIZE', '500M')
        env['HEAP_NEWSIZE'] = os.environ.get('CCM_HEAP_NEWSIZE', '50M')
        if version < '6.0':
            env['SPARK_WORKER_MEMORY'] = os.environ.get('SPARK_WORKER_MEMORY', '1024M')
            env['SPARK_WORKER_CORES'] = os.environ.get('SPARK_WORKER_CORES', '2')
        else:
            env['ALWAYSON_SQL_LOG_DIR'] = os.path.join(node_path, 'logs')

        # Assigned one by one, in this order.
        fixed_settings = (
            ('DSE_HOME', os.path.join(install_dir)),
            ('DSE_CONF', node_conf('dse')),
            ('CASSANDRA_HOME', os.path.join(install_dir, 'resources', 'cassandra')),
            ('CASSANDRA_CONF', node_conf('cassandra')),
            ('HIVE_CONF_DIR', node_conf('hive')),
            ('SQOOP_CONF_DIR', node_conf('sqoop')),
            ('TOMCAT_HOME', os.path.join(node_path, 'resources', 'tomcat')),
            ('TOMCAT_CONF_DIR', node_conf('tomcat')),
            ('PIG_CONF_DIR', node_conf('pig')),
            ('MAHOUT_CONF_DIR', node_conf('mahout')),
            ('SPARK_CONF_DIR', node_conf('spark')),
            ('SHARK_CONF_DIR', node_conf('shark')),
            ('GREMLIN_CONSOLE_CONF_DIR', os.path.join(node_path, 'resources', 'graph', 'gremlin-console', 'conf')),
            ('SPARK_WORKER_DIR', os.path.join(node_path, 'spark', 'worker')),
            ('SPARK_LOCAL_DIRS', os.path.join(node_path, 'spark', '.local')),
            ('SPARK_EXECUTOR_DIRS', os.path.join(node_path, 'spark', 'rdd')),
            ('SPARK_WORKER_LOG_DIR', os.path.join(node_path, 'logs', 'spark', 'worker')),
            ('SPARK_MASTER_LOG_DIR', os.path.join(node_path, 'logs', 'spark', 'master')),
            ('DSE_LOG_ROOT', os.path.join(node_path, 'logs', 'dse')),
            ('CASSANDRA_LOG_DIR', os.path.join(node_path, 'logs')),
            ('SPARK_LOCAL_IP', '' + node_ip),
        )
        for variable, value in fixed_settings:
            env[variable] = value

        if version >= '5.0':
            env['HADOOP1_CONF_DIR'] = node_conf('hadoop')
            env['HADOOP2_CONF_DIR'] = node_conf('hadoop2-client')
        else:
            env['HADOOP_CONF_DIR'] = node_conf('hadoop')
        return env


def get_dse_version(install_dir):
    """
    Look for a dse*.jar below `install_dir` and extract the version number.

    The directory tree is walked with os.walk and the first file whose name
    looks like 'dse-<version>[-suffix].jar' or 'dse-core-<version>[-suffix].jar'
    decides the result.

    Returns the version as a string of digits and dots, or None when no
    matching jar is found.
    """
    # Raw string so that '\.' reaches the regex engine as an escaped dot
    # instead of being an invalid string escape.
    jar_pattern = re.compile(r'^dse(?:-core)?-([0-9.]+)(?:-.*)?\.jar')
    for _root, _dirs, files in os.walk(install_dir):
        for filename in files:
            match = jar_pattern.search(filename)
            if match:
                return match.group(1)
    return None
