in src/kudu/tools/rebalancer_tool.cc [493:694]
// Prints replica distribution statistics for one location into 'out'.
//
// The report consists of:
//   * a location banner, only when 'location' is non-empty;
//   * a per-server summary (minimum/maximum/average replica count) and, when
//     'config_.output_replica_distribution_details' is set, a per-server
//     details table;
//   * a per-table skew summary (minimum/maximum/average) and, when details
//     are requested, per-range tables (if 'config_.enable_range_rebalancing'
//     is set) and/or a per-table details table.
//
// 'raw_info' provides the tablet server and table summaries used to resolve
// identifiers into addresses and names; 'ci' provides the balance data.
// A failure of DataTable::PrintTo() is propagated through RETURN_NOT_OK;
// otherwise Status::OK() is returned.
Status RebalancerTool::PrintLocationBalanceStats(const string& location,
                                                 const ClusterRawInfo& raw_info,
                                                 const ClusterInfo& ci,
                                                 ostream& out) const {
  if (!location.empty()) {
    out << "--------------------------------------------------" << endl;
    out << "Location: " << location << endl;
    out << "--------------------------------------------------" << endl;
  }

  // Build dictionary to resolve tablet server UUID into its RPC address.
  unordered_map<string, string> tserver_endpoints;
  for (const auto& summary : raw_info.tserver_summaries) {
    tserver_endpoints.emplace(summary.uuid, summary.address);
  }

  // Per-server replica distribution stats.
  {
    out << "Per-server replica distribution summary:" << endl;
    DataTable summary({"Statistic", "Value"});
    const auto& servers_load_info = ci.balance.servers_by_total_replica_count;
    if (servers_load_info.empty()) {
      summary.AddRow({ "N/A", "N/A" });
    } else {
      // 'const auto&' binds to the container's own element type, so no
      // per-element temporary is materialized while summing.
      const int64_t total_replica_count = accumulate(
          servers_load_info.begin(), servers_load_info.end(), int64_t{0},
          [](int64_t sum, const auto& elem) {
            return sum + elem.first;
          });
      // The container is iterated in key order, so the first and the last
      // elements carry the smallest and the largest replica counts.
      const auto min_replica_count = servers_load_info.begin()->first;
      const auto max_replica_count = servers_load_info.rbegin()->first;
      const double avg_replica_count =
          1.0 * total_replica_count / servers_load_info.size();
      summary.AddRow({ "Minimum Replica Count", to_string(min_replica_count) });
      summary.AddRow({ "Maximum Replica Count", to_string(max_replica_count) });
      summary.AddRow({ "Average Replica Count", to_string(avg_replica_count) });
    }
    RETURN_NOT_OK(summary.PrintTo(out));
    out << endl;

    if (config_.output_replica_distribution_details) {
      out << "Per-server replica distribution details:" << endl;
      DataTable servers_info({ "UUID", "Address", "Replica Count" });
      for (const auto& [load, id] : servers_load_info) {
        // operator[] yields an empty address for a UUID that is not present
        // in the tablet server summaries.
        servers_info.AddRow({ id, tserver_endpoints[id], to_string(load) });
      }
      RETURN_NOT_OK(servers_info.PrintTo(out));
      out << endl;
    }
  }

  // Per-table replica distribution stats.
  {
    out << "Per-table replica distribution summary:" << endl;
    DataTable summary({ "Replica Skew", "Value" });
    const auto& table_skew_info = ci.balance.table_info_by_skew;
    if (table_skew_info.empty()) {
      summary.AddRow({ "N/A", "N/A" });
    } else {
      const auto min_table_skew = table_skew_info.begin()->first;
      const auto max_table_skew = table_skew_info.rbegin()->first;
      // As above, 'const auto&' avoids copying the balance info of every
      // table just to read the skew stored in the key.
      const int64_t sum_table_skew = accumulate(
          table_skew_info.begin(), table_skew_info.end(), int64_t{0},
          [](int64_t sum, const auto& elem) {
            return sum + elem.first;
          });
      const double avg_table_skew =
          1.0 * sum_table_skew / table_skew_info.size();
      summary.AddRow({ "Minimum", to_string(min_table_skew) });
      summary.AddRow({ "Maximum", to_string(max_table_skew) });
      summary.AddRow({ "Average", to_string(avg_table_skew) });
    }
    RETURN_NOT_OK(summary.PrintTo(out));
    out << endl;

    if (config_.output_replica_distribution_details) {
      // Index table summaries by table identifier.
      unordered_map<string, const TableSummary*> table_info;
      for (const auto& summary : raw_info.table_summaries) {
        table_info.emplace(summary.id, &summary);
      }

      // Non-mutating lookup: returns nullptr for a table that has no summary.
      // Using operator[] here would insert a null entry into 'table_info' and
      // the callers would then dereference it.
      const auto find_table_summary =
          [&table_info](const string& table_id) -> const TableSummary* {
            const auto it = table_info.find(table_id);
            return it == table_info.end() ? nullptr : it->second;
          };

      if (config_.enable_range_rebalancing) {
        out << "Per-range replica distribution details for tables" << endl;

        // Build mapping {table_id, tag} --> per-server replica count map.
        // Using ordered dictionary since it's targeted for printing later.
        map<pair<string, string>, map<string, size_t>> range_dist_stats;
        for (const auto& [_, balance_info] : table_skew_info) {
          const auto& table_id = balance_info.table_id;
          const auto& tag = balance_info.tag;
          // Only tables known to be range-partitioned are reported per range;
          // a table without a summary cannot be classified, so it is skipped.
          const auto* table_summary = find_table_summary(table_id);
          if (!table_summary || !table_summary->is_range_partitioned) {
            continue;
          }
          auto& per_server_counts =
              range_dist_stats[std::make_pair(table_id, tag)];
          for (const auto& [count, server_uuid] :
               balance_info.servers_by_replica_count) {
            per_server_counts[server_uuid] += count;
          }
        }

        // Build the mapping for the per-range skew summary table, i.e.
        // {table_id, tag} --> {num_of_replicas, per_server_replica_skew}.
        map<pair<string, string>, pair<size_t, size_t>> range_skew_stats;
        for (const auto& [table_range, per_server_stats] : range_dist_stats) {
          size_t total_count = 0;
          size_t min_per_server_count = std::numeric_limits<size_t>::max();
          size_t max_per_server_count = std::numeric_limits<size_t>::min();
          for (const auto& [server_uuid, replica_count] : per_server_stats) {
            total_count += replica_count;
            if (replica_count > max_per_server_count) {
              max_per_server_count = replica_count;
            }
            if (replica_count < min_per_server_count) {
              min_per_server_count = replica_count;
            }
          }
          // With no per-server entries the sentinels are still in place and
          // the unsigned subtraction would wrap around; report zero skew.
          const size_t skew = per_server_stats.empty()
              ? 0
              : max_per_server_count - min_per_server_count;
          range_skew_stats.emplace(table_range,
                                   std::make_pair(total_count, skew));
        }

        // Entries are ordered by {table_id, tag}, so all ranges of a table
        // are adjacent: the per-table header and the per-range skew summary
        // are emitted once, when the table identifier changes.
        string prev_table_id;
        for (const auto& [table_id_and_tag, per_server_stats] : range_dist_stats) {
          const auto& table_id = table_id_and_tag.first;
          const auto& table_range = table_id_and_tag.second;
          if (prev_table_id != table_id) {
            prev_table_id = table_id;
            const auto* table_summary = find_table_summary(table_id);
            const string table_name = table_summary ? table_summary->name : "";
            out << endl
                << Substitute("Table: $0 ($1)", table_id, table_name) << endl
                << endl;
            out << "Number of tablet replicas at servers for each range" << endl;
            DataTable range_skew_summary_table(
                { "Max Skew", "Total Count", "Range Start Key" });
            const auto it_begin = range_skew_stats.find(table_id_and_tag);
            for (auto it = it_begin; it != range_skew_stats.end(); ++it) {
              const auto& cur_table_id = it->first.first;
              if (cur_table_id != table_id) {
                break;
              }
              const auto& range = it->first.second;
              const auto replica_count = it->second.first;
              const auto replica_skew = it->second.second;
              range_skew_summary_table.AddRow(
                  { to_string(replica_skew), to_string(replica_count), range });
            }
            RETURN_NOT_OK(range_skew_summary_table.PrintTo(out));
            out << endl;
          }
          out << "Range start key: '" << table_range << "'" << endl;
          DataTable skew_table({ "UUID", "Server address", "Replica Count" });
          for (const auto& stat : per_server_stats) {
            const auto& srv_uuid = stat.first;
            const auto& srv_address = FindOrDie(tserver_endpoints, srv_uuid);
            skew_table.AddRow({ srv_uuid, srv_address, to_string(stat.second) });
          }
          RETURN_NOT_OK(skew_table.PrintTo(out));
          out << endl;
        }
      }

      // 'table_info' holds only addresses of elements of
      // 'raw_info.table_summaries', so every mapped pointer is non-null.
      const auto has_non_range_partitioned_table =
          std::any_of(table_info.begin(), table_info.end(), [](const auto& info) {
            return !info.second->is_range_partitioned;
          });
      if (!config_.enable_range_rebalancing || has_non_range_partitioned_table) {
        out << "Per-table replica distribution details for "
            << (config_.enable_range_rebalancing ? "non range partitioned tables:" : "tables:")
            << endl;
        DataTable skew_table(
            { "Table Id", "Replica Count", "Replica Skew", "Table Name" });
        for (const auto& [skew, balance_info] : table_skew_info) {
          const auto& table_id = balance_info.table_id;
          const auto* table_summary = find_table_summary(table_id);
          // Range-partitioned tables have already been reported per range.
          if (config_.enable_range_rebalancing &&
              table_summary && table_summary->is_range_partitioned) {
            continue;
          }
          // A table without a summary is still listed, with an empty name
          // and a zero replica count.
          const string table_name = table_summary ? table_summary->name : "";
          const auto total_replica_count = table_summary
              ? table_summary->replication_factor * table_summary->TotalTablets()
              : 0;
          skew_table.AddRow({ table_id,
                              to_string(total_replica_count),
                              to_string(skew),
                              table_name });
        }
        RETURN_NOT_OK(skew_table.PrintTo(out));
      }
      out << endl;
    }
  }
  return Status::OK();
}