Status RebalancerTool::PrintLocationBalanceStats()

in src/kudu/tools/rebalancer_tool.cc [493:694]


// Prints replica balance statistics for one location into 'out'.
//
// The report consists of:
//   * a banner with the location name (skipped when 'location' is empty);
//   * a per-server summary (minimum/maximum/average replica count) built from
//     'ci.balance.servers_by_total_replica_count';
//   * a per-table summary (minimum/maximum/average replica skew) built from
//     'ci.balance.table_info_by_skew';
//   * when 'config_.output_replica_distribution_details' is set, detailed
//     per-server and per-table tables, plus per-range tables when
//     'config_.enable_range_rebalancing' is set as well.
//
// 'raw_info' supplies tablet server addresses and table summaries which are
// used to annotate server UUIDs and table identifiers in the output. A table
// present in the balance information but absent from
// 'raw_info.table_summaries' is treated as not range-partitioned and is
// reported with an empty name and a zero replica count.
//
// Failures of DataTable::PrintTo() are propagated through RETURN_NOT_OK.
Status RebalancerTool::PrintLocationBalanceStats(const string& location,
                                                 const ClusterRawInfo& raw_info,
                                                 const ClusterInfo& ci,
                                                 ostream& out) const {
  if (!location.empty()) {
    out << "--------------------------------------------------" << endl;
    out << "Location: " << location << endl;
    out << "--------------------------------------------------" << endl;
  }

  // Build dictionary to resolve tablet server UUID into its RPC address.
  unordered_map<string, string> tserver_endpoints;
  for (const auto& summary : raw_info.tserver_summaries) {
    tserver_endpoints.emplace(summary.uuid, summary.address);
  }

  // Per-server replica distribution stats.
  {
    out << "Per-server replica distribution summary:" << endl;
    DataTable summary({"Statistic", "Value"});

    const auto& servers_load_info = ci.balance.servers_by_total_replica_count;
    if (servers_load_info.empty()) {
      summary.AddRow({ "N/A", "N/A" });
    } else {
      // 'const auto&' binds directly to the container's element, avoiding
      // a per-element temporary copy; the accumulator is explicitly 64-bit.
      const int64_t total_replica_count = accumulate(
          servers_load_info.begin(), servers_load_info.end(), int64_t{0},
          [](int64_t sum, const auto& elem) {
            return sum + elem.first;
          });

      // The first and the last elements carry the smallest and the largest
      // keys, i.e. replica counts.
      const auto min_replica_count = servers_load_info.begin()->first;
      const auto max_replica_count = servers_load_info.rbegin()->first;
      const double avg_replica_count =
          1.0 * total_replica_count / servers_load_info.size();

      summary.AddRow({ "Minimum Replica Count", to_string(min_replica_count) });
      summary.AddRow({ "Maximum Replica Count", to_string(max_replica_count) });
      summary.AddRow({ "Average Replica Count", to_string(avg_replica_count) });
    }
    RETURN_NOT_OK(summary.PrintTo(out));
    out << endl;

    if (config_.output_replica_distribution_details) {
      out << "Per-server replica distribution details:" << endl;
      DataTable servers_info({ "UUID", "Address", "Replica Count" });
      for (const auto& [load, id] : servers_load_info) {
        // operator[] yields (and inserts) an empty address for a UUID that
        // is missing from the tablet server summaries.
        servers_info.AddRow({ id, tserver_endpoints[id], to_string(load) });
      }
      RETURN_NOT_OK(servers_info.PrintTo(out));
      out << endl;
    }
  }

  // Per-table replica distribution stats.
  {
    out << "Per-table replica distribution summary:" << endl;
    DataTable summary({ "Replica Skew", "Value" });
    const auto& table_skew_info = ci.balance.table_info_by_skew;
    if (table_skew_info.empty()) {
      summary.AddRow({ "N/A", "N/A" });
    } else {
      const auto min_table_skew = table_skew_info.begin()->first;
      const auto max_table_skew = table_skew_info.rbegin()->first;
      // See the note on the per-server accumulation above: 'const auto&'
      // avoids copying every balance info entry just to read its key.
      const int64_t sum_table_skew = accumulate(
          table_skew_info.begin(), table_skew_info.end(), int64_t{0},
          [](int64_t sum, const auto& elem) {
            return sum + elem.first;
          });
      const double avg_table_skew =
          1.0 * sum_table_skew / table_skew_info.size();

      summary.AddRow({ "Minimum", to_string(min_table_skew) });
      summary.AddRow({ "Maximum", to_string(max_table_skew) });
      summary.AddRow({ "Average", to_string(avg_table_skew) });
    }
    RETURN_NOT_OK(summary.PrintTo(out));
    out << endl;

    if (config_.output_replica_distribution_details) {
      const auto& table_summaries = raw_info.table_summaries;
      unordered_map<string, const TableSummary*> table_info;
      for (const auto& summary : table_summaries) {
        table_info.emplace(summary.id, &summary);
      }

      // Resolve a table identifier into its summary, returning nullptr if
      // there is none. Using find() instead of operator[] is essential:
      // the latter would insert a null entry and the callers would then
      // dereference it.
      const auto find_table_summary =
          [&table_info](const string& table_id) -> const TableSummary* {
            const auto it = table_info.find(table_id);
            return it == table_info.end() ? nullptr : it->second;
          };

      // Whether the balance information references a table that has no
      // summary. Such tables are reported in the per-table details below.
      bool has_table_without_summary = false;

      if (config_.enable_range_rebalancing) {
        out << "Per-range replica distribution details for tables" << endl;

        // Build mapping {table_id, tag} --> per-server replica count map.
        // Using ordered dictionary since it's targeted for printing later.
        map<pair<string, string>, map<string, size_t>> range_dist_stats;
        for (const auto& [_, balance_info] : table_skew_info) {
          const auto& table_id = balance_info.table_id;
          const auto& tag = balance_info.tag;
          const auto* table_summary = find_table_summary(table_id);
          if (!table_summary) {
            has_table_without_summary = true;
            continue;
          }
          if (!table_summary->is_range_partitioned) {
            continue;
          }
          // operator[] creates an empty per-server map for a new range and
          // a zero counter for a new server, mirroring emplace-then-add.
          auto& per_server_counts =
              range_dist_stats[std::make_pair(table_id, tag)];
          for (const auto& [count, server_uuid] :
               balance_info.servers_by_replica_count) {
            per_server_counts[server_uuid] += count;
          }
        }

        // Build the mapping for the per-range skew summary table, i.e.
        // {table_id, tag} --> {num_of_replicas, per_server_replica_skew}.
        map<pair<string, string>, pair<size_t, size_t>> range_skew_stats;
        for (const auto& [table_range, per_server_stats] : range_dist_stats) {
          size_t total_count = 0;
          size_t min_per_server_count = std::numeric_limits<size_t>::max();
          size_t max_per_server_count = std::numeric_limits<size_t>::min();
          for (const auto& [server_uuid, replica_count] : per_server_stats) {
            total_count += replica_count;
            if (replica_count > max_per_server_count) {
              max_per_server_count = replica_count;
            }
            if (replica_count < min_per_server_count) {
              min_per_server_count = replica_count;
            }
          }
          const size_t skew = max_per_server_count - min_per_server_count;
          range_skew_stats.emplace(table_range, std::make_pair(total_count, skew));
        }

        // 'range_dist_stats' is ordered by {table_id, tag}, so all ranges of
        // a table are adjacent: the per-table header and the skew summary
        // are printed once, upon encountering the first range of a table.
        string prev_table_id;
        for (const auto& [table_id_and_tag, per_server_stats] : range_dist_stats) {
          const auto& table_id = table_id_and_tag.first;
          const auto& table_range = table_id_and_tag.second;
          if (prev_table_id != table_id) {
            prev_table_id = table_id;
            // Only tables having a summary are added into 'range_dist_stats',
            // so the lookup below is guaranteed to succeed.
            const auto* table_summary = FindOrDie(table_info, table_id);
            out << endl
                << Substitute("Table: $0 ($1)", table_id, table_summary->name) << endl
                << endl;
            out << "Number of tablet replicas at servers for each range" << endl;
            DataTable range_skew_summary_table(
                { "Max Skew", "Total Count", "Range Start Key" });
            const auto it_begin = range_skew_stats.find(table_id_and_tag);
            for (auto it = it_begin; it != range_skew_stats.end(); ++it) {
              const auto& cur_table_id = it->first.first;
              if (cur_table_id != table_id) {
                break;
              }
              const auto& range = it->first.second;
              const auto replica_count = it->second.first;
              const auto replica_skew = it->second.second;
              range_skew_summary_table.AddRow(
                  { to_string(replica_skew), to_string(replica_count), range });
            }
            RETURN_NOT_OK(range_skew_summary_table.PrintTo(out));
            out << endl;
          }
          out << "Range start key: '" << table_range << "'" << endl;
          DataTable skew_table({ "UUID", "Server address", "Replica Count" });
          for (const auto& stat : per_server_stats) {
            const auto& srv_uuid = stat.first;
            const auto& srv_address = FindOrDie(tserver_endpoints, srv_uuid);
            skew_table.AddRow({ srv_uuid, srv_address, to_string(stat.second) });
          }
          RETURN_NOT_OK(skew_table.PrintTo(out));
          out << endl;
        }
      }

      const auto has_non_range_partitioned_table =
          std::any_of(table_info.begin(), table_info.end(), [](const auto& info) {
            return !info.second->is_range_partitioned;
          });
      if (!config_.enable_range_rebalancing ||
          has_non_range_partitioned_table ||
          has_table_without_summary) {
        out << "Per-table replica distribution details for "
            << (config_.enable_range_rebalancing ? "non range partitioned tables:" : "tables:")
            << endl;
        DataTable skew_table(
            { "Table Id", "Replica Count", "Replica Skew", "Table Name" });
        for (const auto& [skew, balance_info] : table_skew_info) {
          const auto& table_id = balance_info.table_id;
          const auto* table_summary = find_table_summary(table_id);
          // Range-partitioned tables have already been reported per range
          // above when range rebalancing is enabled.
          if (config_.enable_range_rebalancing &&
              table_summary && table_summary->is_range_partitioned) {
            continue;
          }
          const string table_name = table_summary ? table_summary->name : "";
          const auto total_replica_count = table_summary
              ? table_summary->replication_factor * table_summary->TotalTablets()
              : 0;
          skew_table.AddRow({ table_id,
                              to_string(total_replica_count),
                              to_string(skew),
                              table_name });
        }
        RETURN_NOT_OK(skew_table.PrintTo(out));
      }
      out << endl;
    }
  }

  return Status::OK();
}