in src/shell/commands/node_management.cpp [92:487]
// Shell command handler that lists the nodes reported by `sc->ddl_client->list_nodes()`
// and prints them as a "details" table followed by a "summary" table.
//
// Recognised options (see `long_options` below):
//   -d, --detailed        add replica / primary / secondary counts per node
//   -r, --resolve_ip      pass each address through hostname_from_ip_port() before printing
//   -u, --resource_usage  add memory and disk columns collected from perf counters
//   -q, --qps             add QPS / capacity-unit columns; this also turns on the p99
//                         latency columns (both flags are set together)
//   -j, --json            emit pretty JSON instead of the tabular format
//   -s, --status <name>   only list nodes with this status; empty or "all" means no filter,
//                         any other value is prefixed with "ns_" and parsed as a node_status
//   -o, --output <file>   write the report to <file> instead of stdout
//
// Parameters:
//   e    - unused by this function.
//   sc   - shell context; its `ddl_client` is used for all meta-server queries.
//   args - argc/argv style arguments handed to getopt_long().
//
// Returns false when an unrecognised option is given (and presumably when verify_logged()
// rejects the status string -- NOTE(review): confirm against the macro definition).
// Every runtime failure (RPC error, undecodable reply, unopenable output file) is reported
// on stdout and the function still returns true.
bool ls_nodes(command_executor *e, shell_context *sc, arguments args)
{
    static struct option long_options[] = {{"detailed", no_argument, 0, 'd'},
                                           {"resolve_ip", no_argument, 0, 'r'},
                                           {"resource_usage", no_argument, 0, 'u'},
                                           {"qps", no_argument, 0, 'q'},
                                           {"json", no_argument, 0, 'j'},
                                           {"status", required_argument, 0, 's'},
                                           {"output", required_argument, 0, 'o'},
                                           {0, 0, 0, 0}};

    std::string status;
    std::string output_file;
    bool detailed = false;
    bool resolve_ip = false;
    bool resource_usage = false;
    bool show_qps = false;
    bool show_latency = false;
    bool json = false;

    // Restart option scanning from the beginning of argv for this invocation.
    optind = 0;
    while (true) {
        int option_index = 0;
        const int c = getopt_long(args.argc, args.argv, "druqjs:o:", long_options, &option_index);
        if (c == -1)
            break;
        switch (c) {
        case 'd':
            detailed = true;
            break;
        case 'r':
            resolve_ip = true;
            break;
        case 'u':
            resource_usage = true;
            break;
        case 'q':
            // QPS and latency are always shown together.
            show_qps = true;
            show_latency = true;
            break;
        case 'j':
            json = true;
            break;
        case 's':
            status = optarg;
            break;
        case 'o':
            output_file = optarg;
            break;
        default:
            return false;
        }
    }

    // Echo the non-default parameters as the first table of the report.
    dsn::utils::multi_table_printer mtp;
    if (!(status.empty() && output_file.empty())) {
        dsn::utils::table_printer tp("parameters");
        if (!status.empty())
            tp.add_row_name_and_data("status", status);
        if (!output_file.empty())
            tp.add_row_name_and_data("out_file", output_file);
        mtp.add(std::move(tp));
    }

    // NS_INVALID is passed to list_nodes() when no status filter was requested.
    ::dsn::replication::node_status::type s = ::dsn::replication::node_status::NS_INVALID;
    if (!status.empty() && status != "all") {
        s = type_from_string(dsn::replication::_node_status_VALUES_TO_NAMES,
                             std::string("ns_") + status,
                             ::dsn::replication::node_status::NS_INVALID);
        verify_logged(s != ::dsn::replication::node_status::NS_INVALID,
                      "parse %s as node_status::type failed",
                      status.c_str());
    }

    std::map<dsn::rpc_address, dsn::replication::node_status::type> nodes;
    auto r = sc->ddl_client->list_nodes(s, nodes);
    if (r != dsn::ERR_OK) {
        std::cout << "list nodes failed, error=" << r.to_string() << std::endl;
        return true;
    }

    // One helper record per listed node, keyed by address; later sections fill in the
    // optional columns.
    std::map<dsn::rpc_address, list_nodes_helper> tmp_map;
    int alive_node_count = 0;
    for (auto &kv : nodes) {
        if (kv.second == dsn::replication::node_status::NS_ALIVE)
            alive_node_count++;
        // Strip the leading "NS_" from the enum name for display.
        std::string status_str = dsn::enum_to_string(kv.second);
        status_str = status_str.substr(status_str.find("NS_") + 3);
        std::string node_name = kv.first.to_std_string();
        if (resolve_ip) {
            // TODO: put hostname_from_ip_port into common utils
            dsn::utils::hostname_from_ip_port(node_name.c_str(), &node_name);
        }
        tmp_map.emplace(kv.first, list_nodes_helper(node_name, status_str));
    }

    if (detailed) {
        // Walk every partition of every available app and count, for each listed node,
        // how often it appears as primary and as secondary.
        std::vector<::dsn::app_info> apps;
        r = sc->ddl_client->list_apps(dsn::app_status::AS_AVAILABLE, apps);
        if (r != dsn::ERR_OK) {
            std::cout << "list apps failed, error=" << r.to_string() << std::endl;
            return true;
        }
        for (auto &app : apps) {
            int32_t app_id = 0;
            int32_t partition_count = 0;
            std::vector<dsn::partition_configuration> partitions;
            r = sc->ddl_client->list_app(app.app_name, app_id, partition_count, partitions);
            if (r != dsn::ERR_OK) {
                std::cout << "list app " << app.app_name << " failed, error=" << r.to_string()
                          << std::endl;
                return true;
            }
            for (const dsn::partition_configuration &p : partitions) {
                if (!p.primary.is_invalid()) {
                    auto find = tmp_map.find(p.primary);
                    if (find != tmp_map.end()) {
                        find->second.primary_count++;
                    }
                }
                for (const dsn::rpc_address &addr : p.secondaries) {
                    auto find = tmp_map.find(addr);
                    if (find != tmp_map.end()) {
                        find->second.secondary_count++;
                    }
                }
            }
        }
    }

    // The replica-server list is needed by every perf-counter section below, so fetch it
    // once. A distinct name also avoids shadowing the `nodes` map used by the summary.
    std::vector<node_desc> replica_nodes;
    if (resource_usage || show_qps || show_latency) {
        if (!fill_nodes(sc, "replica-server", replica_nodes)) {
            std::cout << "get replica server node list failed" << std::endl;
            return true;
        }
    }

    // Validates one remote-command reply and decodes it into `info`.
    // Prints the reason to stdout and returns false if the call failed, the payload is not
    // decodable, or the decoded result is not "OK".
    auto decode_counters = [](dsn::rpc_address node_addr,
                              const std::pair<bool, std::string> &reply,
                              dsn::perf_counter_info &info) -> bool {
        if (!reply.first) {
            std::cout << "query perf counter info from node " << node_addr.to_string()
                      << " failed" << std::endl;
            return false;
        }
        dsn::blob bb(reply.second.data(), 0, reply.second.size());
        if (!dsn::json::json_forwarder<dsn::perf_counter_info>::decode(bb, info)) {
            std::cout << "decode perf counter info from node " << node_addr.to_string()
                      << " failed, result = " << reply.second << std::endl;
            return false;
        }
        if (info.result != "OK") {
            std::cout << "query perf counter info from node " << node_addr.to_string()
                      << " returns error, error = " << info.result << std::endl;
            return false;
        }
        return true;
    };

    if (resource_usage) {
        std::vector<std::pair<bool, std::string>> results =
            call_remote_command(sc,
                                replica_nodes,
                                "perf-counters-by-prefix",
                                {"replica*server*memused.res(MB)",
                                 "replica*app.pegasus*rdb.block_cache.memory_usage",
                                 "replica*eon.replica_stub*disk.available.total.ratio",
                                 "replica*eon.replica_stub*disk.available.min.ratio",
                                 "replica*app.pegasus*rdb.memtable.memory_usage",
                                 "replica*app.pegasus*rdb.index_and_filter_blocks.memory_usage"});
        for (size_t i = 0; i < replica_nodes.size(); ++i) {
            // Replica servers that are not part of the listing are skipped before their
            // reply is even inspected.
            auto tmp_it = tmp_map.find(replica_nodes[i].address);
            if (tmp_it == tmp_map.end())
                continue;
            dsn::perf_counter_info info;
            if (!decode_counters(replica_nodes[i].address, results[i], info))
                return true;
            list_nodes_helper &h = tmp_it->second;
            // Accumulate every counter whose name contains one of the requested metrics.
            for (const dsn::perf_counter_metric &m : info.counters) {
                if (m.name.find("memused.res(MB)") != std::string::npos)
                    h.memused_res_mb += m.value;
                else if (m.name.find("rdb.block_cache.memory_usage") != std::string::npos)
                    h.block_cache_bytes += m.value;
                else if (m.name.find("disk.available.total.ratio") != std::string::npos)
                    h.disk_available_total_ratio += m.value;
                else if (m.name.find("disk.available.min.ratio") != std::string::npos)
                    h.disk_available_min_ratio += m.value;
                else if (m.name.find("rdb.memtable.memory_usage") != std::string::npos)
                    h.mem_tbl_bytes += m.value;
                else if (m.name.find("rdb.index_and_filter_blocks.memory_usage") !=
                         std::string::npos)
                    h.mem_idx_bytes += m.value;
            }
        }
    }

    if (show_qps) {
        std::vector<std::pair<bool, std::string>> results =
            call_remote_command(sc,
                                replica_nodes,
                                "perf-counters-by-prefix",
                                {"replica*app.pegasus*get_qps",
                                 "replica*app.pegasus*multi_get_qps",
                                 "replica*app.pegasus*batch_get_qps",
                                 "replica*app.pegasus*put_qps",
                                 "replica*app.pegasus*multi_put_qps",
                                 "replica*app.pegasus*recent.read.cu",
                                 "replica*app.pegasus*recent.write.cu"});
        for (size_t i = 0; i < replica_nodes.size(); ++i) {
            auto tmp_it = tmp_map.find(replica_nodes[i].address);
            if (tmp_it == tmp_map.end())
                continue;
            dsn::perf_counter_info info;
            if (!decode_counters(replica_nodes[i].address, results[i], info))
                return true;
            list_nodes_helper &h = tmp_it->second;
            for (const dsn::perf_counter_metric &m : info.counters) {
                if (m.name.find("replica*app.pegasus*get_qps") != std::string::npos)
                    h.get_qps += m.value;
                else if (m.name.find("replica*app.pegasus*multi_get_qps") != std::string::npos)
                    h.multi_get_qps += m.value;
                else if (m.name.find("replica*app.pegasus*batch_get_qps") != std::string::npos)
                    h.batch_get_qps += m.value;
                else if (m.name.find("replica*app.pegasus*put_qps") != std::string::npos)
                    h.put_qps += m.value;
                else if (m.name.find("replica*app.pegasus*multi_put_qps") != std::string::npos)
                    h.multi_put_qps += m.value;
                else if (m.name.find("replica*app.pegasus*recent.read.cu") != std::string::npos)
                    h.read_cu += m.value;
                else if (m.name.find("replica*app.pegasus*recent.write.cu") != std::string::npos)
                    h.write_cu += m.value;
            }
        }
    }

    if (show_latency) {
        std::vector<std::pair<bool, std::string>> results =
            call_remote_command(sc,
                                replica_nodes,
                                "perf-counters-by-postfix",
                                {"zion*profiler*RPC_RRDB_RRDB_GET.latency.server",
                                 "zion*profiler*RPC_RRDB_RRDB_PUT.latency.server",
                                 "zion*profiler*RPC_RRDB_RRDB_MULTI_GET.latency.server",
                                 "zion*profiler*RPC_RRDB_RRDB_BATCH_GET.latency.server",
                                 "zion*profiler*RPC_RRDB_RRDB_MULTI_PUT.latency.server"});
        for (size_t i = 0; i < replica_nodes.size(); ++i) {
            auto tmp_it = tmp_map.find(replica_nodes[i].address);
            if (tmp_it == tmp_map.end())
                continue;
            dsn::perf_counter_info info;
            if (!decode_counters(replica_nodes[i].address, results[i], info))
                return true;
            list_nodes_helper &h = tmp_it->second;
            // Latencies are assigned rather than summed: the last matching counter wins.
            for (const dsn::perf_counter_metric &m : info.counters) {
                if (m.name.find("RPC_RRDB_RRDB_GET.latency.server") != std::string::npos)
                    h.get_p99 = m.value;
                else if (m.name.find("RPC_RRDB_RRDB_PUT.latency.server") != std::string::npos)
                    h.put_p99 = m.value;
                else if (m.name.find("RPC_RRDB_RRDB_MULTI_GET.latency.server") !=
                         std::string::npos)
                    h.multi_get_p99 = m.value;
                else if (m.name.find("RPC_RRDB_RRDB_MULTI_PUT.latency.server") !=
                         std::string::npos)
                    h.multi_put_p99 = m.value;
                else if (m.name.find("RPC_RRDB_RRDB_BATCH_GET.latency.server") !=
                         std::string::npos)
                    h.batch_get_p99 = m.value;
            }
        }
    }

    // print configuration_list_nodes_response
    std::streambuf *buf;
    std::ofstream of;
    if (!output_file.empty()) {
        of.open(output_file);
        // Without this check a failed open would silently drop the whole report.
        if (!of.is_open()) {
            std::cout << "open output file " << output_file << " failed" << std::endl;
            return true;
        }
        buf = of.rdbuf();
    } else {
        buf = std::cout.rdbuf();
    }
    std::ostream out(buf);

    // Column order here must match the append_data() order in the row loop below.
    dsn::utils::table_printer tp("details");
    tp.add_title("address");
    tp.add_column("status");
    if (detailed) {
        tp.add_column("replica_count", tp_alignment::kRight);
        tp.add_column("primary_count", tp_alignment::kRight);
        tp.add_column("secondary_count", tp_alignment::kRight);
    }
    if (resource_usage) {
        tp.add_column("memused_res_mb", tp_alignment::kRight);
        tp.add_column("block_cache_mb", tp_alignment::kRight);
        tp.add_column("mem_tbl_mb", tp_alignment::kRight);
        tp.add_column("mem_idx_mb", tp_alignment::kRight);
        tp.add_column("disk_avl_total_ratio", tp_alignment::kRight);
        tp.add_column("disk_avl_min_ratio", tp_alignment::kRight);
    }
    if (show_qps) {
        tp.add_column("get_qps", tp_alignment::kRight);
        tp.add_column("mget_qps", tp_alignment::kRight);
        tp.add_column("bget_qps", tp_alignment::kRight);
        tp.add_column("read_cu", tp_alignment::kRight);
        tp.add_column("put_qps", tp_alignment::kRight);
        tp.add_column("mput_qps", tp_alignment::kRight);
        tp.add_column("write_cu", tp_alignment::kRight);
    }
    if (show_latency) {
        tp.add_column("get_p99(ms)", tp_alignment::kRight);
        tp.add_column("mget_p99(ms)", tp_alignment::kRight);
        tp.add_column("bget_p99(ms)", tp_alignment::kRight);
        tp.add_column("put_p99(ms)", tp_alignment::kRight);
        tp.add_column("mput_p99(ms)", tp_alignment::kRight);
    }

    for (auto &kv : tmp_map) {
        tp.add_row(kv.second.node_name);
        tp.append_data(kv.second.node_status);
        if (detailed) {
            tp.append_data(kv.second.primary_count + kv.second.secondary_count);
            tp.append_data(kv.second.primary_count);
            tp.append_data(kv.second.secondary_count);
        }
        if (resource_usage) {
            tp.append_data(kv.second.memused_res_mb);
            // Byte counters are shown in the *_mb columns, hence the division by 2^20.
            tp.append_data(kv.second.block_cache_bytes / (1 << 20U));
            tp.append_data(kv.second.mem_tbl_bytes / (1 << 20U));
            tp.append_data(kv.second.mem_idx_bytes / (1 << 20U));
            tp.append_data(kv.second.disk_available_total_ratio);
            tp.append_data(kv.second.disk_available_min_ratio);
        }
        if (show_qps) {
            tp.append_data(kv.second.get_qps);
            tp.append_data(kv.second.multi_get_qps);
            tp.append_data(kv.second.batch_get_qps);
            tp.append_data(kv.second.read_cu);
            tp.append_data(kv.second.put_qps);
            tp.append_data(kv.second.multi_put_qps);
            tp.append_data(kv.second.write_cu);
        }
        if (show_latency) {
            // Columns are labelled "(ms)"; presumably the raw counters are in
            // nanoseconds -- NOTE(review): confirm the unit of the latency counters.
            tp.append_data(kv.second.get_p99 / 1e6);
            tp.append_data(kv.second.multi_get_p99 / 1e6);
            tp.append_data(kv.second.batch_get_p99 / 1e6);
            tp.append_data(kv.second.put_p99 / 1e6);
            tp.append_data(kv.second.multi_put_p99 / 1e6);
        }
    }
    mtp.add(std::move(tp));

    dsn::utils::table_printer tp_count("summary");
    tp_count.add_row_name_and_data("total_node_count", nodes.size());
    tp_count.add_row_name_and_data("alive_node_count", alive_node_count);
    tp_count.add_row_name_and_data("unalive_node_count", nodes.size() - alive_node_count);
    mtp.add(std::move(tp_count));

    mtp.output(out, json ? tp_output_format::kJsonPretty : tp_output_format::kTabular);
    return true;
}