in src/kudu/rebalance/rebalancer.cc [286:485]
Status Rebalancer::BuildClusterInfo(const ClusterRawInfo& raw_info,
const MovesInProgress& moves_in_progress,
ClusterInfo* info) const {
DCHECK(info);
// tserver UUID --> total replica count of all the table's tablets at the
// server; when range-aware rebalancing is enabled, a 'table' here stands
// for a particular (table, range) pair rather than the whole table.
typedef unordered_map<string, int32_t> TableReplicasAtServer;
// The result information to build.
ClusterInfo result_info;
// tserver UUID --> total count of replicas at the server
unordered_map<string, int32_t> tserver_replicas_count;
// (table_id, tag) --> per-tserver count of the table's tablet replicas;
// the tag is the range start key when range-aware rebalancing is enabled,
// an empty string otherwise.
unordered_map<TableIdAndTag, TableReplicasAtServer,
TableIdAndTagHash, TableIdAndTagEqual> table_replicas_info;
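// Illustrative sketch only (the table IDs, tags, and tserver UUIDs below are
// made up, not taken from any real cluster): once populated,
// table_replicas_info conceptually looks like
//   { {table_id: "t1", tag: ""}       -> { "ts-A": 3, "ts-B": 2 },
//     {table_id: "t2", tag: "rkey_0"} -> { "ts-A": 1, "ts-C": 1 } }
// i.e. for every (table, range-tag) pair, a per-tserver tally of replicas.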
// UUIDs of unhealthy tablet servers.
unordered_set<string> unhealthy_tablet_servers;
// Build a set of tables with RF=1 (single replica tables).
unordered_set<string> rf1_tables;
if (!config_.move_rf1_replicas) {
for (const auto& s : raw_info.table_summaries) {
if (s.replication_factor == 1) {
rf1_tables.emplace(s.id);
}
}
}
auto& ts_uuids_by_location = result_info.locality.servers_by_location;
auto& location_by_ts_uuid = result_info.locality.location_by_ts_id;
for (const auto& summary : raw_info.tserver_summaries) {
const auto& ts_id = summary.uuid;
const auto& ts_location = summary.ts_location;
if (ContainsKey(config_.ignored_tservers, ts_id)) {
VLOG(1) << Substitute("ignoring tserver $0", ts_id);
continue;
}
VLOG(1) << Substitute("found tserver $0 at location '$1'", ts_id, ts_location);
EmplaceOrDie(&location_by_ts_uuid, ts_id, ts_location);
auto& ts_ids = LookupOrEmplace(&ts_uuids_by_location,
ts_location, set<string>());
InsertOrDie(&ts_ids, ts_id);
}
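// For example (the UUIDs and location strings are illustrative only), after
// the loop above the locality maps might contain
//   servers_by_location: { "/rack-0": { "ts-A", "ts-B" }, "/rack-1": { "ts-C" } }
//   location_by_ts_id:   { "ts-A": "/rack-0", "ts-B": "/rack-0", "ts-C": "/rack-1" }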
vector<string> skipped_tserver_msgs;
for (const auto& s : raw_info.tserver_summaries) {
if (s.health != ServerHealth::HEALTHY) {
skipped_tserver_msgs.emplace_back(
Substitute("skipping tablet server $0 ($1) because of its "
"non-HEALTHY status ($2)",
s.uuid,
s.address,
ServerHealthToString(s.health)));
unhealthy_tablet_servers.emplace(s.uuid);
continue;
}
tserver_replicas_count.emplace(s.uuid, 0);
}
if (!skipped_tserver_msgs.empty()) {
KLOG_EVERY_N_SECS(INFO, 10) << JoinStrings(skipped_tserver_msgs, "\n") << THROTTLE_MSG;
}
for (const auto& tablet : raw_info.tablet_summaries) {
if (!config_.move_rf1_replicas) {
if (ContainsKey(rf1_tables, tablet.table_id)) {
VLOG(1) << Substitute("tablet $0 of table '$1' ($2) has single replica, skipping",
tablet.id, tablet.table_name, tablet.table_id);
continue;
}
}
// Check if it's one of the tablets which are currently being rebalanced.
// If so, interpret the move as successfully completed, updating the
// replica counts correspondingly.
const auto it_pending_moves = moves_in_progress.find(tablet.id);
if (it_pending_moves != moves_in_progress.end()) {
const auto& move_info = it_pending_moves->second;
bool is_target_replica_present = false;
// Verify that the target replica is present in the config.
for (const auto& tr : tablet.replicas) {
if (tr.ts_uuid == move_info.ts_uuid_to) {
is_target_replica_present = true;
break;
}
}
// If the target replica is already present, it is counted by the loop
// over the tablet's replicas below. Otherwise, account for the absent
// target replica here as if the movement had already completed
// successfully.
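// Hypothetical example (UUIDs are made up): the pending move for this tablet
// is { ts_uuid_from: "ts-A", ts_uuid_to: "ts-B" }, but "ts-B" does not appear
// among the tablet's replicas yet. The counters for "ts-B" are incremented
// here so the balance computed below already reflects the in-flight move.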
auto it = tserver_replicas_count.find(move_info.ts_uuid_to);
if (!is_target_replica_present && !move_info.ts_uuid_to.empty() &&
it != tserver_replicas_count.end()) {
it->second++;
auto table_ins = table_replicas_info.emplace(
TableIdAndTag{ tablet.table_id,
config_.enable_range_rebalancing
? tablet.range_key_begin : "" },
TableReplicasAtServer());
TableReplicasAtServer& replicas_at_server = table_ins.first->second;
auto replicas_ins = replicas_at_server.emplace(move_info.ts_uuid_to, 0);
replicas_ins.first->second++;
}
}
for (const auto& ri : tablet.replicas) {
// Increment total count of replicas at the tablet server.
auto it = tserver_replicas_count.find(ri.ts_uuid);
if (it == tserver_replicas_count.end()) {
string msg = Substitute("skipping replica at tserver $0", ri.ts_uuid);
if (ri.ts_address) {
msg += " (" + *ri.ts_address + ")";
}
msg += " since it's not reported among known tservers";
KLOG_EVERY_N_SECS(INFO, 10) << msg << THROTTLE_MSG;
continue;
}
bool do_count_replica = true;
if (it_pending_moves != moves_in_progress.end()) {
const auto& move_info = it_pending_moves->second;
if (move_info.ts_uuid_from == ri.ts_uuid) {
DCHECK(!ri.ts_uuid.empty());
// The source replica of the scheduled replica movement operation
// is still in the config. The move is interpreted as successfully
// completed, so the source replica should not be counted.
do_count_replica = false;
}
}
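// Continuing the hypothetical example above: a replica of this tablet hosted
// on "ts-A" (the source of the in-flight move) is skipped here, since the
// move is treated as already completed and "ts-A" is expected to shed it.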
if (do_count_replica) {
it->second++;
}
auto table_ins = table_replicas_info.emplace(
TableIdAndTag{tablet.table_id,
config_.enable_range_rebalancing ? tablet.range_key_begin : ""},
TableReplicasAtServer());
TableReplicasAtServer& replicas_at_server = table_ins.first->second;
auto replicas_ins = replicas_at_server.emplace(ri.ts_uuid, 0);
if (do_count_replica) {
replicas_ins.first->second++;
}
}
}
// Check for the consistency of information derived from the health report.
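// For every tablet server tracked above, the total replica count accumulated
// directly must match the sum of its per-(table, tag) counts; a mismatch
// means the collected cluster state is self-contradictory.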
for (const auto& elem : tserver_replicas_count) {
const auto& ts_uuid = elem.first;
int32_t count_by_table_info = 0;
for (auto& e : table_replicas_info) {
count_by_table_info += e.second[ts_uuid];
}
if (elem.second != count_by_table_info) {
return Status::Corruption("inconsistent cluster state returned by check");
}
}
// Populate ClusterBalanceInfo::servers_by_total_replica_count
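// The container is keyed by the total replica count, so (assuming its default
// ascending ordering) iteration goes from the least to the most loaded tablet
// server, e.g. (illustrative values only): { 10 -> "ts-C", 15 -> "ts-B" }.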
auto& servers_by_count = result_info.balance.servers_by_total_replica_count;
for (const auto& elem : tserver_replicas_count) {
if (ContainsKey(config_.ignored_tservers, elem.first)) {
VLOG(1) << Substitute("ignoring tserver $0", elem.first);
continue;
}
servers_by_count.emplace(elem.second, elem.first);
}
// Populate ClusterBalanceInfo::table_info_by_skew
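// The skew of a (table, tag) entry is the difference between the maximum and
// the minimum per-tserver replica count among non-ignored tablet servers:
// e.g. per-tserver counts of 2, 4 and 7 give a skew of 5. Keying the map by
// skew lets the rebalancing logic consider the most skewed entries first.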
auto& table_info_by_skew = result_info.balance.table_info_by_skew;
for (const auto& elem : table_replicas_info) {
const auto& table_id = elem.first.table_id;
const auto& tag = elem.first.tag;
int32_t max_count = numeric_limits<int32_t>::min();
int32_t min_count = numeric_limits<int32_t>::max();
TableBalanceInfo tbi;
tbi.table_id = table_id;
tbi.tag = tag;
for (const auto& e : elem.second) {
const auto& ts_uuid = e.first;
const auto replica_count = e.second;
if (ContainsKey(config_.ignored_tservers, ts_uuid)) {
VLOG(1) << Substitute("ignoring replicas of table $0 on tserver $1", table_id, ts_uuid);
continue;
}
tbi.servers_by_replica_count.emplace(replica_count, ts_uuid);
max_count = std::max(replica_count, max_count);
min_count = std::min(replica_count, min_count);
}
table_info_by_skew.emplace(max_count - min_count, std::move(tbi));
}
// TODO(aserbin): add sanity checks on the result.
*info = std::move(result_info);
return Status::OK();
}