Status Rebalancer::KsckResultsToClusterBalanceInfo()

in src/kudu/tools/rebalancer.cc [315:451]


Status Rebalancer::KsckResultsToClusterBalanceInfo(
    const KsckResults& ksck_info,
    const MovesInProgress& pending_moves,
    ClusterBalanceInfo* cbi) const {
  DCHECK(cbi);

  // tserver UUID --> total count of replicas of the table's tablets at the server
  typedef unordered_map<string, int32_t> TableReplicasAtServer;

  // The resulting cluster balance information to build.
  ClusterBalanceInfo balance_info;

  unordered_map<string, int32_t> tserver_replicas_count;
  unordered_map<string, TableReplicasAtServer> table_replicas_info;

  // Build a set of tables with RF=1 (single replica tables).
  unordered_set<string> rf1_tables;
  if (!config_.move_rf1_replicas) {
    for (const auto& s : ksck_info.table_summaries) {
      if (s.replication_factor == 1) {
        rf1_tables.emplace(s.id);
      }
    }
  }

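  // Initialize replica counters for all healthy tablet servers; replicas
  // reported on other servers are skipped below.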
  for (const auto& s : ksck_info.tserver_summaries) {
    if (s.health != KsckServerHealth::HEALTHY) {
      LOG(INFO) << Substitute("skipping tablet server $0 ($1) because of its "
                              "non-HEALTHY status ($2)",
                              s.uuid, s.address,
                              ServerHealthToString(s.health));
      continue;
    }
    tserver_replicas_count.emplace(s.uuid, 0);
  }

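  // Traverse the tablets reported by ksck, counting replicas per tablet
  // server and per (table, tablet server) pair.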
  for (const auto& tablet : ksck_info.tablet_summaries) {
    if (!config_.move_rf1_replicas) {
      if (rf1_tables.find(tablet.table_id) != rf1_tables.end()) {
        LOG(INFO) << Substitute("tablet $0 of table '$0' ($1) has single replica, skipping",
                                tablet.id, tablet.table_name, tablet.table_id);
        continue;
      }
    }

    // Check if it's one of the tablets which are currently being rebalanced.
    // If so, interpret the move as successfully completed, updating the
    // replica counts correspondingly.
    const auto it_pending_moves = pending_moves.find(tablet.id);

    for (const auto& ri : tablet.replicas) {
      // Increment total count of replicas at the tablet server.
      auto it = tserver_replicas_count.find(ri.ts_uuid);
      if (it == tserver_replicas_count.end()) {
        string msg = Substitute("skipping replica at tserver $0", ri.ts_uuid);
        if (ri.ts_address) {
          msg += " (" + *ri.ts_address + ")";
        }
        msg += " since it's not reported among known tservers";
        LOG(INFO) << msg;
        continue;
      }
      bool do_count_replica = true;
      if (it_pending_moves != pending_moves.end() &&
          tablet.result == KsckCheckResult::RECOVERING) {
        const auto& move_info = it_pending_moves->second;
        bool is_target_replica_present = false;
        // Verify that the target replica is present in the config.
        for (const auto& tr : tablet.replicas) {
          if (tr.ts_uuid == move_info.ts_uuid_to) {
            is_target_replica_present = true;
            break;
          }
        }
        if (move_info.ts_uuid_from == ri.ts_uuid && is_target_replica_present) {
          // It seems both the source and the destination replicas of the
          // scheduled replica movement operation are still in the config.
          // That's a sign that the move operation hasn't yet completed.
          // As explained above, let's interpret the move as successfully
          // completed, so the source replica should not be counted in.
          do_count_replica = false;
        }
      }
      if (do_count_replica) {
        it->second++;
      }

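      // Register the table and the tablet server in the per-table map even if
      // the replica is not counted, so the server still shows up in the
      // table's replica distribution.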
      auto table_ins = table_replicas_info.emplace(
          tablet.table_id, TableReplicasAtServer());
      TableReplicasAtServer& replicas_at_server = table_ins.first->second;

      auto replicas_ins = replicas_at_server.emplace(ri.ts_uuid, 0);
      if (do_count_replica) {
        replicas_ins.first->second++;
      }
    }
  }

  // Check for the consistency of information derived from the ksck report.
  for (const auto& elem : tserver_replicas_count) {
    const auto& ts_uuid = elem.first;
    int32_t count_by_table_info = 0;
    for (auto& e : table_replicas_info) {
      count_by_table_info += e.second[ts_uuid];
    }
    if (elem.second != count_by_table_info) {
      return Status::Corruption("inconsistent cluster state returned by ksck");
    }
  }

  // Populate ClusterBalanceInfo::servers_by_total_replica_count
  auto& servers_by_count = balance_info.servers_by_total_replica_count;
  for (const auto& elem : tserver_replicas_count) {
    servers_by_count.emplace(elem.second, elem.first);
  }

  // Populate ClusterBalanceInfo::table_info_by_skew
  auto& table_info_by_skew = balance_info.table_info_by_skew;
  for (const auto& elem : table_replicas_info) {
    const auto& table_id = elem.first;
    int32_t max_count = numeric_limits<int32_t>::min();
    int32_t min_count = numeric_limits<int32_t>::max();
    TableBalanceInfo tbi;
    tbi.table_id = table_id;
    for (const auto& e : elem.second) {
      const auto& ts_uuid = e.first;
      const auto replica_count = e.second;
      tbi.servers_by_replica_count.emplace(replica_count, ts_uuid);
      max_count = std::max(replica_count, max_count);
      min_count = std::min(replica_count, min_count);
    }
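    // The table's skew is the difference between the maximum and minimum
    // per-server replica counts.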
    table_info_by_skew.emplace(max_count - min_count, std::move(tbi));
  }
  *cbi = std::move(balance_info);

  return Status::OK();
}
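
For reference, below is a minimal, self-contained sketch of how the populated structures might be inspected. It assumes, based only on the emplace() calls above and not on the actual Kudu headers, that table_info_by_skew, servers_by_total_replica_count, and servers_by_replica_count are std::multimap containers keyed by integer counts; the stand-in struct definitions are hypothetical mirrors of ClusterBalanceInfo and TableBalanceInfo, not the real declarations.

#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <utility>

// Hypothetical stand-ins mirroring the fields used above; the real types live
// in the Kudu sources and may differ.
struct TableBalanceInfo {
  std::string table_id;
  // replica count --> tserver UUID
  std::multimap<int32_t, std::string> servers_by_replica_count;
};

struct ClusterBalanceInfo {
  // skew (max - min per-server replica count) --> table balance info
  std::multimap<int32_t, TableBalanceInfo> table_info_by_skew;
  // total replica count --> tserver UUID
  std::multimap<int32_t, std::string> servers_by_total_replica_count;
};

int main() {
  ClusterBalanceInfo cbi;
  cbi.servers_by_total_replica_count = {{3, "ts-A"}, {7, "ts-B"}, {5, "ts-C"}};

  TableBalanceInfo tbi;
  tbi.table_id = "table-1";
  tbi.servers_by_replica_count = {{1, "ts-A"}, {4, "ts-B"}, {2, "ts-C"}};
  cbi.table_info_by_skew.emplace(4 - 1, std::move(tbi));

  // Multimaps keep keys ordered, so the least and most loaded servers are the
  // first and last entries.
  const auto& least = *cbi.servers_by_total_replica_count.begin();
  const auto& most = *cbi.servers_by_total_replica_count.rbegin();
  std::cout << "least loaded: " << least.second << " (" << least.first << ")\n"
            << "most loaded: " << most.second << " (" << most.first << ")\n";

  // The most skewed table is the last entry of table_info_by_skew.
  const auto& most_skewed = *cbi.table_info_by_skew.rbegin();
  std::cout << "max skew: " << most_skewed.first << " (table "
            << most_skewed.second.table_id << ")\n";
  return 0;
}

Keying these containers by replica count and by skew is what lets a balancing pass read the extremes of the distribution directly from the ends of each map.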