Status Rebalancer::BuildClusterInfo()

in src/kudu/rebalance/rebalancer.cc [286:485]


Status Rebalancer::BuildClusterInfo(const ClusterRawInfo& raw_info,
                                    const MovesInProgress& moves_in_progress,
                                    ClusterInfo* info) const {
  DCHECK(info);

  // tserver UUID --> total count of the table's tablet replicas at the
  // server; a "table" here is identified by a (table_id, tag) pair
  // (see table_replicas_info below)
  typedef unordered_map<string, int32_t> TableReplicasAtServer;

  // The result information to build.
  ClusterInfo result_info;

  // tserver UUID --> total count of replicas at the server
  unordered_map<string, int32_t> tserver_replicas_count;

  // (table_id, tag) --> count of the table's tablet replicas at each tserver;
  // the tag is the tablet's range start key when range-aware rebalancing is
  // enabled, and an empty string otherwise
  unordered_map<TableIdAndTag, TableReplicasAtServer,
      TableIdAndTagHash, TableIdAndTagEqual> table_replicas_info;

  // UUIDs of unhealthy tablet servers.
  unordered_set<string> unhealthy_tablet_servers;

  // Build a set of tables with RF=1 (single replica tables).
  unordered_set<string> rf1_tables;
  if (!config_.move_rf1_replicas) {
    for (const auto& s : raw_info.table_summaries) {
      if (s.replication_factor == 1) {
        rf1_tables.emplace(s.id);
      }
    }
  }

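  // Group tablet servers by location and remember each server's location,
  // skipping tservers explicitly ignored by the configuration.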
  auto& ts_uuids_by_location = result_info.locality.servers_by_location;
  auto& location_by_ts_uuid = result_info.locality.location_by_ts_id;
  for (const auto& summary : raw_info.tserver_summaries) {
    const auto& ts_id = summary.uuid;
    const auto& ts_location = summary.ts_location;
    if (ContainsKey(config_.ignored_tservers, ts_id)) {
      VLOG(1) << Substitute("ignoring tserver $0", ts_id);
      continue;
    }
    VLOG(1) << Substitute("found tserver $0 at location '$1'", ts_id, ts_location);
    EmplaceOrDie(&location_by_ts_uuid, ts_id, ts_location);
    auto& ts_ids = LookupOrEmplace(&ts_uuids_by_location,
                                   ts_location, set<string>());
    InsertOrDie(&ts_ids, ts_id);
  }

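  // Set up replica counters for healthy tablet servers only; remember the
  // unhealthy ones so their replicas can be skipped below.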
  vector<string> skipped_tserver_msgs;
  for (const auto& s : raw_info.tserver_summaries) {
    if (s.health != ServerHealth::HEALTHY) {
      skipped_tserver_msgs.emplace_back(
          Substitute("skipping tablet server $0 ($1) because of its "
                     "non-HEALTHY status ($2)",
                     s.uuid,
                     s.address,
                     ServerHealthToString(s.health)));
      unhealthy_tablet_servers.emplace(s.uuid);
      continue;
    }
    tserver_replicas_count.emplace(s.uuid, 0);
  }
  if (!skipped_tserver_msgs.empty()) {
    KLOG_EVERY_N_SECS(INFO, 10) << JoinStrings(skipped_tserver_msgs, "\n") << THROTTLE_MSG;
  }

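  // Traverse the tablet summaries, counting replicas per tserver and per
  // (table, tag) while folding in scheduled moves as if they had already
  // completed.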
  for (const auto& tablet : raw_info.tablet_summaries) {
    if (!config_.move_rf1_replicas) {
      if (ContainsKey(rf1_tables, tablet.table_id)) {
        VLOG(1) << Substitute("tablet $0 of table '$1' ($2) has single replica, skipping",
                              tablet.id, tablet.table_name, tablet.table_id);
        continue;
      }
    }

    // Check whether this tablet is one of those currently being rebalanced.
    // If so, interpret the move as successfully completed and update the
    // replica counts accordingly.
    const auto it_pending_moves = moves_in_progress.find(tablet.id);
    if (it_pending_moves != moves_in_progress.end()) {
      const auto& move_info = it_pending_moves->second;
      bool is_target_replica_present = false;
      // Verify that the target replica is present in the config.
      for (const auto& tr : tablet.replicas) {
        if (tr.ts_uuid == move_info.ts_uuid_to) {
          is_target_replica_present = true;
          break;
        }
      }
      // If the target replica is present, it will be processed in the code
      // below. Otherwise, pretend the target replica is already in the
      // config: the idea is to count the absent target replica as if the
      // move had already completed successfully.
      auto it = tserver_replicas_count.find(move_info.ts_uuid_to);
      if (!is_target_replica_present && !move_info.ts_uuid_to.empty() &&
          it != tserver_replicas_count.end()) {
        it->second++;
        auto table_ins = table_replicas_info.emplace(
            TableIdAndTag{ tablet.table_id,
                           config_.enable_range_rebalancing
                               ? tablet.range_key_begin : "" },
            TableReplicasAtServer());
        TableReplicasAtServer& replicas_at_server = table_ins.first->second;

        auto replicas_ins = replicas_at_server.emplace(move_info.ts_uuid_to, 0);
        replicas_ins.first->second++;
      }
    }

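    // Count the replicas present in the tablet's Raft config, skipping
    // replicas hosted by unknown (non-healthy) tservers and the source
    // replica of a scheduled move.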
    for (const auto& ri : tablet.replicas) {
      // Increment total count of replicas at the tablet server.
      auto it = tserver_replicas_count.find(ri.ts_uuid);
      if (it == tserver_replicas_count.end()) {
        string msg = Substitute("skipping replica at tserver $0", ri.ts_uuid);
        if (ri.ts_address) {
          msg += " (" + *ri.ts_address + ")";
        }
        msg += " since it's not reported among known tservers";
        KLOG_EVERY_N_SECS(INFO, 10) << msg << THROTTLE_MSG;
        continue;
      }
      bool do_count_replica = true;
      if (it_pending_moves != moves_in_progress.end()) {
        const auto& move_info = it_pending_moves->second;
        if (move_info.ts_uuid_from == ri.ts_uuid) {
          DCHECK(!ri.ts_uuid.empty());
          // The source replica of the scheduled replica movement operation
          // is still in the config. The move is interpreted as successfully
          // completed, so the source replica should not be counted.
          do_count_replica = false;
        }
      }
      if (do_count_replica) {
        it->second++;
      }

      auto table_ins = table_replicas_info.emplace(
          TableIdAndTag{tablet.table_id,
                        config_.enable_range_rebalancing ? tablet.range_key_begin : ""},
          TableReplicasAtServer());
      TableReplicasAtServer& replicas_at_server = table_ins.first->second;

      auto replicas_ins = replicas_at_server.emplace(ri.ts_uuid, 0);
      if (do_count_replica) {
        replicas_ins.first->second++;
      }
    }
  }

  // Check for the consistency of information derived from the health report.
  for (const auto& elem : tserver_replicas_count) {
    const auto& ts_uuid = elem.first;
    int32_t count_by_table_info = 0;
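    // NOTE: operator[] below default-constructs a zero-valued entry for
    // ts_uuid in each per-table map if the key is absent; that's benign
    // here since adding zero doesn't change the sum.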
    for (auto& e : table_replicas_info) {
      count_by_table_info += e.second[ts_uuid];
    }
    if (elem.second != count_by_table_info) {
      return Status::Corruption("inconsistent cluster state returned by check");
    }
  }

  // Populate ClusterBalanceInfo::servers_by_total_replica_count
  auto& servers_by_count = result_info.balance.servers_by_total_replica_count;
  for (const auto& elem : tserver_replicas_count) {
    if (ContainsKey(config_.ignored_tservers, elem.first)) {
      VLOG(1) << Substitute("ignoring tserver $0", elem.first);
      continue;
    }
    servers_by_count.emplace(elem.second, elem.first);
  }

  // Populate ClusterBalanceInfo::table_info_by_skew
  auto& table_info_by_skew = result_info.balance.table_info_by_skew;
  for (const auto& elem : table_replicas_info) {
    const auto& table_id = elem.first.table_id;
    const auto& tag = elem.first.tag;
    int32_t max_count = numeric_limits<int32_t>::min();
    int32_t min_count = numeric_limits<int32_t>::max();
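    // NOTE: this assumes at least one non-ignored tserver hosts a replica
    // of the table; otherwise max_count and min_count keep their sentinel
    // values and the skew computed below would overflow.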
    TableBalanceInfo tbi;
    tbi.table_id = table_id;
    tbi.tag = tag;
    for (const auto& e : elem.second) {
      const auto& ts_uuid = e.first;
      const auto replica_count = e.second;
      if (ContainsKey(config_.ignored_tservers, ts_uuid)) {
        VLOG(1) << Substitute("ignoring replicas of table $0 on tserver $1", table_id, ts_uuid);
        continue;
      }
      tbi.servers_by_replica_count.emplace(replica_count, ts_uuid);
      max_count = std::max(replica_count, max_count);
      min_count = std::min(replica_count, min_count);
    }
    table_info_by_skew.emplace(max_count - min_count, std::move(tbi));
  }

  // TODO(aserbin): add sanity checks on the result.
  *info = std::move(result_info);

  return Status::OK();
}
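
For context, here is how the skew-ordered structures populated above might
drive a greedy balancing step. This is a minimal, self-contained sketch, not
code from the Kudu sources: the TableBalanceInfo layout mirrors the fields
used above (assuming multimap-based containers), and the table and tserver
names are made up for illustration.

#include <cstdint>
#include <iostream>
#include <map>
#include <string>

// Illustrative stand-in for the structure referenced above; the real
// definition lives elsewhere in the Kudu sources.
struct TableBalanceInfo {
  std::string table_id;
  std::string tag;
  // replica count --> tserver UUID, ordered by count ascending
  std::multimap<std::int32_t, std::string> servers_by_replica_count;
};

int main() {
  // skew (max per-server count minus min) --> per-table balance info;
  // the most skewed table sits at the end of the multimap.
  std::multimap<std::int32_t, TableBalanceInfo> table_info_by_skew;

  TableBalanceInfo tbi;
  tbi.table_id = "table-1";
  tbi.servers_by_replica_count.emplace(1, "ts-B");  // lightly loaded
  tbi.servers_by_replica_count.emplace(3, "ts-A");  // heavily loaded
  table_info_by_skew.emplace(3 - 1, std::move(tbi));

  // A greedy step picks the most skewed table and considers moving one
  // replica from its most loaded server to its least loaded one.
  const auto& worst = *table_info_by_skew.rbegin();
  const auto& t = worst.second;
  std::cout << "table " << t.table_id << ", skew " << worst.first
            << ": candidate move "
            << t.servers_by_replica_count.rbegin()->second << " -> "
            << t.servers_by_replica_count.begin()->second << std::endl;
  return 0;
}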