in src/kudu/tools/rebalancer.cc [315:451]
// Convert the ksck report in 'ksck_info' into the balance information the
// rebalancing algorithm works with, storing the result in 'cbi'.
//
// Only tablet servers reported as HEALTHY are taken into account; replicas
// hosted by any other tablet server are ignored. Unless
// 'config_.move_rf1_replicas' is set, tablets of tables with replication
// factor 1 are ignored as well. For a tablet listed in 'pending_moves' which
// is reported as RECOVERING and already has the destination replica in its
// config, the move is treated as completed: the source replica is not counted.
//
// Returns Status::Corruption() if the per-server totals do not match the
// per-table counts; in that case 'cbi' is left untouched.
Status Rebalancer::KsckResultsToClusterBalanceInfo(
    const KsckResults& ksck_info,
    const MovesInProgress& pending_moves,
    ClusterBalanceInfo* cbi) const {
  DCHECK(cbi);

  // tserver UUID --> total replica count of all table's tablets at the server
  using TableReplicasAtServer = unordered_map<string, int32_t>;

  // The result table balance information to build.
  ClusterBalanceInfo balance_info;

  // tserver UUID --> total count of replicas hosted by the server.
  unordered_map<string, int32_t> tserver_replicas_count;
  // table identifier --> per-tserver replica counts for the table.
  unordered_map<string, TableReplicasAtServer> table_replicas_info;

  // Build a set of tables with RF=1 (single replica tables). The set stays
  // empty if moving single-replica tablets is allowed.
  unordered_set<string> rf1_tables;
  if (!config_.move_rf1_replicas) {
    for (const auto& s : ksck_info.table_summaries) {
      if (s.replication_factor == 1) {
        rf1_tables.emplace(s.id);
      }
    }
  }

  // Register every healthy tablet server with zero replicas.
  for (const auto& s : ksck_info.tserver_summaries) {
    if (s.health != KsckServerHealth::HEALTHY) {
      LOG(INFO) << Substitute("skipping tablet server $0 ($1) because of its "
                              "non-HEALTHY status ($2)",
                              s.uuid, s.address,
                              ServerHealthToString(s.health));
      continue;
    }
    tserver_replicas_count.emplace(s.uuid, 0);
  }

  for (const auto& tablet : ksck_info.tablet_summaries) {
    if (!config_.move_rf1_replicas &&
        rf1_tables.find(tablet.table_id) != rf1_tables.end()) {
      LOG(INFO) << Substitute("tablet $0 of table '$1' ($2) has single replica, skipping",
                              tablet.id, tablet.table_name, tablet.table_id);
      continue;
    }

    // Check if it's one of the tablets which are currently being rebalanced.
    // If so, interpret the move as successfully completed, updating the
    // replica counts correspondingly.
    //
    // Whether the destination replica is already in the config depends only
    // on the tablet, so find it out once instead of once per replica.
    const auto it_pending_moves = pending_moves.find(tablet.id);
    bool is_move_in_progress = false;
    if (it_pending_moves != pending_moves.end() &&
        tablet.result == KsckCheckResult::RECOVERING) {
      const auto& move_info = it_pending_moves->second;
      // Verify that the target replica is present in the config.
      for (const auto& tr : tablet.replicas) {
        if (tr.ts_uuid == move_info.ts_uuid_to) {
          is_move_in_progress = true;
          break;
        }
      }
    }

    for (const auto& ri : tablet.replicas) {
      auto it = tserver_replicas_count.find(ri.ts_uuid);
      if (it == tserver_replicas_count.end()) {
        string msg = Substitute("skipping replica at tserver $0", ri.ts_uuid);
        if (ri.ts_address) {
          msg += " (" + *ri.ts_address + ")";
        }
        msg += " since it's not reported among known tservers";
        LOG(INFO) << msg;
        continue;
      }

      // If both the source and the destination replicas of the scheduled
      // replica movement operation are still in the config, that's a sign
      // the move operation hasn't yet completed. As explained above, the move
      // is interpreted as successfully completed, so the source replica
      // should not be counted in.
      const bool do_count_replica =
          !(is_move_in_progress &&
            it_pending_moves->second.ts_uuid_from == ri.ts_uuid);

      // The per-table entry for the tablet server is created even when the
      // replica itself is not counted.
      TableReplicasAtServer& replicas_at_server = table_replicas_info.emplace(
          tablet.table_id, TableReplicasAtServer()).first->second;
      int32_t& table_replicas_at_server =
          replicas_at_server.emplace(ri.ts_uuid, 0).first->second;
      if (do_count_replica) {
        // Increment both the total count of replicas at the tablet server
        // and the count of the table's replicas at the tablet server.
        ++it->second;
        ++table_replicas_at_server;
      }
    }
  }

  // Check for the consistency of information derived from the ksck report.
  //
  // NOTE: operator[] is used on purpose. Besides reading the count, it adds
  // a zero entry for every known tablet server which hosts no replicas of
  // the table, and those entries take part in the skew computation below.
  for (const auto& elem : tserver_replicas_count) {
    const auto& ts_uuid = elem.first;
    int32_t count_by_table_info = 0;
    for (auto& e : table_replicas_info) {
      count_by_table_info += e.second[ts_uuid];
    }
    if (elem.second != count_by_table_info) {
      return Status::Corruption("inconsistent cluster state returned by ksck");
    }
  }

  // Populate ClusterBalanceInfo::servers_by_total_replica_count
  auto& servers_by_count = balance_info.servers_by_total_replica_count;
  for (const auto& elem : tserver_replicas_count) {
    servers_by_count.emplace(elem.second, elem.first);
  }

  // Populate ClusterBalanceInfo::table_info_by_skew
  auto& table_info_by_skew = balance_info.table_info_by_skew;
  for (const auto& elem : table_replicas_info) {
    // Every per-table map has at least one entry at this point, so both
    // extremes below are updated at least once before being subtracted.
    int32_t max_count = numeric_limits<int32_t>::min();
    int32_t min_count = numeric_limits<int32_t>::max();
    TableBalanceInfo tbi;
    tbi.table_id = elem.first;
    for (const auto& e : elem.second) {
      const auto& ts_uuid = e.first;
      const auto replica_count = e.second;
      tbi.servers_by_replica_count.emplace(replica_count, ts_uuid);
      max_count = std::max(replica_count, max_count);
      min_count = std::min(replica_count, min_count);
    }
    // The skew of a table is the difference between the highest and the
    // lowest per-server replica counts.
    table_info_by_skew.emplace(max_count - min_count, std::move(tbi));
  }

  // Publish the result only once everything has been built successfully.
  *cbi = std::move(balance_info);
  return Status::OK();
}