bool ls_nodes()

in src/shell/commands/node_management.cpp [428:738]


// Shell command handler that lists the nodes returned by sc->ddl_client->list_nodes()
// and prints them as a "details" table plus a "summary" table (and a "parameters"
// table when --status or --output was given).
//
// Recognized options (see long_options / the getopt string below):
//   -d, --detailed             add replica/primary/secondary counts per node
//   -r, --resolve_ip           forwarded to replication_ddl_client::node_name()
//   -u, --resource_usage       add memory/disk usage columns
//   -q, --qps                  add QPS/CU columns AND the p99 latency columns
//   -j, --json                 forwarded to dsn::utils::output()
//   -s, --status <name>        only list nodes with this status ("all" = no filter)
//   -o, --output <file>        forwarded to dsn::utils::output()
//   -i, --sample_interval_ms   interval between the two QPS samples
//
// The first (command_executor *) parameter is unused. `sc` supplies the ddl client,
// `args` carries argc/argv for getopt_long().
//
// Returns false for an unrecognized option; the RETURN_FALSE_* / *_RETURN_FALSE_*
// macros used below presumably also return false (they are defined elsewhere --
// confirm). Every other visible path returns true, including the paths where an
// RPC fails and only an error line is printed.
bool ls_nodes(command_executor *, shell_context *sc, arguments args)
{
    static struct option long_options[] = {{"detailed", no_argument, nullptr, 'd'},
                                           {"resolve_ip", no_argument, nullptr, 'r'},
                                           {"resource_usage", no_argument, nullptr, 'u'},
                                           {"qps", no_argument, nullptr, 'q'},
                                           {"json", no_argument, nullptr, 'j'},
                                           {"status", required_argument, nullptr, 's'},
                                           {"output", required_argument, nullptr, 'o'},
                                           {"sample_interval_ms", required_argument, nullptr, 'i'},
                                           {nullptr, 0, nullptr, 0}};

    std::string status;
    std::string output_file;
    // Defaults to the flag value; may be overridden by -i below.
    uint32_t sample_interval_ms = FLAGS_nodes_sample_interval_ms;
    bool detailed = false;
    bool resolve_ip = false;
    bool resource_usage = false;
    bool show_qps = false;
    bool show_latency = false;
    bool json = false;

    // Reset getopt's scan position before parsing this command's arguments
    // (getopt keeps global state between calls).
    optind = 0;
    while (true) {
        int option_index = 0;
        // TODO(wangdan): getopt_long() is not thread-safe (clang-tidy[concurrency-mt-unsafe]),
        // could use https://github.com/p-ranav/argparse instead.
        int c = getopt_long(args.argc, args.argv, "druqjs:o:i:", long_options, &option_index);
        if (c == -1) {
            // -1 means all command-line options have been parsed.
            break;
        }

        switch (c) {
        case 'd':
            detailed = true;
            break;
        case 'r':
            resolve_ip = true;
            break;
        case 'u':
            resource_usage = true;
            break;
        case 'q':
            // There is no separate latency option: -q turns on both the QPS and
            // the latency sections.
            show_qps = true;
            show_latency = true;
            break;
        case 'j':
            json = true;
            break;
        case 's':
            status = optarg;
            break;
        case 'o':
            output_file = optarg;
            break;
        case 'i':
            // NOTE(review): argument-less macro defined elsewhere; presumably parses
            // `optarg` into the local `sample_interval_ms` and returns false when it
            // is invalid -- it relies on these local names, confirm before renaming.
            RETURN_FALSE_IF_SAMPLE_INTERVAL_MS_INVALID();
            break;
        default:
            // Unrecognized option.
            return false;
        }
    }

    // Echo the filtering/output parameters in their own table, but only when at
    // least one of them was supplied.
    dsn::utils::multi_table_printer multi_printer;
    if (!(status.empty() && output_file.empty())) {
        dsn::utils::table_printer tp("parameters");
        if (!status.empty()) {
            tp.add_row_name_and_data("status", status);
        }
        if (!output_file.empty()) {
            tp.add_row_name_and_data("out_file", output_file);
        }
        multi_printer.add(std::move(tp));
    }

    // Translate the user-supplied status into the enum by prepending "ns_" and
    // looking it up in the thrift name table. An empty status or "all" leaves `s`
    // as NS_INVALID, which is what gets passed to list_nodes() (presumably meaning
    // "no filter" -- confirm against list_nodes()).
    ::dsn::replication::node_status::type s = ::dsn::replication::node_status::NS_INVALID;
    if (!status.empty() && status != "all") {
        s = type_from_string(dsn::replication::_node_status_VALUES_TO_NAMES,
                             std::string("ns_") + status,
                             ::dsn::replication::node_status::NS_INVALID);
        SHELL_PRINT_AND_RETURN_FALSE_IF_NOT(s != ::dsn::replication::node_status::NS_INVALID,
                                            "parse {} as node_status::type failed",
                                            status);
    }

    std::map<dsn::host_port, dsn::replication::node_status::type> status_by_hp;
    auto r = sc->ddl_client->list_nodes(s, status_by_hp);
    if (r != dsn::ERR_OK) {
        fmt::println("list nodes failed, error={}", r);
        return true;
    }

    // Build one list_nodes_helper per returned node, keyed by host_port; the later
    // sections fill in its counters/metrics. Also count alive nodes for the summary.
    std::map<dsn::host_port, list_nodes_helper> tmp_map;
    int alive_node_count = 0;
    for (auto &kv : status_by_hp) {
        if (kv.second == dsn::replication::node_status::NS_ALIVE) {
            ++alive_node_count;
        }

        // Display the status without its "NS_" prefix (everything up to and
        // including "NS_" is dropped).
        // NOTE(review): if "NS_" is not found, find() returns npos and `+ 3` wraps
        // around to 2, so substr(2) would be used (and throws for strings shorter
        // than 2) -- confirm enum_to_string() always yields an "NS_"-prefixed name.
        const std::string status_str(dsn::enum_to_string(kv.second));
        tmp_map.emplace(kv.first,
                        list_nodes_helper(replication_ddl_client::node_name(kv.first, resolve_ip),
                                          status_str.substr(status_str.find("NS_") + 3)));
    }

    // --detailed: walk every partition of every AS_AVAILABLE app and count, per
    // listed node, how many partitions it serves as primary / secondary.
    if (detailed) {
        std::vector<::dsn::app_info> apps;
        const auto &result = sc->ddl_client->list_apps(dsn::app_status::AS_AVAILABLE, apps);
        if (!result) {
            fmt::println("list apps failed, error={}", result);
            return true;
        }

        for (auto &app : apps) {
            // app_id and partition_count are out-parameters of list_app(); they are
            // not read afterwards in this function.
            int32_t app_id;
            int32_t partition_count;
            std::vector<dsn::partition_configuration> pcs;
            r = sc->ddl_client->list_app(app.app_name, app_id, partition_count, pcs);
            if (r != dsn::ERR_OK) {
                fmt::println("list app {} failed, error={}", app.app_name, r);
                return true;
            }

            for (const auto &pc : pcs) {
                // Only count a primary when hp_primary tests true; hosts that are
                // not present in tmp_map are silently skipped.
                if (pc.hp_primary) {
                    auto find = tmp_map.find(pc.hp_primary);
                    if (find != tmp_map.end()) {
                        find->second.primary_count++;
                    }
                }
                for (const auto &secondary : pc.hp_secondaries) {
                    auto find = tmp_map.find(secondary);
                    if (find != tmp_map.end()) {
                        find->second.secondary_count++;
                    }
                }
            }
        }
    }

    // --resource_usage: query resource metrics from the "replica-server" nodes and
    // parse them into the matching helper entries.
    if (resource_usage) {
        std::vector<node_desc> nodes;
        if (!fill_nodes(sc, "replica-server", nodes)) {
            fmt::println("get replica server node list failed");
            return true;
        }

        // NOTE(review): results[i] is paired with nodes[i] below, i.e. this assumes
        // get_metrics() returns one result per node in the same order -- confirm.
        const auto &results = get_metrics(nodes, resource_usage_filters().to_query_string());

        for (size_t i = 0; i < nodes.size(); ++i) {
            // Nodes that are not part of the listing are ignored.
            auto tmp_it = tmp_map.find(nodes[i].hp);
            if (tmp_it == tmp_map.end()) {
                continue;
            }

            // NOTE(review): the RETURN_SHELL_IF_* macros are defined elsewhere; per
            // their names they return from ls_nodes() on failure -- confirm the
            // returned value.
            RETURN_SHELL_IF_GET_METRICS_FAILED(results[i], nodes[i], "resource");

            auto &stat = tmp_it->second;
            RETURN_SHELL_IF_PARSE_METRICS_FAILED(
                parse_resource_usage(results[i].body(), stat), nodes[i], "parse resource usage");
        }
    }

    // --qps: take two metric samples separated by sample_interval_ms and aggregate
    // the pair into per-node capacity-unit increases and request rates.
    if (show_qps) {
        std::vector<node_desc> nodes;
        if (!fill_nodes(sc, "replica-server", nodes)) {
            fmt::println("get replica server node list failed");
            return true;
        }

        const auto &query_string = rw_requests_filters().to_query_string();
        const auto &results_start = get_metrics(nodes, query_string);
        // Blocks the calling thread for the whole sampling interval.
        std::this_thread::sleep_for(std::chrono::milliseconds(sample_interval_ms));
        const auto &results_end = get_metrics(nodes, query_string);

        for (size_t i = 0; i < nodes.size(); ++i) {
            auto tmp_it = tmp_map.find(nodes[i].hp);
            if (tmp_it == tmp_map.end()) {
                continue;
            }

            // Both samples must have succeeded for this node.
            RETURN_SHELL_IF_GET_METRICS_FAILED(results_start[i], nodes[i], "starting rw requests");
            RETURN_SHELL_IF_GET_METRICS_FAILED(results_end[i], nodes[i], "ending rw requests");

            // Bind metric names of the "replica" entity to the fields of this
            // node's helper: capacity units are registered via create_increases,
            // request counters via create_rates.
            list_nodes_helper &stat = tmp_it->second;
            aggregate_stats_calcs calcs;
            calcs.create_increases<total_aggregate_stats>(
                "replica",
                stat_var_map({{"read_capacity_units", &stat.read_cu},
                              {"write_capacity_units", &stat.write_cu}}));
            calcs.create_rates<total_aggregate_stats>(
                "replica",
                stat_var_map({{"get_requests", &stat.get_qps},
                              {"multi_get_requests", &stat.multi_get_qps},
                              {"batch_get_requests", &stat.batch_get_qps},
                              {"put_requests", &stat.put_qps},
                              {"multi_put_requests", &stat.multi_put_qps}}));

            RETURN_SHELL_IF_PARSE_METRICS_FAILED(
                calcs.aggregate_metrics(results_start[i].body(), results_end[i].body()),
                nodes[i],
                "aggregate rw requests");
        }
    }

    // Latency section (enabled together with --qps): a single metrics query whose
    // bodies are parsed into the helper's *_p99 fields.
    if (show_latency) {
        std::vector<node_desc> nodes;
        if (!fill_nodes(sc, "replica-server", nodes)) {
            fmt::println("get replica server node list failed");
            return true;
        }

        const auto &results = get_metrics(nodes, profiler_latency_filters().to_query_string());

        for (size_t i = 0; i < nodes.size(); ++i) {
            auto tmp_it = tmp_map.find(nodes[i].hp);
            if (tmp_it == tmp_map.end()) {
                continue;
            }

            RETURN_SHELL_IF_GET_METRICS_FAILED(results[i], nodes[i], "profiler latency");

            auto &stat = tmp_it->second;
            RETURN_SHELL_IF_PARSE_METRICS_FAILED(parse_profiler_latency(results[i].body(), stat),
                                                 nodes[i],
                                                 "parse profiler latency");
        }
    }

    // Build the "details" table. The column groups here must stay in the same
    // order as the append_data() groups in the row loop below.
    dsn::utils::table_printer tp("details");
    tp.add_title("address");
    tp.add_column("status");
    if (detailed) {
        tp.add_column("replica_count", tp_alignment::kRight);
        tp.add_column("primary_count", tp_alignment::kRight);
        tp.add_column("secondary_count", tp_alignment::kRight);
    }
    if (resource_usage) {
        tp.add_column("memused_res_mb", tp_alignment::kRight);
        tp.add_column("block_cache_mb", tp_alignment::kRight);
        tp.add_column("wbm_total_mb", tp_alignment::kRight);
        tp.add_column("wbm_mutable_mb", tp_alignment::kRight);
        tp.add_column("mem_tbl_mb", tp_alignment::kRight);
        tp.add_column("mem_idx_mb", tp_alignment::kRight);
        tp.add_column("disk_avl_total_ratio", tp_alignment::kRight);
        tp.add_column("disk_avl_min_ratio", tp_alignment::kRight);
    }
    if (show_qps) {
        tp.add_column("get_qps", tp_alignment::kRight);
        tp.add_column("mget_qps", tp_alignment::kRight);
        tp.add_column("bget_qps", tp_alignment::kRight);
        tp.add_column("read_cu", tp_alignment::kRight);
        tp.add_column("put_qps", tp_alignment::kRight);
        tp.add_column("mput_qps", tp_alignment::kRight);
        tp.add_column("write_cu", tp_alignment::kRight);
    }
    if (show_latency) {
        tp.add_column("get_p99(ms)", tp_alignment::kRight);
        tp.add_column("mget_p99(ms)", tp_alignment::kRight);
        tp.add_column("bget_p99(ms)", tp_alignment::kRight);
        tp.add_column("put_p99(ms)", tp_alignment::kRight);
        tp.add_column("mput_p99(ms)", tp_alignment::kRight);
    }
    // One row per listed node, iterated in tmp_map (host_port key) order.
    for (auto &kv : tmp_map) {
        tp.add_row(kv.second.node_name);
        tp.append_data(kv.second.node_status);
        if (detailed) {
            // replica_count is the sum of the primary and secondary counts.
            tp.append_data(kv.second.primary_count + kv.second.secondary_count);
            tp.append_data(kv.second.primary_count);
            tp.append_data(kv.second.secondary_count);
        }
        if (resource_usage) {
            // Byte-valued fields are converted with dsn::bytes::to_mb() for the
            // *_mb columns; memused_res_mb and the ratios are printed as stored.
            tp.append_data(kv.second.memused_res_mb);
            tp.append_data(dsn::bytes::to_mb(kv.second.block_cache_bytes));
            tp.append_data(dsn::bytes::to_mb(kv.second.wbm_total_bytes));
            tp.append_data(dsn::bytes::to_mb(kv.second.wbm_mutable_bytes));
            tp.append_data(dsn::bytes::to_mb(kv.second.mem_tbl_bytes));
            tp.append_data(dsn::bytes::to_mb(kv.second.mem_idx_bytes));
            tp.append_data(kv.second.disk_available_total_ratio);
            tp.append_data(kv.second.disk_available_min_ratio);
        }
        if (show_qps) {
            tp.append_data(kv.second.get_qps);
            tp.append_data(kv.second.multi_get_qps);
            tp.append_data(kv.second.batch_get_qps);
            tp.append_data(kv.second.read_cu);
            tp.append_data(kv.second.put_qps);
            tp.append_data(kv.second.multi_put_qps);
            tp.append_data(kv.second.write_cu);
        }
        if (show_latency) {
            // Values are divided by 1e6 for the "(ms)" columns, so the stored p99
            // values are presumably in nanoseconds -- confirm.
            tp.append_data(kv.second.get_p99 / 1e6);
            tp.append_data(kv.second.multi_get_p99 / 1e6);
            tp.append_data(kv.second.batch_get_p99 / 1e6);
            tp.append_data(kv.second.put_p99 / 1e6);
            tp.append_data(kv.second.multi_put_p99 / 1e6);
        }
    }
    multi_printer.add(std::move(tp));

    // Summary counts are based on every node returned by list_nodes(); "unalive"
    // is simply everything that is not NS_ALIVE.
    dsn::utils::table_printer tp_count("summary");
    tp_count.add_row_name_and_data("total_node_count", status_by_hp.size());
    tp_count.add_row_name_and_data("alive_node_count", alive_node_count);
    tp_count.add_row_name_and_data("unalive_node_count", status_by_hp.size() - alive_node_count);
    multi_printer.add(std::move(tp_count));

    // Emit all accumulated tables, passing along the output file name and the
    // json flag.
    dsn::utils::output(output_file, json, multi_printer);

    return true;
}