in src/kudu/master/auto_rebalancer.cc [469:647]
// Builds a snapshot of the tablet servers, tables and tablet replicas that the
// rebalancing algorithm works on, and stores it in 'raw_info'.
//
// If 'location' is not set, 'raw_info' receives information on the whole
// cluster. Otherwise it receives only:
//   * the tablet servers whose location equals '*location',
//   * the tablets having at least one replica on those tablet servers, with
//     their replica lists trimmed down to the replicas at that location,
//   * the tables those tablets belong to.
//
// Returns:
//   * IllegalState if the number of tablet servers available for placement
//     differs from the number of live tablet servers, or if the location data
//     returned for a tablet is malformed (bad tserver index, no RPC address),
//   * NotFound if a replica is hosted by a tablet server that is not in the
//     set of servers available for placement,
//   * whatever error the leader lock or GetTabletLocations() reports.
//
// NOTE(review): in the per-location case the results are appended to the
// vectors of 'raw_info', while in the whole-cluster case they are assigned;
// this presumably relies on callers passing an empty 'raw_info' -- confirm.
Status AutoRebalancerTask::BuildClusterRawInfo(
    const optional<string>& location,
    ClusterRawInfo* raw_info) const {
  vector<ServerHealthSummary> tserver_summaries;
  unordered_set<string> tserver_uuids;
  vector<TableSummary> table_summaries;
  vector<TabletSummary> tablet_summaries;

  // Avoid making any moves if not all tservers are up, to prevent the possibility
  // of moving tablets, then having to move them again later, when a tserver that
  // was not available before, is available for tablet placement again.
  TSDescriptorVector descriptors;
  ts_manager_->GetDescriptorsAvailableForPlacement(&descriptors);
  if (descriptors.size() != ts_manager_->GetLiveCount()) {
    return Status::IllegalState("not all tservers available for tablet placement");
  }

  tserver_uuids.reserve(descriptors.size());
  tserver_summaries.reserve(descriptors.size());
  // All the tservers are healthy and available for placement.
  // For rebalancing, only need to fill the uuid and location fields.
  for (const auto& ts : descriptors) {
    ServerHealthSummary summary;
    summary.uuid = ts->permanent_uuid();
    if (ts->location()) {
      summary.ts_location = *(ts->location());
    }
    summary.health = ServerHealth::HEALTHY;
    tserver_uuids.insert(summary.uuid);
    tserver_summaries.emplace_back(std::move(summary));
  }

  vector<scoped_refptr<TableInfo>> table_infos;
  {
    // The leader lock is held only for the duration of the catalog lookup.
    CatalogManager::ScopedLeaderSharedLock leader_lock(catalog_manager_);
    RETURN_NOT_OK(leader_lock.first_failed_status());
    catalog_manager_->GetAllTables(&table_infos);
  }

  table_summaries.reserve(table_infos.size());
  for (const auto& table : table_infos) {
    TableMetadataLock table_l(table.get(), LockMode::READ);
    const SysTablesEntryPB& table_data = table->metadata().state().pb;
    if (table_data.state() == SysTablesEntryPB::REMOVED) {
      // Don't worry about rebalancing replicas that belong to deleted tables.
      continue;
    }

    TableSummary table_summary;
    table_summary.id = table->id();
    table_summary.name = table_data.name();
    table_summary.replication_factor = table_data.num_replicas();

    vector<scoped_refptr<TabletInfo>> tablet_infos;
    table->GetAllTablets(&tablet_infos);
    tablet_summaries.reserve(tablet_summaries.size() + tablet_infos.size());
    for (const auto& tablet : tablet_infos) {
      TabletMetadataLock tablet_l(tablet.get(), LockMode::READ);

      TabletSummary tablet_summary;
      tablet_summary.id = tablet->id();
      tablet_summary.table_id = table_summary.id;
      tablet_summary.table_name = table_summary.name;

      // Retrieve all replicas of the tablet.
      vector<ReplicaSummary> replicas;
      TabletLocationsPB locs_pb;
      CatalogManager::TSInfosDict ts_infos_dict;
      // GetTabletLocations() will fail if the catalog manager is not the leader.
      {
        CatalogManager::ScopedLeaderSharedLock leaderlock(catalog_manager_);
        RETURN_NOT_OK(leaderlock.first_failed_status());
        // This will only return tablet replicas in the RUNNING state, and filter
        // to only retrieve voter replicas.
        RETURN_NOT_OK(catalog_manager_->GetTabletLocations(
            tablet_summary.id,
            ReplicaTypeFilter::VOTER_REPLICA,
            /*use_external_addr=*/false,
            &locs_pb,
            &ts_infos_dict,
            nullopt));
      }

      // Consensus state information is the same for all replicas of this tablet.
      // Only the number of voters in the committed Raft configuration is needed
      // to judge the replication level below, so just count them.
      const ConsensusStatePB& cstatepb = tablet_l.data().pb.consensus_state();
      size_t num_voters = 0;
      for (const auto& peer : cstatepb.committed_config().peers()) {
        if (peer.member_type() == RaftPeerPB::VOTER) {
          ++num_voters;
        }
      }

      int leaders_count = 0;
      // Build a summary for each replica of the tablet.
      // Make sure that the tserver the tablet is on is registered with the master
      // and is available for replica placement.
      // If not, return an error.
      const auto& ts_info_pbs = ts_infos_dict.ts_info_pbs();
      replicas.reserve(locs_pb.interned_replicas_size());
      for (const auto& r : locs_pb.interned_replicas()) {
        const int index = r.ts_info_idx();
        // Guard the dictionary lookup: an out-of-range index would otherwise
        // dereference memory outside of the container.
        if (index < 0 ||
            static_cast<size_t>(index) >= static_cast<size_t>(ts_info_pbs.size())) {
          return Status::IllegalState(Substitute(
              "invalid tserver info index $0 for tablet $1",
              index, tablet_summary.id));
        }
        const TSInfoPB& ts_info = *(ts_info_pbs[index]);

        ReplicaSummary rep;
        rep.ts_uuid = ts_info.permanent_uuid();
        if (!ContainsKey(tserver_uuids, rep.ts_uuid)) {
          return Status::NotFound(Substitute("tserver $0 not available for placement",
                                             rep.ts_uuid));
        }
        // Accessing element 0 of an empty repeated field is not valid, so make
        // sure there is at least one RPC address before reading it.
        if (ts_info.rpc_addresses_size() == 0) {
          return Status::IllegalState(Substitute(
              "tserver $0 has no RPC addresses", rep.ts_uuid));
        }
        const auto& addr = ts_info.rpc_addresses(0);
        rep.ts_address = Substitute("$0:$1", addr.host(), addr.port());
        rep.is_leader = r.role() == RaftPeerPB::LEADER;
        if (rep.is_leader) {
          leaders_count++;
        }
        // Only voter replicas were requested from GetTabletLocations() above,
        // and every tserver in 'tserver_uuids' was recorded as healthy.
        rep.is_voter = true;
        rep.ts_healthy = true;
        replicas.emplace_back(std::move(rep));
      }
      tablet_summary.replicas = std::move(replicas);

      // Determine if tablet is healthy enough for rebalancing.
      if (num_voters < table_summary.replication_factor) {
        tablet_summary.result = HealthCheckResult::UNDER_REPLICATED;
      } else if (leaders_count != 1) {
        tablet_summary.result = HealthCheckResult::UNAVAILABLE;
      } else {
        tablet_summary.result = HealthCheckResult::HEALTHY;
      }
      tablet_summaries.emplace_back(std::move(tablet_summary));
    }
    table_summaries.emplace_back(std::move(table_summary));
  }

  if (!location) {
    // Information on the whole cluster.
    raw_info->tserver_summaries = std::move(tserver_summaries);
    raw_info->tablet_summaries = std::move(tablet_summaries);
    raw_info->table_summaries = std::move(table_summaries);
    return Status::OK();
  }

  // Information on the specified location only: filter out non-relevant info.
  // The local summary vectors are not used after this point, so the matching
  // entries are moved into 'raw_info' instead of being copied.
  const auto& location_str = *location;
  unordered_set<string> ts_ids_at_location;
  for (auto& summary : tserver_summaries) {
    if (summary.ts_location == location_str) {
      // Record the UUID before the summary is moved from.
      InsertOrDie(&ts_ids_at_location, summary.uuid);
      raw_info->tserver_summaries.emplace_back(std::move(summary));
    }
  }

  unordered_set<string> table_ids_at_location;
  for (auto& summary : tablet_summaries) {
    vector<ReplicaSummary> replicas_at_location;
    replicas_at_location.reserve(summary.replicas.size());
    for (auto& replica : summary.replicas) {
      if (ContainsKey(ts_ids_at_location, replica.ts_uuid)) {
        replicas_at_location.emplace_back(std::move(replica));
      }
    }
    if (replicas_at_location.empty()) {
      // The tablet has no replicas at the location of interest.
      continue;
    }
    table_ids_at_location.insert(summary.table_id);
    // Keep only the replicas hosted at the specified location.
    summary.replicas = std::move(replicas_at_location);
    raw_info->tablet_summaries.emplace_back(std::move(summary));
  }

  for (auto& summary : table_summaries) {
    if (ContainsKey(table_ids_at_location, summary.id)) {
      raw_info->table_summaries.emplace_back(std::move(summary));
    }
  }
  return Status::OK();
}