in src/shell/commands/node_management.cpp [428:738]
// Shell command that lists the nodes reported by the cluster and prints them as tables,
// optionally enriched with replica counts, resource usage, request rates and latencies.
//
// Supported options:
//   -d, --detailed                also print replica/primary/secondary counts per node
//   -r, --resolve_ip              forwarded to replication_ddl_client::node_name() when
//                                 building the displayed node name
//   -u, --resource_usage          also print memory/disk related metrics per node
//   -q, --qps                     also print request rates, capacity units and p99 latencies
//                                 per node (this single flag enables both column groups)
//   -j, --json                    forwarded to dsn::utils::output()
//   -s, --status <status>         only list nodes with the given status; "all" or an empty
//                                 value applies no filter
//   -o, --output <file>           forwarded to dsn::utils::output()
//   -i, --sample_interval_ms <n>  time slept between the two samples taken for -q
//
// Returns false for an unrecognized option (and, judging by the names of the project macros
// used below, for an invalid sample interval or an unparsable status). Returns true
// otherwise, including when a query fails: such failures are reported by printing a message.
bool ls_nodes(command_executor *, shell_context *sc, arguments args)
{
    static struct option long_options[] = {{"detailed", no_argument, nullptr, 'd'},
                                           {"resolve_ip", no_argument, nullptr, 'r'},
                                           {"resource_usage", no_argument, nullptr, 'u'},
                                           {"qps", no_argument, nullptr, 'q'},
                                           {"json", no_argument, nullptr, 'j'},
                                           {"status", required_argument, nullptr, 's'},
                                           {"output", required_argument, nullptr, 'o'},
                                           {"sample_interval_ms", required_argument, nullptr, 'i'},
                                           {nullptr, 0, nullptr, 0}};

    std::string status;
    std::string output_file;
    // NOTE: this local is kept under this exact name because the macro invoked for '-i'
    // takes no arguments and therefore presumably refers to it (and to optarg) by name.
    uint32_t sample_interval_ms = FLAGS_nodes_sample_interval_ms;
    bool detailed = false;
    bool resolve_ip = false;
    bool resource_usage = false;
    bool show_qps = false;
    bool show_latency = false;
    bool json = false;

    // Reset getopt's scanning state so that parsing starts from the beginning of args.
    optind = 0;
    while (true) {
        int option_index = 0;
        // TODO(wangdan): getopt_long() is not thread-safe (clang-tidy[concurrency-mt-unsafe]),
        // could use https://github.com/p-ranav/argparse instead.
        int c = getopt_long(args.argc, args.argv, "druqjs:o:i:", long_options, &option_index);
        if (c == -1) {
            // -1 means all command-line options have been parsed.
            break;
        }

        switch (c) {
        case 'd':
            detailed = true;
            break;
        case 'r':
            resolve_ip = true;
            break;
        case 'u':
            resource_usage = true;
            break;
        case 'q':
            // A single flag turns on both the request-rate and the latency columns.
            show_qps = true;
            show_latency = true;
            break;
        case 'j':
            json = true;
            break;
        case 's':
            status = optarg;
            break;
        case 'o':
            output_file = optarg;
            break;
        case 'i':
            RETURN_FALSE_IF_SAMPLE_INTERVAL_MS_INVALID();
            break;
        default:
            return false;
        }
    }

    dsn::utils::multi_table_printer multi_printer;

    // Echo the user-supplied parameters as the first table, if any were given.
    if (!status.empty() || !output_file.empty()) {
        dsn::utils::table_printer tp("parameters");
        if (!status.empty()) {
            tp.add_row_name_and_data("status", status);
        }
        if (!output_file.empty()) {
            tp.add_row_name_and_data("out_file", output_file);
        }
        multi_printer.add(std::move(tp));
    }

    // NS_INVALID is what gets passed to list_nodes() when no status filter is requested.
    ::dsn::replication::node_status::type s = ::dsn::replication::node_status::NS_INVALID;
    if (!status.empty() && status != "all") {
        s = type_from_string(dsn::replication::_node_status_VALUES_TO_NAMES,
                             std::string("ns_") + status,
                             ::dsn::replication::node_status::NS_INVALID);
        SHELL_PRINT_AND_RETURN_FALSE_IF_NOT(s != ::dsn::replication::node_status::NS_INVALID,
                                            "parse {} as node_status::type failed",
                                            status);
    }

    std::map<dsn::host_port, dsn::replication::node_status::type> status_by_hp;
    auto r = sc->ddl_client->list_nodes(s, status_by_hp);
    if (r != dsn::ERR_OK) {
        fmt::println("list nodes failed, error={}", r);
        return true;
    }

    // One row of output per listed node, keyed by host_port.
    std::map<dsn::host_port, list_nodes_helper> tmp_map;
    int alive_node_count = 0;
    for (const auto &kv : status_by_hp) {
        if (kv.second == dsn::replication::node_status::NS_ALIVE) {
            ++alive_node_count;
        }

        // Display the status without everything up to and including the first "NS_" marker,
        // wherever that marker appears in the enum name. If the marker is absent the name is
        // shown unchanged: adding 3 to npos would wrap around and silently drop the first
        // two characters of the name.
        std::string status_str(dsn::enum_to_string(kv.second));
        const auto marker_pos = status_str.find("NS_");
        if (marker_pos != std::string::npos) {
            status_str = status_str.substr(marker_pos + 3);
        }

        tmp_map.emplace(
            kv.first,
            list_nodes_helper(replication_ddl_client::node_name(kv.first, resolve_ip),
                              status_str));
    }

    if (detailed) {
        // Count, for every listed node, how many partitions it serves as primary/secondary
        // across all available apps.
        std::vector<::dsn::app_info> apps;
        const auto &result = sc->ddl_client->list_apps(dsn::app_status::AS_AVAILABLE, apps);
        if (!result) {
            fmt::println("list apps failed, error={}", result);
            return true;
        }

        for (const auto &app : apps) {
            int32_t app_id = 0;
            int32_t partition_count = 0;
            std::vector<dsn::partition_configuration> pcs;
            r = sc->ddl_client->list_app(app.app_name, app_id, partition_count, pcs);
            if (r != dsn::ERR_OK) {
                fmt::println("list app {} failed, error={}", app.app_name, r);
                return true;
            }

            for (const auto &pc : pcs) {
                if (pc.hp_primary) {
                    auto it = tmp_map.find(pc.hp_primary);
                    if (it != tmp_map.end()) {
                        it->second.primary_count++;
                    }
                }
                for (const auto &secondary : pc.hp_secondaries) {
                    auto it = tmp_map.find(secondary);
                    if (it != tmp_map.end()) {
                        it->second.secondary_count++;
                    }
                }
            }
        }
    }

    // All metric-based sections query the same set of replica servers. Fetch that list once
    // and share it, instead of repeating the identical lookup in every section; this also
    // guarantees that all sections work on one consistent node list.
    std::vector<node_desc> nodes;
    if (resource_usage || show_qps || show_latency) {
        if (!fill_nodes(sc, "replica-server", nodes)) {
            fmt::println("get replica server node list failed");
            return true;
        }
    }

    // NOTE(review): the RETURN_SHELL_IF_* macros below are defined elsewhere; judging by
    // their names they report the failure and return from this function. Their arguments are
    // passed as plain expressions on purpose, to avoid clashing with names the macros may
    // declare internally.
    if (resource_usage) {
        const auto &results = get_metrics(nodes, resource_usage_filters().to_query_string());

        for (size_t i = 0; i < nodes.size(); ++i) {
            // Nodes that were not listed above (e.g. filtered out by status) are skipped.
            auto tmp_it = tmp_map.find(nodes[i].hp);
            if (tmp_it == tmp_map.end()) {
                continue;
            }

            RETURN_SHELL_IF_GET_METRICS_FAILED(results[i], nodes[i], "resource");

            auto &stat = tmp_it->second;
            RETURN_SHELL_IF_PARSE_METRICS_FAILED(
                parse_resource_usage(results[i].body(), stat), nodes[i], "parse resource usage");
        }
    }

    if (show_qps) {
        // Take two samples separated by sample_interval_ms and aggregate them per node.
        const auto &query_string = rw_requests_filters().to_query_string();
        const auto &results_start = get_metrics(nodes, query_string);
        std::this_thread::sleep_for(std::chrono::milliseconds(sample_interval_ms));
        const auto &results_end = get_metrics(nodes, query_string);

        for (size_t i = 0; i < nodes.size(); ++i) {
            auto tmp_it = tmp_map.find(nodes[i].hp);
            if (tmp_it == tmp_map.end()) {
                continue;
            }

            RETURN_SHELL_IF_GET_METRICS_FAILED(results_start[i], nodes[i], "starting rw requests");
            RETURN_SHELL_IF_GET_METRICS_FAILED(results_end[i], nodes[i], "ending rw requests");

            list_nodes_helper &stat = tmp_it->second;
            aggregate_stats_calcs calcs;
            // Capacity units are registered as "increases", request counters as "rates".
            calcs.create_increases<total_aggregate_stats>(
                "replica",
                stat_var_map({{"read_capacity_units", &stat.read_cu},
                              {"write_capacity_units", &stat.write_cu}}));
            calcs.create_rates<total_aggregate_stats>(
                "replica",
                stat_var_map({{"get_requests", &stat.get_qps},
                              {"multi_get_requests", &stat.multi_get_qps},
                              {"batch_get_requests", &stat.batch_get_qps},
                              {"put_requests", &stat.put_qps},
                              {"multi_put_requests", &stat.multi_put_qps}}));

            RETURN_SHELL_IF_PARSE_METRICS_FAILED(
                calcs.aggregate_metrics(results_start[i].body(), results_end[i].body()),
                nodes[i],
                "aggregate rw requests");
        }
    }

    if (show_latency) {
        const auto &results = get_metrics(nodes, profiler_latency_filters().to_query_string());

        for (size_t i = 0; i < nodes.size(); ++i) {
            auto tmp_it = tmp_map.find(nodes[i].hp);
            if (tmp_it == tmp_map.end()) {
                continue;
            }

            RETURN_SHELL_IF_GET_METRICS_FAILED(results[i], nodes[i], "profiler latency");

            auto &stat = tmp_it->second;
            RETURN_SHELL_IF_PARSE_METRICS_FAILED(parse_profiler_latency(results[i].body(), stat),
                                                 nodes[i],
                                                 "parse profiler latency");
        }
    }

    // Table header: the column groups must be added in the same order as the row data below.
    dsn::utils::table_printer tp("details");
    tp.add_title("address");
    tp.add_column("status");
    if (detailed) {
        tp.add_column("replica_count", tp_alignment::kRight);
        tp.add_column("primary_count", tp_alignment::kRight);
        tp.add_column("secondary_count", tp_alignment::kRight);
    }
    if (resource_usage) {
        tp.add_column("memused_res_mb", tp_alignment::kRight);
        tp.add_column("block_cache_mb", tp_alignment::kRight);
        tp.add_column("wbm_total_mb", tp_alignment::kRight);
        tp.add_column("wbm_mutable_mb", tp_alignment::kRight);
        tp.add_column("mem_tbl_mb", tp_alignment::kRight);
        tp.add_column("mem_idx_mb", tp_alignment::kRight);
        tp.add_column("disk_avl_total_ratio", tp_alignment::kRight);
        tp.add_column("disk_avl_min_ratio", tp_alignment::kRight);
    }
    if (show_qps) {
        tp.add_column("get_qps", tp_alignment::kRight);
        tp.add_column("mget_qps", tp_alignment::kRight);
        tp.add_column("bget_qps", tp_alignment::kRight);
        tp.add_column("read_cu", tp_alignment::kRight);
        tp.add_column("put_qps", tp_alignment::kRight);
        tp.add_column("mput_qps", tp_alignment::kRight);
        tp.add_column("write_cu", tp_alignment::kRight);
    }
    if (show_latency) {
        tp.add_column("get_p99(ms)", tp_alignment::kRight);
        tp.add_column("mget_p99(ms)", tp_alignment::kRight);
        tp.add_column("bget_p99(ms)", tp_alignment::kRight);
        tp.add_column("put_p99(ms)", tp_alignment::kRight);
        tp.add_column("mput_p99(ms)", tp_alignment::kRight);
    }

    // Table body: one row per listed node.
    for (const auto &kv : tmp_map) {
        const list_nodes_helper &info = kv.second;

        tp.add_row(info.node_name);
        tp.append_data(info.node_status);
        if (detailed) {
            tp.append_data(info.primary_count + info.secondary_count);
            tp.append_data(info.primary_count);
            tp.append_data(info.secondary_count);
        }
        if (resource_usage) {
            tp.append_data(info.memused_res_mb);
            tp.append_data(dsn::bytes::to_mb(info.block_cache_bytes));
            tp.append_data(dsn::bytes::to_mb(info.wbm_total_bytes));
            tp.append_data(dsn::bytes::to_mb(info.wbm_mutable_bytes));
            tp.append_data(dsn::bytes::to_mb(info.mem_tbl_bytes));
            tp.append_data(dsn::bytes::to_mb(info.mem_idx_bytes));
            tp.append_data(info.disk_available_total_ratio);
            tp.append_data(info.disk_available_min_ratio);
        }
        if (show_qps) {
            tp.append_data(info.get_qps);
            tp.append_data(info.multi_get_qps);
            tp.append_data(info.batch_get_qps);
            tp.append_data(info.read_cu);
            tp.append_data(info.put_qps);
            tp.append_data(info.multi_put_qps);
            tp.append_data(info.write_cu);
        }
        if (show_latency) {
            // Columns are labelled in ms; the division by 1e6 suggests the stored values are
            // in nanoseconds (TODO confirm against parse_profiler_latency()).
            tp.append_data(info.get_p99 / 1e6);
            tp.append_data(info.multi_get_p99 / 1e6);
            tp.append_data(info.batch_get_p99 / 1e6);
            tp.append_data(info.put_p99 / 1e6);
            tp.append_data(info.multi_put_p99 / 1e6);
        }
    }
    multi_printer.add(std::move(tp));

    // Summary table with total/alive/unalive node counts.
    dsn::utils::table_printer tp_count("summary");
    tp_count.add_row_name_and_data("total_node_count", status_by_hp.size());
    tp_count.add_row_name_and_data("alive_node_count", alive_node_count);
    tp_count.add_row_name_and_data("unalive_node_count", status_by_hp.size() - alive_node_count);
    multi_printer.add(std::move(tp_count));

    dsn::utils::output(output_file, json, multi_printer);

    return true;
}