Status AutoRebalancerTask::BuildClusterRawInfo()

in src/kudu/master/auto_rebalancer.cc [469:647]


// Collects a snapshot of the cluster (tablet servers, tables, tablets and the
// voter replicas of each tablet) and stores it into 'raw_info'.
//
// If 'location' is not set, 'raw_info' receives the information on the whole
// cluster (its three summary vectors are overwritten). If 'location' is set,
// only the following is appended to the vectors of 'raw_info':
//   * the tablet servers at that location;
//   * the tablets having at least one replica at that location, with the
//     replicas hosted elsewhere stripped from the tablet's replica list;
//   * the tables those tablets belong to.
//
// Returns a non-OK status without touching 'raw_info' if:
//   * the number of tablet servers available for placement differs from the
//     number of live tablet servers (IllegalState);
//   * this master is not the leader, or GetTabletLocations() fails
//     (the corresponding status is propagated);
//   * a replica is hosted by a tablet server that is not in the set of servers
//     available for placement (NotFound);
//   * a replica's tablet server has no RPC address reported (IllegalState).
Status AutoRebalancerTask::BuildClusterRawInfo(
    const optional<string>& location,
    ClusterRawInfo* raw_info) const {
  vector<ServerHealthSummary> tserver_summaries;
  unordered_set<string> tserver_uuids;
  vector<TableSummary> table_summaries;
  vector<TabletSummary> tablet_summaries;

  // Avoid making any moves if not all tservers are up, to prevent the possibility
  // of moving tablets, then having to move them again later, when a tserver that
  // was not available before, is available for tablet placement again.
  TSDescriptorVector descriptors;
  ts_manager_->GetDescriptorsAvailableForPlacement(&descriptors);
  if (descriptors.size() != ts_manager_->GetLiveCount()) {
    return Status::IllegalState("not all tservers available for tablet placement");
  }
  tserver_uuids.reserve(descriptors.size());
  tserver_summaries.reserve(descriptors.size());

  // All the tservers are healthy and available for placement.
  // For rebalancing, only need to fill the uuid and location fields.
  for (const auto& ts : descriptors) {
    ServerHealthSummary summary;
    summary.uuid = ts->permanent_uuid();
    if (ts->location()) {
      summary.ts_location = *(ts->location());
    }
    summary.health = ServerHealth::HEALTHY;
    tserver_uuids.insert(summary.uuid);
    tserver_summaries.emplace_back(std::move(summary));
  }

  vector<scoped_refptr<TableInfo>> table_infos;
  {
    // The leader lock is held only while fetching the list of tables.
    CatalogManager::ScopedLeaderSharedLock leader_lock(catalog_manager_);
    RETURN_NOT_OK(leader_lock.first_failed_status());
    catalog_manager_->GetAllTables(&table_infos);
  }

  table_summaries.reserve(table_infos.size());

  for (const auto& table : table_infos) {
    TableMetadataLock table_l(table.get(), LockMode::READ);

    const SysTablesEntryPB& table_data = table->metadata().state().pb;
    if (table_data.state() == SysTablesEntryPB::REMOVED) {
      // Don't worry about rebalancing replicas that belong to deleted tables.
      continue;
    }
    TableSummary table_summary;
    table_summary.id = table->id();
    table_summary.name = table_data.name();
    table_summary.replication_factor = table_data.num_replicas();

    vector<scoped_refptr<TabletInfo>> tablet_infos;
    table->GetAllTablets(&tablet_infos);
    tablet_summaries.reserve(tablet_summaries.size() + tablet_infos.size());

    for (const auto& tablet : tablet_infos) {
      TabletMetadataLock tablet_l(tablet.get(), LockMode::READ);

      TabletSummary tablet_summary;
      tablet_summary.id = tablet->id();
      tablet_summary.table_id = table_summary.id;
      tablet_summary.table_name = table_summary.name;

      // Retrieve all replicas of the tablet.
      TabletLocationsPB locs_pb;
      CatalogManager::TSInfosDict ts_infos_dict;
      // GetTabletLocations() will fail if the catalog manager is not the leader.
      {
        CatalogManager::ScopedLeaderSharedLock leaderlock(catalog_manager_);
        RETURN_NOT_OK(leaderlock.first_failed_status());
        // This will only return tablet replicas in the RUNNING state, and filter
        // to only retrieve voter replicas.
        RETURN_NOT_OK(catalog_manager_->GetTabletLocations(
            tablet_summary.id,
            ReplicaTypeFilter::VOTER_REPLICA,
            /*use_external_addr=*/false,
            &locs_pb,
            &ts_infos_dict,
            nullopt));
      }

      // Consensus state information is the same for all replicas of this tablet.
      // Only the number of voters in the committed Raft configuration is needed
      // to tell whether the tablet is under-replicated, so count them instead
      // of collecting their UUIDs.
      const ConsensusStatePB& cstatepb = tablet_l.data().pb.consensus_state();
      size_t num_voters = 0;
      for (const auto& peer : cstatepb.committed_config().peers()) {
        if (peer.member_type() == RaftPeerPB::VOTER) {
          ++num_voters;
        }
      }

      int leaders_count = 0;

      // Build a summary for each replica of the tablet.
      // Make sure that the tserver the tablet is on is registered with the master
      // and is available for replica placement.
      // If not, return an error.
      vector<ReplicaSummary> replicas;
      replicas.reserve(locs_pb.interned_replicas().size());
      for (const auto& r : locs_pb.interned_replicas()) {
        int index = r.ts_info_idx();
        const TSInfoPB& ts_info = *(ts_infos_dict.ts_info_pbs()[index]);
        ReplicaSummary rep;
        rep.ts_uuid = ts_info.permanent_uuid();
        if (!ContainsKey(tserver_uuids, rep.ts_uuid)) {
          return Status::NotFound(Substitute("tserver $0 not available for placement",
                                             rep.ts_uuid));
        }
        // Accessing element 0 of an empty repeated field is out of range, so
        // report the problem instead.
        if (ts_info.rpc_addresses_size() == 0) {
          return Status::IllegalState(Substitute("tserver $0 has no RPC address",
                                                 rep.ts_uuid));
        }
        const auto& addr = ts_info.rpc_addresses(0);
        rep.ts_address = Substitute("$0:$1", addr.host(), addr.port());
        rep.is_leader = r.role() == RaftPeerPB::LEADER;
        if (rep.is_leader) {
          leaders_count++;
        }
        rep.is_voter = true;
        rep.ts_healthy = true;
        replicas.emplace_back(std::move(rep));
      }
      tablet_summary.replicas = std::move(replicas);

      // Determine if tablet is healthy enough for rebalancing.
      if (num_voters < table_summary.replication_factor) {
        tablet_summary.result = HealthCheckResult::UNDER_REPLICATED;
      } else if (leaders_count != 1) {
        tablet_summary.result = HealthCheckResult::UNAVAILABLE;
      } else {
        tablet_summary.result = HealthCheckResult::HEALTHY;
      }
      tablet_summaries.emplace_back(std::move(tablet_summary));
    }
    table_summaries.emplace_back(std::move(table_summary));
  }

  if (!location) {
    // Information on the whole cluster.
    raw_info->tserver_summaries = std::move(tserver_summaries);
    raw_info->tablet_summaries = std::move(tablet_summaries);
    raw_info->table_summaries = std::move(table_summaries);
    return Status::OK();
  }

  // Information on the specified location only: filter out non-relevant info.
  // The collected summaries are local to this function and are not used after
  // the filtering, so the matching entries are moved into 'raw_info' rather
  // than copied.
  const auto& location_str = *location;
  unordered_set<string> ts_ids_at_location;
  for (auto& summary : tserver_summaries) {
    if (summary.ts_location == location_str) {
      // Record the UUID before the summary is moved from.
      InsertOrDie(&ts_ids_at_location, summary.uuid);
      raw_info->tserver_summaries.emplace_back(std::move(summary));
    }
  }

  unordered_set<string> table_ids_at_location;
  for (auto& summary : tablet_summaries) {
    // Keep only the replicas hosted by tablet servers at the location.
    vector<ReplicaSummary> replicas_at_location;
    replicas_at_location.reserve(summary.replicas.size());
    for (auto& replica : summary.replicas) {
      if (ContainsKey(ts_ids_at_location, replica.ts_uuid)) {
        replicas_at_location.emplace_back(std::move(replica));
      }
    }
    if (replicas_at_location.empty()) {
      // The tablet has no replicas at the location: not relevant.
      continue;
    }
    table_ids_at_location.insert(summary.table_id);
    summary.replicas = std::move(replicas_at_location);
    raw_info->tablet_summaries.emplace_back(std::move(summary));
  }

  // Keep only the tables having at least one tablet replica at the location.
  for (auto& summary : table_summaries) {
    if (ContainsKey(table_ids_at_location, summary.id)) {
      raw_info->table_summaries.emplace_back(std::move(summary));
    }
  }
  return Status::OK();
}