src/shell/commands/node_management.cpp (738 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <fmt/core.h>
#include <fmt/format.h>
#include <getopt.h>
#include <nlohmann/json.hpp>
#include <nlohmann/json_fwd.hpp>
#include <stdint.h>
#include <stdio.h>
#include <algorithm>
// IWYU pragma: no_include <bits/getopt_core.h>
#include <chrono>
#include <initializer_list>
#include <iostream>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
#include "client/replication_ddl_client.h"
#include "common/json_helper.h"
#include "common/replication_enums.h"
#include "dsn.layer2_types.h"
#include "meta_admin_types.h"
#include "rpc/rpc_host_port.h"
#include "shell/argh.h"
#include "shell/command_executor.h"
#include "shell/command_helper.h"
#include "shell/command_utils.h"
#include "shell/commands.h"
#include "utils/blob.h"
#include "utils/bytes.h"
#include "utils/error_code.h"
#include "utils/errors.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/math.h"
#include "utils/metrics.h"
#include "utils/output_utils.h"
#include "utils/ports.h"
// Default interval between the two metric samples taken by the commands in this file; it is
// used as the default of `ls_nodes -i` and `remote_command -i` when the option is not given.
DSN_DEFINE_uint32(shell, nodes_sample_interval_ms, 1000, "The interval between sampling metrics.");
// The validator only accepts strictly positive values for the interval.
DSN_DEFINE_validator(nodes_sample_interval_ms, [](uint32_t value) -> bool { return value > 0; });
// Shell command handler that asks the DDL client for the cluster information.
//
// Recognized options:
//   -r, --resolve_ip     forwarded to cluster_info() as `resolve_ip`
//   -j, --json           forwarded to cluster_info() as `json`
//   -o, --output <file>  forwarded to cluster_info() as the output file
//
// Returns false only when an unrecognized option is met. A failure reported by
// cluster_info() is printed to stdout and the handler still returns true.
bool query_cluster_info(command_executor *e, shell_context *sc, arguments args)
{
    static struct option long_options[] = {{"resolve_ip", no_argument, nullptr, 'r'},
                                           {"json", no_argument, nullptr, 'j'},
                                           {"output", required_argument, nullptr, 'o'},
                                           {nullptr, 0, nullptr, 0}};

    bool json = false;
    bool resolve_ip = false;
    std::string out_file;

    // getopt keeps global state between calls, thus rewind it before scanning this command.
    optind = 0;
    for (;;) {
        int long_index = 0;
        const int opt = getopt_long(args.argc, args.argv, "rjo:", long_options, &long_index);
        if (opt == -1) {
            // All options have been consumed.
            break;
        }

        if (opt == 'r') {
            resolve_ip = true;
        } else if (opt == 'j') {
            json = true;
        } else if (opt == 'o') {
            out_file = optarg;
        } else {
            // Unknown option or missing argument.
            return false;
        }
    }

    ::dsn::error_code err = sc->ddl_client->cluster_info(out_file, resolve_ip, json);
    if (err != ::dsn::ERR_OK) {
        std::cout << "get cluster info failed, error=" << err << std::endl;
    }
    return true;
}
namespace {
// Builds the filters for the metrics query whose response is consumed by
// parse_resource_usage(): memory metrics of "server" and "replica" entities and capacity
// metrics of "disk" entities, each restricted to the metric name and its single value.
dsn::metric_filters resource_usage_filters()
{
    dsn::metric_filters usage_filters;

    usage_filters.entity_types = {"server", "replica", "disk"};
    usage_filters.entity_metrics = {"resident_mem_usage_mb",
                                    "rdb_block_cache_mem_usage_bytes",
                                    "rdb_wbm_total_mem_usage_bytes",
                                    "rdb_wbm_mutable_mem_usage_bytes",
                                    "rdb_memtable_mem_usage_bytes",
                                    "rdb_index_and_filter_blocks_mem_usage_bytes",
                                    "disk_capacity_total_mb",
                                    "disk_capacity_avail_mb"};
    usage_filters.with_metric_fields = {dsn::kMetricNameField, dsn::kMetricSingleValueField};

    return usage_filters;
}
// Decodes the metrics response `json_string` and accumulates the resource usage it holds
// into `stat`:
//   - "server" entities contribute resident memory, block cache and write buffer manager
//     usage;
//   - "replica" entities contribute memtable and index/filter block usage;
//   - "disk" entities contribute to the overall available ratio and to the minimum
//     per-disk available ratio (which starts from 100).
// Entities of other types and metrics with other names are ignored.
//
// Returns ok once all entities are processed; the deserialization macro is expected to
// return early with an error for a malformed response (see its definition).
dsn::error_s parse_resource_usage(const std::string &json_string, list_nodes_helper &stat)
{
    DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(value, json_string, query_snapshot);

    // Metric values are truncated to integers before being accumulated.
    const auto as_int64 = [](const auto &metric) { return static_cast<int64_t>(metric.value); };

    int64_t capacity_sum_mb = 0;
    int64_t available_sum_mb = 0;
    stat.disk_available_min_ratio = 100;

    for (const auto &entity : query_snapshot.entities) {
        if (entity.type == "server") {
            for (const auto &metric : entity.metrics) {
                if (metric.name == "resident_mem_usage_mb") {
                    stat.memused_res_mb += as_int64(metric);
                } else if (metric.name == "rdb_block_cache_mem_usage_bytes") {
                    stat.block_cache_bytes += as_int64(metric);
                } else if (metric.name == "rdb_wbm_total_mem_usage_bytes") {
                    stat.wbm_total_bytes += as_int64(metric);
                } else if (metric.name == "rdb_wbm_mutable_mem_usage_bytes") {
                    stat.wbm_mutable_bytes += as_int64(metric);
                }
            }
            continue;
        }

        if (entity.type == "replica") {
            for (const auto &metric : entity.metrics) {
                if (metric.name == "rdb_memtable_mem_usage_bytes") {
                    stat.mem_tbl_bytes += as_int64(metric);
                } else if (metric.name == "rdb_index_and_filter_blocks_mem_usage_bytes") {
                    stat.mem_idx_bytes += as_int64(metric);
                }
            }
            continue;
        }

        if (entity.type != "disk") {
            continue;
        }

        // Per-disk figures feed both the node-wide totals and the minimum ratio.
        int64_t disk_capacity_mb = 0;
        int64_t disk_available_mb = 0;
        for (const auto &metric : entity.metrics) {
            if (metric.name == "disk_capacity_total_mb") {
                disk_capacity_mb = as_int64(metric);
                capacity_sum_mb += disk_capacity_mb;
            } else if (metric.name == "disk_capacity_avail_mb") {
                disk_available_mb = as_int64(metric);
                available_sum_mb += disk_available_mb;
            }
        }

        const auto disk_ratio = dsn::utils::calc_percentage(disk_available_mb, disk_capacity_mb);
        stat.disk_available_min_ratio = std::min(stat.disk_available_min_ratio, disk_ratio);
    }

    stat.disk_available_total_ratio =
        dsn::utils::calc_percentage(available_sum_mb, capacity_sum_mb);

    return dsn::error_s::ok();
}
// Builds the filters for the metrics query whose response is consumed by
// parse_profiler_latency(): the P99 field of "profiler_server_rpc_latency_ns" for
// "profiler" entities.
dsn::metric_filters profiler_latency_filters()
{
    dsn::metric_filters latency_filters;

    latency_filters.entity_types = {"profiler"};
    latency_filters.entity_metrics = {"profiler_server_rpc_latency_ns"};
    latency_filters.with_metric_fields = {
        dsn::kMetricNameField, dsn::kth_percentile_to_name(dsn::kth_percentile_type::P99)};

    return latency_filters;
}
// Decodes the metrics response `json_string` and stores the P99 server-side RPC latency of
// the read/write tasks into `stat`.
//
// Each entity must be of type "profiler" and carry a "task_name" attribute; the tasks
// RPC_RRDB_RRDB_{GET,PUT,MULTI_GET,MULTI_PUT,BATCH_GET} are mapped to the corresponding
// `*_p99` field of `stat`, and entities for any other task are skipped.
//
// Returns ERR_INVALID_DATA if an entity is not a "profiler" entity or lacks the "task_name"
// attribute, ok otherwise. The deserialization macro is expected to return early with an
// error for a malformed response (see its definition).
dsn::error_s parse_profiler_latency(const std::string &json_string, list_nodes_helper &stat)
{
    DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(p99, json_string, query_snapshot);

    for (const auto &entity : query_snapshot.entities) {
        if (dsn_unlikely(entity.type != "profiler")) {
            // The message used to say "non-replica", which did not match the check above.
            return FMT_ERR(dsn::ERR_INVALID_DATA,
                           "non-profiler entity should not be included: {}",
                           entity.type);
        }

        const auto t = entity.attributes.find("task_name");
        if (dsn_unlikely(t == entity.attributes.end())) {
            return FMT_ERR(dsn::ERR_INVALID_DATA, "task_name field was not found");
        }

        // Choose the field of `stat` that receives the latency of this task.
        double *latency = nullptr;
        const auto &task_name = t->second;
        if (task_name == "RPC_RRDB_RRDB_GET") {
            latency = &stat.get_p99;
        } else if (task_name == "RPC_RRDB_RRDB_PUT") {
            latency = &stat.put_p99;
        } else if (task_name == "RPC_RRDB_RRDB_MULTI_GET") {
            latency = &stat.multi_get_p99;
        } else if (task_name == "RPC_RRDB_RRDB_MULTI_PUT") {
            latency = &stat.multi_put_p99;
        } else if (task_name == "RPC_RRDB_RRDB_BATCH_GET") {
            latency = &stat.batch_get_p99;
        } else {
            // Tasks that are not displayed by the shell are ignored.
            continue;
        }

        for (const auto &m : entity.metrics) {
            if (m.name == "profiler_server_rpc_latency_ns") {
                *latency = m.p99;
            }
        }
    }

    return dsn::error_s::ok();
}
// Builds the filters for the metrics query of read/write request counters and capacity
// units of "replica" entities, restricted to the metric name and its single value.
dsn::metric_filters rw_requests_filters()
{
    dsn::metric_filters request_filters;

    request_filters.entity_types = {"replica"};
    request_filters.entity_metrics = {"get_requests",
                                      "multi_get_requests",
                                      "batch_get_requests",
                                      "put_requests",
                                      "multi_put_requests",
                                      "read_capacity_units",
                                      "write_capacity_units"};
    request_filters.with_metric_fields = {dsn::kMetricNameField, dsn::kMetricSingleValueField};

    return request_filters;
}
// Builds the filters for the metrics query issued by get_server_stats(): virtual and
// resident memory usage of "server" entities, restricted to the metric name and its single
// value.
dsn::metric_filters server_stat_filters()
{
    dsn::metric_filters stat_filters;

    stat_filters.entity_types = {"server"};
    stat_filters.entity_metrics = {"virtual_mem_usage_mb", "resident_mem_usage_mb"};
    stat_filters.with_metric_fields = {dsn::kMetricNameField, dsn::kMetricSingleValueField};

    return stat_filters;
}
// Statistics collected for a meta server by aggregate_meta_server_stats(), which encodes
// them as JSON.
struct meta_server_stats
{
    meta_server_stats() = default;

    // Filled from the "virtual_mem_usage_mb" metric.
    double virt_mem_mb = 0.0;
    // Filled from the "resident_mem_usage_mb" metric.
    double res_mem_mb = 0.0;

    DEFINE_JSON_SERIALIZATION(virt_mem_mb, res_mem_mb)
};
// Aggregates the "server" metrics of `query_snapshot` into a meta_server_stats object.
//
// Returns {true, <stats encoded as JSON>} on success, or {false, <error description>} if
// the metrics could not be aggregated for `node`.
std::pair<bool, std::string>
aggregate_meta_server_stats(const node_desc &node,
                            const dsn::metric_query_brief_value_snapshot &query_snapshot)
{
    aggregate_stats_calcs calcs;
    meta_server_stats stats;

    // Each metric value is assigned to the field its name is mapped to.
    auto targets = stat_var_map({{"virtual_mem_usage_mb", &stats.virt_mem_mb},
                                 {"resident_mem_usage_mb", &stats.res_mem_mb}});
    calcs.create_assignments<total_aggregate_stats>("server", std::move(targets));

    const auto outcome = process_parse_metrics_result(
        calcs.aggregate_metrics(query_snapshot), node, "aggregate meta server stats");
    if (!outcome) {
        // Metrics failed to be aggregated.
        return std::make_pair(false, outcome.description());
    }

    return std::make_pair(true,
                          dsn::json::json_forwarder<meta_server_stats>::encode(stats).to_string());
}
// Statistics collected for a replica server by aggregate_replica_server_stats(), which
// encodes them as JSON. Every field is filled from the "server" metric mapped to it there.
struct replica_server_stats
{
    replica_server_stats() = default;

    // Memory usage.
    double virt_mem_mb = 0.0;
    double res_mem_mb = 0.0;

    // Replica counters.
    double total_replicas = 0.0;
    double opening_replicas = 0.0;
    double closing_replicas = 0.0;
    double inactive_replicas = 0.0;
    double error_replicas = 0.0;
    double primary_replicas = 0.0;
    double secondary_replicas = 0.0;
    double learning_replicas = 0.0;
    double splitting_replicas = 0.0;

    DEFINE_JSON_SERIALIZATION(virt_mem_mb,
                              res_mem_mb,
                              total_replicas,
                              opening_replicas,
                              closing_replicas,
                              inactive_replicas,
                              error_replicas,
                              primary_replicas,
                              secondary_replicas,
                              learning_replicas,
                              splitting_replicas)
};
// Aggregates the "server" metrics of the two samples `query_snapshot_start` and
// `query_snapshot_end` into a replica_server_stats object.
//
// Returns {true, <stats encoded as JSON>} on success, or {false, <error description>} if
// the metrics could not be aggregated for `node`.
std::pair<bool, std::string>
aggregate_replica_server_stats(const node_desc &node,
                               const dsn::metric_query_brief_value_snapshot &query_snapshot_start,
                               const dsn::metric_query_brief_value_snapshot &query_snapshot_end)
{
    aggregate_stats_calcs calcs;
    replica_server_stats stats;

    // Each metric value is assigned to the field its name is mapped to.
    auto targets = stat_var_map({{"virtual_mem_usage_mb", &stats.virt_mem_mb},
                                 {"resident_mem_usage_mb", &stats.res_mem_mb},
                                 {"total_replicas", &stats.total_replicas},
                                 {"opening_replicas", &stats.opening_replicas},
                                 {"closing_replicas", &stats.closing_replicas},
                                 {"inactive_replicas", &stats.inactive_replicas},
                                 {"error_replicas", &stats.error_replicas},
                                 {"primary_replicas", &stats.primary_replicas},
                                 {"secondary_replicas", &stats.secondary_replicas},
                                 {"learning_replicas", &stats.learning_replicas},
                                 {"splitting_replicas", &stats.splitting_replicas}});
    calcs.create_assignments<total_aggregate_stats>("server", std::move(targets));

    const auto outcome = process_parse_metrics_result(
        calcs.aggregate_metrics(query_snapshot_start, query_snapshot_end),
        node,
        "aggregate replica server stats");
    if (!outcome) {
        // Metrics failed to be aggregated.
        return std::make_pair(false, outcome.description());
    }

    return std::make_pair(
        true, dsn::json::json_forwarder<replica_server_stats>::encode(stats).to_string());
}
std::vector<std::pair<bool, std::string>> get_server_stats(const std::vector<node_desc> &nodes,
uint32_t sample_interval_ms)
{
// Ask target node (meta or replica server) for the metrics of server stats.
const auto &query_string = server_stat_filters().to_query_string();
const auto &results_start = get_metrics(nodes, query_string);
std::this_thread::sleep_for(std::chrono::milliseconds(sample_interval_ms));
const auto &results_end = get_metrics(nodes, query_string);
std::vector<std::pair<bool, std::string>> command_results;
command_results.reserve(nodes.size());
for (size_t i = 0; i < nodes.size(); ++i) {
#define SKIP_IF_PROCESS_RESULT_FALSE() \
if (!command_result) { \
command_results.emplace_back(command_result, command_result.description()); \
continue; \
}
#define PROCESS_GET_METRICS_RESULT(result, what, ...) \
{ \
auto command_result = process_get_metrics_result(result, nodes[i], what, ##__VA_ARGS__); \
SKIP_IF_PROCESS_RESULT_FALSE() \
}
// Skip the metrics that failed to be fetched.
PROCESS_GET_METRICS_RESULT(results_start[i], "starting server stats")
PROCESS_GET_METRICS_RESULT(results_end[i], "ending server stats")
#undef PROCESS_GET_METRICS_RESULT
dsn::metric_query_brief_value_snapshot query_snapshot_start;
dsn::metric_query_brief_value_snapshot query_snapshot_end;
{
// Skip the metrics that failed to be deserialized.
auto command_result = process_parse_metrics_result(
deserialize_metric_query_2_samples(results_start[i].body(),
results_end[i].body(),
query_snapshot_start,
query_snapshot_end),
nodes[i],
"deserialize server stats");
SKIP_IF_PROCESS_RESULT_FALSE()
}
#undef SKIP_IF_PROCESS_RESULT_FALSE
if (query_snapshot_end.role == "meta") {
command_results.push_back(aggregate_meta_server_stats(nodes[i], query_snapshot_end));
continue;
}
if (query_snapshot_end.role == "replica") {
command_results.push_back(
aggregate_replica_server_stats(nodes[i], query_snapshot_start, query_snapshot_end));
continue;
}
command_results.emplace_back(
false, fmt::format("role {} is unsupported", query_snapshot_end.role));
}
return command_results;
}
// Runs `command` on every node of `nodes` and returns one {succeeded, message} pair per
// node. "server_stat" is computed locally from the metrics of the nodes (using
// `sample_interval_ms`); any other command is sent to the nodes as a remote command
// together with `arguments`.
std::vector<std::pair<bool, std::string>> call_nodes(shell_context *sc,
                                                     const std::vector<node_desc> &nodes,
                                                     const std::string &command,
                                                     const std::vector<std::string> &arguments,
                                                     uint32_t sample_interval_ms)
{
    if (command != "server_stat") {
        return call_remote_command(sc, nodes, command, arguments);
    }

    return get_server_stats(nodes, sample_interval_ms);
}
} // anonymous namespace
// Shell command handler that lists the nodes known to the meta server, optionally with
// per-node replica counts, resource usage, QPS and latency, and prints them as tables.
//
// Options:
//   -d, --detailed                 add replica/primary/secondary count columns
//   -r, --resolve_ip               passed to replication_ddl_client::node_name() when the
//                                  displayed address is built
//   -u, --resource_usage           add memory and disk usage columns
//   -q, --qps                      add QPS/capacity-unit columns and the P99 latency columns
//   -j, --json                     passed to dsn::utils::output() as `json`
//   -s, --status <status>          status given to list_nodes(); empty or "all" keeps
//                                  NS_INVALID as the requested status
//   -o, --output <file>            passed to dsn::utils::output() as the output file
//   -i, --sample_interval_ms <ms>  interval between the two samples taken for QPS
//
// Returns false for an unknown option or a <status> that cannot be parsed (an invalid -i
// value is presumably rejected the same way by RETURN_FALSE_IF_SAMPLE_INTERVAL_MS_INVALID;
// confirm against its definition). Failures of list_nodes/list_apps/list_app/fill_nodes
// are printed and the handler returns true.
bool ls_nodes(command_executor *, shell_context *sc, arguments args)
{
static struct option long_options[] = {{"detailed", no_argument, nullptr, 'd'},
{"resolve_ip", no_argument, nullptr, 'r'},
{"resource_usage", no_argument, nullptr, 'u'},
{"qps", no_argument, nullptr, 'q'},
{"json", no_argument, nullptr, 'j'},
{"status", required_argument, nullptr, 's'},
{"output", required_argument, nullptr, 'o'},
{"sample_interval_ms", required_argument, nullptr, 'i'},
{nullptr, 0, nullptr, 0}};
std::string status;
std::string output_file;
// Defaults to the flag value unless overridden by -i.
uint32_t sample_interval_ms = FLAGS_nodes_sample_interval_ms;
bool detailed = false;
bool resolve_ip = false;
bool resource_usage = false;
bool show_qps = false;
bool show_latency = false;
bool json = false;
// getopt keeps global state between calls, thus rewind it before scanning this command.
optind = 0;
while (true) {
int option_index = 0;
// TODO(wangdan): getopt_long() is not thread-safe (clang-tidy[concurrency-mt-unsafe]),
// could use https://github.com/p-ranav/argparse instead.
int c = getopt_long(args.argc, args.argv, "druqjs:o:i:", long_options, &option_index);
if (c == -1) {
// -1 means all command-line options have been parsed.
break;
}
switch (c) {
case 'd':
detailed = true;
break;
case 'r':
resolve_ip = true;
break;
case 'u':
resource_usage = true;
break;
case 'q':
// There is no separate option for latency: it is always shown together with QPS.
show_qps = true;
show_latency = true;
break;
case 'j':
json = true;
break;
case 's':
status = optarg;
break;
case 'o':
output_file = optarg;
break;
case 'i':
// NOTE(review): this macro takes no arguments, so it presumably reads `optarg` and writes
// the local `sample_interval_ms` by name; keep these names unchanged.
RETURN_FALSE_IF_SAMPLE_INTERVAL_MS_INVALID();
break;
default:
return false;
}
}
dsn::utils::multi_table_printer multi_printer;
// Echo the status/output parameters in a "parameters" table when at least one was given.
if (!(status.empty() && output_file.empty())) {
dsn::utils::table_printer tp("parameters");
if (!status.empty()) {
tp.add_row_name_and_data("status", status);
}
if (!output_file.empty()) {
tp.add_row_name_and_data("out_file", output_file);
}
multi_printer.add(std::move(tp));
}
// Translate the user-supplied status (looked up with an "ns_" prefix) into a node_status;
// NS_INVALID is kept when no status or "all" was given.
::dsn::replication::node_status::type s = ::dsn::replication::node_status::NS_INVALID;
if (!status.empty() && status != "all") {
s = type_from_string(dsn::replication::_node_status_VALUES_TO_NAMES,
std::string("ns_") + status,
::dsn::replication::node_status::NS_INVALID);
SHELL_PRINT_AND_RETURN_FALSE_IF_NOT(s != ::dsn::replication::node_status::NS_INVALID,
"parse {} as node_status::type failed",
status);
}
std::map<dsn::host_port, dsn::replication::node_status::type> status_by_hp;
auto r = sc->ddl_client->list_nodes(s, status_by_hp);
if (r != dsn::ERR_OK) {
fmt::println("list nodes failed, error={}", r);
return true;
}
// One helper per listed node, keyed by its host:port; all statistics gathered below are
// stored in these helpers.
std::map<dsn::host_port, list_nodes_helper> tmp_map;
int alive_node_count = 0;
for (auto &kv : status_by_hp) {
if (kv.second == dsn::replication::node_status::NS_ALIVE) {
++alive_node_count;
}
const std::string status_str(dsn::enum_to_string(kv.second));
// The displayed status is the enum name with everything up to and including "NS_" removed.
tmp_map.emplace(kv.first,
list_nodes_helper(replication_ddl_client::node_name(kv.first, resolve_ip),
status_str.substr(status_str.find("NS_") + 3)));
}
// -d: count, for every listed node, the partitions of all available apps it serves as
// primary and as secondary.
if (detailed) {
std::vector<::dsn::app_info> apps;
const auto &result = sc->ddl_client->list_apps(dsn::app_status::AS_AVAILABLE, apps);
if (!result) {
fmt::println("list apps failed, error={}", result);
return true;
}
for (auto &app : apps) {
int32_t app_id;
int32_t partition_count;
std::vector<dsn::partition_configuration> pcs;
r = sc->ddl_client->list_app(app.app_name, app_id, partition_count, pcs);
if (r != dsn::ERR_OK) {
fmt::println("list app {} failed, error={}", app.app_name, r);
return true;
}
for (const auto &pc : pcs) {
if (pc.hp_primary) {
auto find = tmp_map.find(pc.hp_primary);
if (find != tmp_map.end()) {
find->second.primary_count++;
}
}
for (const auto &secondary : pc.hp_secondaries) {
auto find = tmp_map.find(secondary);
if (find != tmp_map.end()) {
find->second.secondary_count++;
}
}
}
}
}
// -u: query the resource usage metrics of every replica server once; replica servers that
// are not among the listed nodes are skipped.
if (resource_usage) {
std::vector<node_desc> nodes;
if (!fill_nodes(sc, "replica-server", nodes)) {
fmt::println("get replica server node list failed");
return true;
}
const auto &results = get_metrics(nodes, resource_usage_filters().to_query_string());
for (size_t i = 0; i < nodes.size(); ++i) {
auto tmp_it = tmp_map.find(nodes[i].hp);
if (tmp_it == tmp_map.end()) {
continue;
}
RETURN_SHELL_IF_GET_METRICS_FAILED(results[i], nodes[i], "resource");
auto &stat = tmp_it->second;
RETURN_SHELL_IF_PARSE_METRICS_FAILED(
parse_resource_usage(results[i].body(), stat), nodes[i], "parse resource usage");
}
}
// -q: sample the read/write request metrics twice, `sample_interval_ms` apart. Capacity
// units are registered as increases and request counters as rates over the two samples.
if (show_qps) {
std::vector<node_desc> nodes;
if (!fill_nodes(sc, "replica-server", nodes)) {
fmt::println("get replica server node list failed");
return true;
}
const auto &query_string = rw_requests_filters().to_query_string();
const auto &results_start = get_metrics(nodes, query_string);
std::this_thread::sleep_for(std::chrono::milliseconds(sample_interval_ms));
const auto &results_end = get_metrics(nodes, query_string);
for (size_t i = 0; i < nodes.size(); ++i) {
auto tmp_it = tmp_map.find(nodes[i].hp);
if (tmp_it == tmp_map.end()) {
continue;
}
RETURN_SHELL_IF_GET_METRICS_FAILED(results_start[i], nodes[i], "starting rw requests");
RETURN_SHELL_IF_GET_METRICS_FAILED(results_end[i], nodes[i], "ending rw requests");
list_nodes_helper &stat = tmp_it->second;
aggregate_stats_calcs calcs;
calcs.create_increases<total_aggregate_stats>(
"replica",
stat_var_map({{"read_capacity_units", &stat.read_cu},
{"write_capacity_units", &stat.write_cu}}));
calcs.create_rates<total_aggregate_stats>(
"replica",
stat_var_map({{"get_requests", &stat.get_qps},
{"multi_get_requests", &stat.multi_get_qps},
{"batch_get_requests", &stat.batch_get_qps},
{"put_requests", &stat.put_qps},
{"multi_put_requests", &stat.multi_put_qps}}));
RETURN_SHELL_IF_PARSE_METRICS_FAILED(
calcs.aggregate_metrics(results_start[i].body(), results_end[i].body()),
nodes[i],
"aggregate rw requests");
}
}
// Latency (enabled together with -q): query the P99 RPC latencies of every replica server.
if (show_latency) {
std::vector<node_desc> nodes;
if (!fill_nodes(sc, "replica-server", nodes)) {
fmt::println("get replica server node list failed");
return true;
}
const auto &results = get_metrics(nodes, profiler_latency_filters().to_query_string());
for (size_t i = 0; i < nodes.size(); ++i) {
auto tmp_it = tmp_map.find(nodes[i].hp);
if (tmp_it == tmp_map.end()) {
continue;
}
RETURN_SHELL_IF_GET_METRICS_FAILED(results[i], nodes[i], "profiler latency");
auto &stat = tmp_it->second;
RETURN_SHELL_IF_PARSE_METRICS_FAILED(parse_profiler_latency(results[i].body(), stat),
nodes[i],
"parse profiler latency");
}
}
// Build the "details" table. The columns added here must stay in the same order as the
// values appended for each row below.
dsn::utils::table_printer tp("details");
tp.add_title("address");
tp.add_column("status");
if (detailed) {
tp.add_column("replica_count", tp_alignment::kRight);
tp.add_column("primary_count", tp_alignment::kRight);
tp.add_column("secondary_count", tp_alignment::kRight);
}
if (resource_usage) {
tp.add_column("memused_res_mb", tp_alignment::kRight);
tp.add_column("block_cache_mb", tp_alignment::kRight);
tp.add_column("wbm_total_mb", tp_alignment::kRight);
tp.add_column("wbm_mutable_mb", tp_alignment::kRight);
tp.add_column("mem_tbl_mb", tp_alignment::kRight);
tp.add_column("mem_idx_mb", tp_alignment::kRight);
tp.add_column("disk_avl_total_ratio", tp_alignment::kRight);
tp.add_column("disk_avl_min_ratio", tp_alignment::kRight);
}
if (show_qps) {
tp.add_column("get_qps", tp_alignment::kRight);
tp.add_column("mget_qps", tp_alignment::kRight);
tp.add_column("bget_qps", tp_alignment::kRight);
tp.add_column("read_cu", tp_alignment::kRight);
tp.add_column("put_qps", tp_alignment::kRight);
tp.add_column("mput_qps", tp_alignment::kRight);
tp.add_column("write_cu", tp_alignment::kRight);
}
if (show_latency) {
tp.add_column("get_p99(ms)", tp_alignment::kRight);
tp.add_column("mget_p99(ms)", tp_alignment::kRight);
tp.add_column("bget_p99(ms)", tp_alignment::kRight);
tp.add_column("put_p99(ms)", tp_alignment::kRight);
tp.add_column("mput_p99(ms)", tp_alignment::kRight);
}
for (auto &kv : tmp_map) {
tp.add_row(kv.second.node_name);
tp.append_data(kv.second.node_status);
if (detailed) {
// replica_count is the sum of the primary and secondary counts.
tp.append_data(kv.second.primary_count + kv.second.secondary_count);
tp.append_data(kv.second.primary_count);
tp.append_data(kv.second.secondary_count);
}
if (resource_usage) {
// Values collected in bytes are converted by dsn::bytes::to_mb() for the "*_mb" columns.
tp.append_data(kv.second.memused_res_mb);
tp.append_data(dsn::bytes::to_mb(kv.second.block_cache_bytes));
tp.append_data(dsn::bytes::to_mb(kv.second.wbm_total_bytes));
tp.append_data(dsn::bytes::to_mb(kv.second.wbm_mutable_bytes));
tp.append_data(dsn::bytes::to_mb(kv.second.mem_tbl_bytes));
tp.append_data(dsn::bytes::to_mb(kv.second.mem_idx_bytes));
tp.append_data(kv.second.disk_available_total_ratio);
tp.append_data(kv.second.disk_available_min_ratio);
}
if (show_qps) {
tp.append_data(kv.second.get_qps);
tp.append_data(kv.second.multi_get_qps);
tp.append_data(kv.second.batch_get_qps);
tp.append_data(kv.second.read_cu);
tp.append_data(kv.second.put_qps);
tp.append_data(kv.second.multi_put_qps);
tp.append_data(kv.second.write_cu);
}
if (show_latency) {
// The latency metric is named "*_latency_ns" while the columns are labelled "(ms)",
// hence the division by 1e6.
tp.append_data(kv.second.get_p99 / 1e6);
tp.append_data(kv.second.multi_get_p99 / 1e6);
tp.append_data(kv.second.batch_get_p99 / 1e6);
tp.append_data(kv.second.put_p99 / 1e6);
tp.append_data(kv.second.multi_put_p99 / 1e6);
}
}
multi_printer.add(std::move(tp));
// The "summary" table counts all listed nodes and how many of them are NS_ALIVE.
dsn::utils::table_printer tp_count("summary");
tp_count.add_row_name_and_data("total_node_count", status_by_hp.size());
tp_count.add_row_name_and_data("alive_node_count", alive_node_count);
tp_count.add_row_name_and_data("unalive_node_count", status_by_hp.size() - alive_node_count);
multi_printer.add(std::move(tp_count));
dsn::utils::output(output_file, json, multi_printer);
return true;
}
// Shell command `server_info`: forwards its arguments unchanged to remote_command(), which
// maps the positional command name "server_info" to the remote command "server-info".
bool server_info(command_executor *e, shell_context *sc, arguments args)
{
return remote_command(e, sc, args);
}
// Shell command `server_stat`: forwards its arguments unchanged to remote_command(). The
// command name "server_stat" is not sent as a remote command; call_nodes() computes it
// from the metrics of the target nodes via get_server_stats().
bool server_stat(command_executor *e, shell_context *sc, arguments args)
{
return remote_command(e, sc, args);
}
// Shell command `flush_log`: forwards its arguments unchanged to remote_command(), which
// maps the positional command name "flush_log" to the remote command "flush-log".
bool flush_log(command_executor *e, shell_context *sc, arguments args)
{
return remote_command(e, sc, args);
}
// Runs a command on a set of nodes and prints the per-node outcome to stdout as a JSON
// document with the keys "command", "details", "succeed_count" and "failed_count".
//
// Command format: [remote_command] <command> [arguments...]
//                 [-t all|meta-server|replica-server]
//                 [-r|--resolve_ip]
//                 [-l host:port,host:port...]
//                 [-i|--sample_interval_ms num]
//
// The target nodes are either all nodes of the type given by -t, or exactly the nodes
// listed by -l; both options cannot be combined, and all nodes are targeted when neither
// is given.
//
// Returns false if <command> is missing, -t and -l are both given, the node type or a
// host:port of -l is invalid, or the target nodes cannot be obtained (PARSE_OPT_UINT
// presumably also returns false for an invalid -i value; confirm against its definition).
// Returns true once the report is printed, even if the command failed on some nodes.
bool remote_command(command_executor *e, shell_context *sc, arguments args)
{
    // NOTE: the PARSE_OPT_* macros used below are expected to refer to this parser by the
    // name `cmd`, thus it should not be renamed.
    argh::parser cmd(args.argc, args.argv, argh::parser::PREFER_PARAM_FOR_UNREG_OPTION);

    std::string command;
    std::vector<std::string> pos_args;
    int pos = 0;
    do {
        // Try to parse the positional args.
        const auto &pos_arg = cmd(pos++);
        if (!pos_arg) {
            break;
        }

        // Ignore the args that are useless to the command.
        static const std::set<std::string> kIgnoreArgs({"remote_command"});
        if (kIgnoreArgs.count(pos_arg.str()) == 1) {
            continue;
        }

        // Collect the positional args following by the command.
        if (!command.empty()) {
            pos_args.emplace_back(pos_arg.str());
            continue;
        }

        // Initialize the command. The mapping is static so that it is built only once
        // rather than on each iteration.
        static const std::map<std::string, std::string> kCmdsMapping(
            {{"server_info", "server-info"}, {"flush_log", "flush-log"}});
        const auto it = kCmdsMapping.find(pos_arg.str());
        if (it != kCmdsMapping.end()) {
            // Use the mapped command.
            command = it->second;
        } else {
            command = pos_arg.str();
        }
    } while (true);

    if (command.empty()) {
        SHELL_PRINTLN_ERROR("missing <command>");
        return false;
    }

    const auto resolve_ip = cmd[{"-r", "--resolve_ip"}];
    auto node_type = cmd({"-t"}).str();
    std::vector<std::string> nodes_str;
    PARSE_OPT_STRS(nodes_str, "", {"-l"});

    if (!node_type.empty() && !nodes_str.empty()) {
        SHELL_PRINTLN_ERROR("can not specify both node_type and nodes_str");
        return false;
    }

    // Target all nodes if neither a node type nor a node list is specified.
    if (node_type.empty() && nodes_str.empty()) {
        node_type = "all";
    }

    static const std::set<std::string> kValidNodeTypes({"all", "meta-server", "replica-server"});
    if (!node_type.empty() && kValidNodeTypes.count(node_type) == 0) {
        SHELL_PRINTLN_ERROR("invalid node_type, should be in [{}]",
                            fmt::join(kValidNodeTypes, ", "));
        return false;
    }

    std::vector<node_desc> nodes;
    if (node_type.empty()) {
        // The nodes are specified explicitly by the user.
        for (const auto &node_str : nodes_str) {
            const auto node = dsn::host_port::from_string(node_str);
            if (!node) {
                SHELL_PRINTLN_ERROR("parse '{}' as host:port failed", node_str);
                return false;
            }
            nodes.emplace_back("user-specified", node);
        }
    } else if (!fill_nodes(sc, node_type, nodes)) {
        SHELL_PRINTLN_ERROR("prepare nodes failed, node_type = {}", node_type);
        return false;
    }

    nlohmann::json info;
    info["command"] = fmt::format("{} {}", command, fmt::join(pos_args, " "));

    uint32_t sample_interval_ms = 0;
    PARSE_OPT_UINT(
        sample_interval_ms, FLAGS_nodes_sample_interval_ms, {"-i", "--sample_interval_ms"});

    const auto &results = call_nodes(sc, nodes, command, pos_args, sample_interval_ms);
    CHECK_EQ(results.size(), nodes.size());

    int succeed = 0;
    int failed = 0;
    // Use an unsigned index to avoid comparing a signed integer with nodes.size().
    for (size_t i = 0; i < nodes.size(); ++i) {
        nlohmann::json node_info;
        node_info["role"] = nodes[i].desc;
        node_info["acked"] = results[i].first;
        try {
            // Treat the message as a JSON object by default.
            node_info["message"] = nlohmann::json::parse(results[i].second);
        } catch (const nlohmann::json::exception &) {
            // Treat it as a string if failed to parse as a JSON object.
            node_info["message"] = results[i].second;
        }

        if (results[i].first) {
            succeed++;
        } else {
            failed++;
        }

        info["details"].emplace(replication_ddl_client::node_name(nodes[i].hp, resolve_ip),
                                node_info);
    }

    info["succeed_count"] = succeed;
    info["failed_count"] = failed;
    fmt::println(stdout, "{}", info.dump(2));
    return true;
}