ccmlib/cmds/node_cmds.py (604 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from __future__ import absolute_import import os import signal import subprocess import sys from six import print_ from ccmlib import common from ccmlib.cmds.command import Cmd from ccmlib.node import NodeError NODE_CMDS = [ "show", "remove", "showlog", "setlog", "start", "stop", "ring", "flush", "compact", "drain", "cleanup", "repair", "scrub", "verify", "shuffle", "sstablesplit", "getsstables", "decommission", "json", "updateconf", "updatelog4j", "stress", "cqlsh", "scrub", "verify", "status", "setdir", "bulkload", "version", "nodetool", "dsetool", "setworkload", "dse", "hadoop", "hive", "pig", "sqoop", "spark", "pause", "resume", "jconsole", "versionfrombuild", "byteman" ] def commands(): return NODE_CMDS class NodeShowCmd(Cmd): descr_text = "Display information on a node" usage = "usage: ccm node_name show [options]" def validate(self, parser, options, args): Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True) def run(self): self.node.show() class NodeRemoveCmd(Cmd): descr_text = "Remove a node (stopping it if necessary and deleting all its data)" usage = "usage: ccm node_name remove [options]" def validate(self, parser, options, args): Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True) def run(self): self.cluster.remove(self.node) class NodeShowlogCmd(Cmd): descr_text = "Show the log of node name (runs your $PAGER on its system.log)" usage = "usage: ccm node_name showlog [options]" def validate(self, parser, options, args): Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True) def run(self): log = self.node.logfilename() pager = os.environ.get('PAGER', common.platform_pager()) os.execvp(pager, (pager, log)) class NodeSetlogCmd(Cmd): options_list = [ (['-c', '--class'], {'type': "string", 'dest': "class_name", 'default': None, 'help': "Optional java class/package. Logging will be set for only this class/package if set"}), ] descr_text = "Set node name log level (INFO, DEBUG, ...) with/without Java class - require a node restart" usage = "usage: ccm node_name setlog [options] level" def validate(self, parser, options, args): Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True) if len(args) == 1: print_('Missing log level', file=sys.stderr) parser.print_help() self.level = args[1] try: self.class_name = options.class_name except common.ArgumentError as e: print_(str(e), file=sys.stderr) exit(1) def run(self): try: self.node.set_log_level(self.level, self.class_name) except common.ArgumentError as e: print_(str(e), file=sys.stderr) exit(1) class NodeClearCmd(Cmd): options_list = [ (['-a', '--all'], {'action': "store_true", 'dest': "all", 'help': "Also clear the saved cache and node log files", 'default': False}), ] descr_text = "Clear the node data & logs (and stop the node)" usage = "usage: ccm node_name_clear [options]" def validate(self, parser, options, args): Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True) def run(self): self.node.stop() self.node.clear(self.options.all) class NodeStartCmd(Cmd): options_list = [ (['-v', '--verbose'], {'action': "store_true", 'dest': "verbose", 'help': "Print standard output of cassandra process", 'default': False}), (['--no-wait'], {'action': "store_true", 'dest': "no_wait", 'help': "Do not wait for cassandra node to be ready", 'default': False}), (['--wait-other-notice'], {'action': "store_true", 'dest': "deprecate", 'help': "DEPRECATED/IGNORED: Use '--skip-wait-other-notice' instead. This is now on by default.", 'default': False}), (['--skip-wait-other-notice'], {'action': "store_false", 'dest': "wait_other_notice", 'help': "Skip waiting until all live nodes of the cluster have marked the other nodes UP", 'default': True}), (['--wait-for-binary-proto'], {'action': "store_true", 'dest': "wait_for_binary_proto", 'help': "Wait for the binary protocol to start", 'default': False}), (['-j', '--dont-join-ring'], {'action': "store_true", 'dest': "no_join_ring", 'help': "Launch the instance without joining the ring", 'default': False}), (['--replace-address'], {'type': "string", 'dest': "replace_address", 'default': None, 'help': "Replace a node in the ring through the cassandra.replace_address option"}), (['--jvm_arg'], {'action': "append", 'dest': "jvm_args", 'help': "Specify a JVM argument", 'default': []}), (['--quiet-windows'], {'action': "store_true", 'dest': "quiet_start", 'help': "Pass -q on Windows 2.2.4+ and 3.0+ startup. Ignored on linux.", 'default': False}), (['--root'], {'action': "store_true", 'dest': "allow_root", 'help': "Allow CCM to start cassandra as root", 'default': False}), (['--jvm-version'], {'type': "int", 'dest': "jvm_version", 'help': "Specify the JVM version to use (e.g. 8 for Java 8)", 'default': None}), ] descr_text = "Start a node" usage = "usage: ccm node start [options] name" def validate(self, parser, options, args): Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True) def run(self): try: self.node.start(not self.options.no_join_ring, no_wait=self.options.no_wait, wait_other_notice=self.options.wait_other_notice, wait_for_binary_proto=self.options.wait_for_binary_proto, verbose=self.options.verbose, replace_address=self.options.replace_address, jvm_args=self.options.jvm_args, quiet_start=self.options.quiet_start, allow_root=self.options.allow_root, jvm_version=self.options.jvm_version) except NodeError as e: print_(str(e), file=sys.stderr) print_("Standard error output is:", file=sys.stderr) e.process.stderr_file.seek(0) for line in e.process.stderr_file.readlines(): print_(line.rstrip('\n'), file=sys.stderr) exit(1) class NodeStopCmd(Cmd): options_list = [ (['--no-wait'], {'action': "store_true", 'dest': "no_wait", 'help': "Do not wait for the node to be stopped", 'default': False}), (['-g', '--gently'], {'action': "store_const", 'dest': "signal_event", 'help': "Shut down gently (default)", 'const': signal.SIGTERM, 'default': signal.SIGTERM}), (['--hang-up'], {'action': "store_const", 'dest': "signal_event", 'help': "Shut down via hang up (kill -1)", 'const': common.get_default_signals()['1']}), (['--not-gently'], {'action': "store_const", 'dest': "signal_event", 'help': "Shut down immediately (kill -9)", 'const': common.get_default_signals()['9']}), ] descr_text = "Stop a node" usage = "usage: ccm node stop [options] name" def validate(self, parser, options, args): Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True) def run(self): try: if not self.node.stop(wait=not self.options.no_wait, signal_event=self.options.signal_event): print_("%s is not running" % self.name, file=sys.stderr) exit(1) except NodeError as e: print_(str(e), file=sys.stderr) exit(1) class _NodeToolCmd(Cmd): usage = "This is a private class, how did you get here?" descr_text = "This is a private class, how did you get here?" nodetool_cmd = '' def validate(self, parser, options, args): Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True) def run(self): stdout, stderr, rc = self.node.nodetool(self.nodetool_cmd + " " + " ".join((self.args[1:]))) print_(stderr) print_(stdout) class NodeNodetoolCmd(_NodeToolCmd): usage = "usage: ccm node_name nodetool [options]" descr_text = "Run nodetool (connecting to node name)" def run(self): stdout, stderr, rc = self.node.nodetool(" ".join(self.args[1:])) print_(stderr) print_(stdout) class NodeRingCmd(_NodeToolCmd): usage = "usage: ccm node_name ring [options]" nodetool_cmd = 'ring' descr_text = "Print ring (connecting to node name)" class NodeStatusCmd(_NodeToolCmd): usage = "usage: ccm node_name status [options]" nodetool_cmd = 'status' descr_text = "Print status (connecting to node name)" class NodeFlushCmd(_NodeToolCmd): usage = "usage: ccm node_name flush [options]" nodetool_cmd = 'flush' descr_text = "Flush node name" class NodeCompactCmd(_NodeToolCmd): usage = "usage: ccm node_name compact [options]" nodetool_cmd = 'compact' descr_text = "Compact node name" class NodeDrainCmd(_NodeToolCmd): usage = "usage: ccm node_name drain [options]" nodetool_cmd = 'drain' descr_text = "Drain node name" class NodeCleanupCmd(_NodeToolCmd): usage = "usage: ccm node_name cleanup [options]" nodetool_cmd = 'cleanup' descr_text = "Run cleanup on node name" class NodeRepairCmd(_NodeToolCmd): usage = "usage: ccm node_name repair [options]" nodetool_cmd = 'repair' descr_text = "Run repair on node name" class NodeVersionCmd(_NodeToolCmd): usage = "usage: ccm node_name version" nodetool_cmd = 'version' descr_text = "Get the cassandra version of node" class NodeDecommissionCmd(_NodeToolCmd): usage = "usage: ccm node_name decommission [options]" options_list = [ (['--force'], {'action': "store_true", 'dest': "force", 'help': "Force decommission of this node even when it reduces the number of replicas to below configured RF. Note: This is only relevant for C* 3.12+.", 'default': False}), ] nodetool_cmd = 'decommission' descr_text = "Run decommission on node name" def run(self): self.node.decommission(force=self.options.force) class _DseToolCmd(Cmd): usage = "This is a private class, how did you get here?" descr_text = "This is a private class, how did you get here?" dsetool_cmd = '' def validate(self, parser, options, args): Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True) def run(self): stdout, stderr, rc = self.node.dsetool(self.dsetool_cmd + " " + " ".join((self.args[1:]))) print_(stderr) print_(stdout) class NodeDsetoolCmd(_DseToolCmd): usage = "usage: ccm node_name dsetool [options]" descr_text = "Run dsetool (connecting to node name)" def run(self): stdout, stderr, rc = self.node.dsetool(" ".join(self.args[1:])) print_(stderr) print_(stdout) class NodeCqlshCmd(Cmd): options_list = [ (['-x', '--exec'], {'type': "string", 'dest': "cmds", 'default': None, 'help': "Execute the specified commands and exit"}), (['-v', '--verbose'], {'action': "store_true", 'dest': "verbose", 'help': "With --exec, show cli output after completion", 'default': False}), ] descr_text = "Launch a cqlsh session connected to this node" usage = "usage: ccm node_name cqlsh [options] [cli_options]" ignore_unknown_options = True def validate(self, parser, options, args): Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True) self.cqlsh_options = parser.get_ignored() + args[1:] def run(self): self.node.run_cqlsh(self.options.cmds, self.cqlsh_options) class NodeBulkloadCmd(Cmd): descr_text = "Bulkload files into the cluster by connecting to this node" usage = "usage: ccm node_name bulkload [options] [sstable_dir]" ignore_unknown_options = True def validate(self, parser, options, args): Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True) self.loader_options = parser.get_ignored() + args[1:] def run(self): self.node.bulkload(self.loader_options) class NodeScrubCmd(Cmd): descr_text = "Scrub files" usage = "usage: ccm node_name scrub [options] <keyspace> <cf>" ignore_unknown_options = True def validate(self, parser, options, args): Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True) self.scrub_options = parser.get_ignored() + args[1:] def run(self): self.node.scrub(self.scrub_options) class NodeVerifyCmd(Cmd): descr_text = "Verify files" usage = "usage: ccm node_name verify [options] <keyspace> <cf>" ignore_unknown_options = True def validate(self, parser, options, args): Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True) self.verify_options = parser.get_ignored() + args[1:] def run(self): self.node.verify(self.verify_options) class NodeJsonCmd(Cmd): options_list = [ (['-k', '--keyspace'], {'type': "string", 'dest': "keyspace", 'default': None, 'help': "The keyspace to use [use all keyspaces by default]"}), (['-c', '--column-families'], {'type': "string", 'dest': "cfs", 'default': None, 'help': "Comma separated list of column families to use (requires -k to be set)"}), (['--key'], {'type': "string", 'action': "append", 'dest': "keys", 'default': None, 'help': "The key to include (you may specify multiple --key)"}), (['-e', '--enumerate-keys'], {'action': "store_true", 'dest': "enumerate_keys", 'help': "Only enumerate keys (i.e, call sstable2keys)", 'default': False}), ] descr_text = "Call sstable2json/sstabledump on the sstables of this node" usage = "usage: ccm node_name json [options] [file]" def validate(self, parser, options, args): Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True) self.keyspace = options.keyspace if self.keyspace is None: print_("You must specify a keyspace.") parser.print_help() exit(1) self.outfile = args[-1] if len(args) >= 2 else None self.column_families = options.cfs.split(',') if options.cfs else None def run(self): try: f = sys.stdout if self.outfile is not None: f = open(self.outfile, 'w') if self.node.has_cmd('sstable2json'): self.node.run_sstable2json(keyspace=self.keyspace, out_file=f, column_families=self.column_families, keys=self.options.keys, enumerate_keys=self.options.enumerate_keys) elif self.node.has_cmd('sstabledump'): self.node.run_sstabledump(keyspace=self.keyspace, column_families=self.column_families, keys=self.options.keys, enumerate_keys=self.options.enumerate_keys, command=True) except common.ArgumentError as e: print_(e, file=sys.stderr) class NodeSstablesplitCmd(Cmd): options_list = [ (['-k', '--keyspace'], {'type': "string", 'dest': "keyspace", 'default': None, 'help': "The keyspace to use [use all keyspaces by default]"}), (['-c', '--column-families'], {'type': "string", 'dest': 'cfs', 'default': None, 'help': "Comma separated list of column families to use (requires -k to be set)"}), (['-s', '--size'], {'type': 'int', 'dest': "size", 'default': None, 'help': "Maximum size in MB for the output sstables (default: 50 MB)"}), (['--no-snapshot'], {'action': 'store_true', 'dest': "no_snapshot", 'default': False, 'help': "Don't snapshot the sstables before splitting"}), ] descr_text = "Run sstablesplit on the sstables of this node" usage = "usage: ccm node_name sstablesplit [options] [file]" def validate(self, parser, options, args): Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True) self.keyspace = options.keyspace self.size = options.size self.no_snapshot = options.no_snapshot self.column_families = None self.datafiles = None if options.cfs is not None: if self.keyspace is None: print_("You need a keyspace (option -k) if you specify column families", file=sys.stderr) exit(1) self.column_families = options.cfs.split(',') if len(args) > 1: if self.column_families is None: print_("You need a column family (option -c) if you specify datafiles", file=sys.stderr) exit(1) self.datafiles = args[1:] def run(self): self.node.run_sstablesplit(datafiles=self.datafiles, keyspace=self.keyspace, column_families=self.column_families, size=self.size, no_snapshot=self.no_snapshot) class NodeGetsstablesCmd(Cmd): options_list = [ (['-k', '--keyspace'], {'type': "string", 'dest': "keyspace", 'default': None, 'help': "The keyspace to use [use all keyspaces by default]"}), (['-t', '--tables'], {'type': "string", 'dest': 'tables', 'default': None, 'help': "Comma separated list of tables to use (requires -k to be set)"}), ] descr_text = "Run getsstables to get absolute path of sstables in this node" usage = "usage: ccm node_name getsstables [options] [file]" def validate(self, parser, options, args): Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True) self.keyspace = options.keyspace self.tables = None self.datafiles = None if options.tables is not None: if self.keyspace is None: print_("You need a keyspace (option -k) if you specify tables", file=sys.stderr) exit(1) self.tables = options.tables.split(',') if len(args) > 1: if self.tables is None: print_("You need a tables (option -t) if you specify datafiles", file=sys.stderr) exit(1) self.datafiles = args[1:] def run(self): sstablefiles = self.node.get_sstablespath(datafiles=self.datafiles, keyspace=self.keyspace, tables=self.tables) print_('\n'.join(sstablefiles)) class NodeUpdateconfCmd(Cmd): options_list = [ (['--no-hh', '--no-hinted-handoff'], {'action': "store_false", 'dest': "hinted_handoff", 'default': True, 'help': "Disable hinted handoff"}), (['--batch-cl', '--batch-commit-log'], {'action': "store_true", 'dest': "cl_batch", 'default': None, 'help': "Set commit log to batch mode"}), (['--periodic-cl', '--periodic-commit-log'], {'action': "store_true", 'dest': "cl_periodic", 'default': None, 'help': "Set commit log to periodic mode"}), (['--rt', '--rpc-timeout'], {'action': "store", 'type': 'int', 'dest': "rpc_timeout", 'help': "Set rpc timeout"}), (['-y', '--yaml'], {'action': "store_true", 'dest': "literal_yaml", 'default': False, 'help': "Pass in literal yaml string. Option syntax looks like ccm node_name updateconf -y 'a: [b: [c,d]]'"}), ] descr_text = "Update the cassandra config files for this node (useful when updating cassandra)" usage = "usage: ccm node_name updateconf [options] [ new_setting | ... ], where new_setting should be a string of the form 'compaction_throughput_mb_per_sec: 32'" def validate(self, parser, options, args): Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True) args = args[1:] try: self.setting = common.parse_settings(args, literal_yaml=self.options.literal_yaml) if self.options.cl_batch and self.options.cl_periodic: print_("Can't set commitlog to be both batch and periodic.{}".format(os.linesep)) parser.print_help() exit(1) except common.ArgumentError as e: print_(str(e), file=sys.stderr) exit(1) def run(self): self.setting['hinted_handoff_enabled'] = self.options.hinted_handoff if self.options.rpc_timeout is not None: if self.node.cluster.cassandra_version() < "1.2": self.setting['rpc_timeout_in_ms'] = self.options.rpc_timeout else: self.setting['read_request_timeout_in_ms'] = self.options.rpc_timeout self.setting['range_request_timeout_in_ms'] = self.options.rpc_timeout self.setting['write_request_timeout_in_ms'] = self.options.rpc_timeout self.setting['truncate_request_timeout_in_ms'] = self.options.rpc_timeout self.setting['request_timeout_in_ms'] = self.options.rpc_timeout self.node.set_configuration_options(values=self.setting) if self.options.cl_batch: self.node.set_batch_commitlog(True) if self.options.cl_periodic: self.node.set_batch_commitlog(False) class NodeUpdatedseconfCmd(Cmd): options_list = [ (['-y', '--yaml'], {'action': "store_true", 'dest': "literal_yaml", 'default': False, 'help': "Pass in literal yaml string. Option syntax looks like ccm node_name updatedseconf -y 'a: [b: [c,d]]'"}), ] descr_text = "Update the dse config files for this node" usage = "usage: ccm node_name updatedseconf [options] [ new_setting | ... ], where new setting should be a string of the form 'max_solr_concurrency_per_core: 2'; nested options can be separated with a period like 'cql_slow_log_options.enabled: true'" def validate(self, parser, options, args): Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True) args = args[1:] try: self.setting = common.parse_settings(args, literal_yaml=self.options.literal_yaml) except common.ArgumentError as e: print_(str(e), file=sys.stderr) exit(1) def run(self): self.node.set_dse_configuration_options(values=self.setting) # # Class implementens the functionality of updating log4j-server.properties # on the given node by copying the given config into # ~/.ccm/name-of-cluster/nodeX/conf/log4j-server.properties # class NodeUpdatelog4jCmd(Cmd): options_list = [ (['-p', '--path'], {'type': "string", 'dest': "log4jpath", 'help': "Path to new Cassandra log4j configuration file"}), ] descr_text = "Update the Cassandra log4j-server.properties configuration file under given node" usage = "usage: ccm node_name updatelog4j -p <log4j config>" def validate(self, parser, options, args): Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True) try: self.log4jpath = options.log4jpath if self.log4jpath is None: raise KeyError("[Errno] -p or --path <path of new log4j configuration file> is not provided") except common.ArgumentError as e: print_(str(e), file=sys.stderr) exit(1) except KeyError as e: print_(str(e), file=sys.stderr) exit(1) def run(self): try: self.node.update_log4j(self.log4jpath) except common.ArgumentError as e: print_(str(e), file=sys.stderr) exit(1) class NodeStressCmd(Cmd): descr_text = "Run stress on a node" usage = "usage: ccm node_name stress [options] [stress_options]" ignore_unknown_options = True def validate(self, parser, options, args): Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True) self.stress_options = args[1:] + parser.get_ignored() def run(self): try: self.node.stress(self.stress_options) except OSError: print_("Could not find stress binary (you may need to build it)", file=sys.stderr) class NodeShuffleCmd(Cmd): descr_text = "Run shuffle on a node" usage = "usage: ccm node_name shuffle [options] [shuffle_cmds]" ignore_unknown_options = True def validate(self, parser, options, args): Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True) self.shuffle_cmd = args[1] def run(self): self.node.shuffle(self.shuffle_cmd) class NodeSetdirCmd(Cmd): options_list = [ (['-v', "--version"], {'type': "string", 'dest': "version", 'help': "Download and use provided cassandra or dse version. If version is of the form 'git:<branch name>', then the specified branch will be downloaded from the git repo and compiled. (takes precedence over --install-dir)", 'default': None}), (["--install-dir"], {'type': "string", 'dest': "install_dir", 'help': "Path to the cassandra or dse directory to use [default %default]", 'default': "./"}), ] descr_text = "Set the cassandra directory to use for the node" usage = "usage: ccm node_name setdir [options]" def validate(self, parser, options, args): Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True) def run(self): try: self.node.set_install_dir(install_dir=self.options.install_dir, version=self.options.version, verbose=True) except common.ArgumentError as e: print_(str(e), file=sys.stderr) exit(1) class NodeSetworkloadCmd(Cmd): descr_text = "Sets the workloads for a DSE node" usage = "usage: ccm node_name setworkload [cassandra|solr|hadoop|spark|dsefs|cfs|graph],..." def validate(self, parser, options, args): Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True) self.workloads = args[1].split(',') valid_workloads = ['cassandra', 'solr', 'hadoop', 'spark', 'dsefs', 'cfs', 'graph'] for workload in self.workloads: if workload not in valid_workloads: print_(workload, ' is not a valid workload') exit(1) def run(self): try: self.node.set_workloads(workloads=self.workloads) except common.ArgumentError as e: print_(str(e), file=sys.stderr) exit(1) class NodeDseCmd(Cmd): descr_text = "Launch a dse client application connected to this node" usage = "usage: ccm node_name dse [dse_options]" ignore_unknown_options = True def validate(self, parser, options, args): Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True) self.dse_options = args[1:] + parser.get_ignored() def run(self): self.node.dse(self.dse_options) class NodeHadoopCmd(Cmd): descr_text = "Launch a hadoop session connected to this node" usage = "usage: ccm node_name hadoop [options] [hadoop_options]" ignore_unknown_options = True def validate(self, parser, options, args): Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True) self.hadoop_options = args[1:] + parser.get_ignored() def run(self): self.node.hadoop(self.hadoop_options) class NodeHiveCmd(Cmd): descr_text = "Launch a hive session connected to this node" usage = "usage: ccm node_name hive [options] [hive_options]" ignore_unknown_options = True def validate(self, parser, options, args): Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True) self.hive_options = args[1:] + parser.get_ignored() def run(self): self.node.hive(self.hive_options) class NodePigCmd(Cmd): descr_text = "Launch a pig session connected to this node" usage = "usage: ccm node_name pig [options] [pig_options]" ignore_unknown_options = True def validate(self, parser, options, args): Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True) self.pig_options = parser.get_ignored() + args[1:] def run(self): self.node.pig(self.pig_options) class NodeSqoopCmd(Cmd): descr_text = "Launch a sqoop session connected to this node" usage = "usage: ccm node_name sqoop [options] [sqoop_options]" ignore_unknown_options = True def validate(self, parser, options, args): Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True) self.sqoop_options = args[1:] + parser.get_ignored() def run(self): self.node.sqoop(self.sqoop_options) class NodeSparkCmd(Cmd): descr_text = "Launch a spark session connected to this node" usage = "usage: ccm node_name spark [options] [spark_options]" ignore_unknown_options = True def validate(self, parser, options, args): Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True) self.spark_options = args[1:] + parser.get_ignored() def run(self): self.node.spark(self.spark_options) class NodePauseCmd(Cmd): descr_text = "Send a SIGSTOP to this node" usage = "usage: ccm node_name pause" def validate(self, parser, options, args): Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True) def run(self): self.node.pause() class NodeResumeCmd(Cmd): descr_text = "Send a SIGCONT to this node" usage = "usage: ccm node_name resume" def validate(self, parser, options, args): Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True) def run(self): self.node.resume() class NodeJconsoleCmd(Cmd): descr_text = "Opens jconsole client and connect to running node" usage = "usage: ccm node_name jconsole" def validate(self, parser, options, args): Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True) def run(self): cmds = ["jconsole", "localhost:%s" % self.node.jmx_port] try: subprocess.call(cmds) except OSError: print_("Could not start jconsole. Please make sure jconsole can be found in your $PATH.") exit(1) class NodeVersionfrombuildCmd(Cmd): descr_text = "Print the node's version as grepped from build.xml. Can be used when the node isn't running." usage = "usage: ccm node_name versionfrombuild" def validate(self, parser, options, args): Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True) def run(self): print_(common.get_version_from_build(self.node.get_install_dir())) class NodeBytemanCmd(Cmd): descr_text = "Invoke byteman-submit " usage = "usage: ccm node_name byteman-submit" ignore_unknown_options = True def validate(self, parser, options, args): Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True) self.byteman_options = args[1:] + parser.get_ignored() def run(self): self.node.byteman_submit(self.byteman_options)