Status VerifyMastersGetHostPorts()

in src/kudu/master/master_runner.cc [158:301]


// Contacts every master listed in 'master_addrs' and checks whether the
// master identified by 'local_uuid' needs to be added to the Raft config that
// the other masters report.
//
// For each address, the master's registration is fetched to learn its UUID
// and role. For every master whose UUID differs from 'local_uuid', the
// consensus state of the system catalog tablet is fetched as well, and the
// UUIDs of the peers in its committed Raft config are collected.
//
// Output parameters:
//   'leader_hp':   set to the address of a master that reported the LEADER
//                  role (if several do, the last one in 'master_addrs' wins).
//   'local_hp':    set to the address whose reported UUID equals 'local_uuid'.
//   'needs_retry': set to true when a condition that is presumed to be
//                  transient was hit (address resolution or RPC failure,
//                  differing terms, a pending config, differing config
//                  indexes or peer sets, or no leader found); set to false
//                  when verification ran to completion.
//   'needs_add':   only written when verification ran to completion; true if
//                  'local_uuid' is absent from the remote masters' committed
//                  Raft config, false otherwise.
//
// Returns OK both on success and when a retry is requested; callers must
// check 'needs_retry'. Returns Corruption if a master returns a consensus
// state with other than one tablet, and NotSupported if adding the local
// master would not be the only change needed to make the Raft config match
// the set of UUIDs fetched from 'master_addrs'.
Status VerifyMastersGetHostPorts(const vector<HostPort>& master_addrs,
                                 const string& local_uuid,
                                 const std::shared_ptr<rpc::Messenger>& messenger,
                                 HostPort* leader_hp,
                                 HostPort* local_hp,
                                 bool* needs_retry,
                                 bool* needs_add) {
  // One entry per remote (non-local) master: the UUIDs in the committed Raft
  // config as seen by that master.
  vector<set<string>> each_remote_masters_master_uuids;
  // UUIDs of all masters (local included) that answered at 'master_addrs'.
  set<string> fetched_uuids;
  // -1 means "not yet observed"; the first remote master seeds these values
  // and every subsequent remote master must agree with them.
  int64_t current_term = -1;
  int64_t committed_config_index = -1;
  for (const auto& hp : master_addrs) {
    Sockaddr master_addr;
    Status s = SockaddrFromHostPort(hp, &master_addr);
    if (!s.ok()) {
      LOG(INFO) << Substitute("Error resolving master address for $0: $1",
                              hp.ToString(), s.ToString());
      *needs_retry = true;
      return Status::OK();
    }

    // First, get the UUID of the remote master.
    GetMasterRegistrationRequestPB reg_req;
    GetMasterRegistrationResponsePB reg_resp;
    RpcController reg_rpc;
    MasterServiceProxy proxy(messenger, master_addr, master_addr.host());
    s = proxy.GetMasterRegistration(reg_req, &reg_resp, &reg_rpc);
    if (!s.ok() || reg_resp.has_error()) {
      LOG(INFO) << Substitute("Error getting master registration for $0: $1, $2",
                              master_addr.ToString(), s.ToString(),
                              SecureShortDebugString(reg_resp));
      *needs_retry = true;
      return Status::OK();
    }
    const bool is_leader = reg_resp.role() == consensus::RaftPeerPB::LEADER;
    if (is_leader) {
      *leader_hp = hp;
    }
    const auto& uuid = reg_resp.instance_id().permanent_uuid();
    EmplaceIfNotPresent(&fetched_uuids, uuid);
    // Skip the local master -- we only care about what the other masters
    // think, in case we should be trying to join their quorum.
    if (local_uuid == uuid) {
      *local_hp = hp;
      continue;
    }

    // Get the Raft config from the remote master to get their quorum's
    // UUIDs.
    RpcController rpc;
    GetConsensusStateRequestPB req;
    req.add_tablet_ids(SysCatalogTable::kSysCatalogTabletId);
    req.set_dest_uuid(uuid);
    req.set_report_health(consensus::INCLUDE_HEALTH_REPORT);
    GetConsensusStateResponsePB resp;
    ConsensusServiceProxy consensus_proxy(messenger, master_addr, master_addr.host());
    s = consensus_proxy.GetConsensusState(req, &resp, &rpc);
    if (!s.ok() || resp.has_error()) {
      LOG(INFO) << Substitute("Error getting master consensus for $0: $1",
                              master_addr.ToString(), s.ToString());
      *needs_retry = true;
      return Status::OK();
    }
    if (resp.tablets_size() != 1) {
      return Status::Corruption(
          Substitute("Error getting master consensus, expected one tablet but got $0: $1",
                     resp.tablets_size(), SecureShortDebugString(resp)));
    }
    // Retry if the masters don't agree on the current term.
    const auto& cstate = resp.tablets(0).cstate();
    if (current_term == -1) {
      current_term = cstate.current_term();
    }
    if (cstate.current_term() != current_term) {
      LOG(INFO) << Substitute("Existing masters have differing terms: $0 vs $1",
                              current_term, cstate.current_term());
      *needs_retry = true;
      return Status::OK();
    }
    // Retry if there's a pending config -- presumably pending means it's
    // transient.
    if (cstate.has_pending_config()) {
      LOG(INFO) << Substitute("Existing masters have pending config: $0",
                              SecureShortDebugString(cstate.pending_config()));
      *needs_retry = true;
      return Status::OK();
    }
    // Retry if the masters don't agree on the current Raft config's index.
    if (committed_config_index == -1) {
      committed_config_index = cstate.committed_config().opid_index();
    }
    if (cstate.committed_config().opid_index() != committed_config_index) {
      LOG(INFO) << Substitute("Existing masters have differing Raft config indexes: $0 vs $1",
                              committed_config_index, cstate.committed_config().opid_index());
      *needs_retry = true;
      return Status::OK();
    }
    const auto& config = cstate.committed_config();
    set<string> uuids;
    for (const auto& p : config.peers()) {
      EmplaceIfNotPresent(&uuids, p.permanent_uuid());
    }
    each_remote_masters_master_uuids.emplace_back(std::move(uuids));
  }
  if (!leader_hp->Initialized()) {
    LOG(INFO) << Substitute("No leader master found from master $0", local_uuid);
    *needs_retry = true;
    return Status::OK();
  }
  // If every address we contacted turned out to be the local master, no
  // remote Raft config was collected and there is no quorum to join. Bail out
  // here rather than indexing into an empty vector below.
  if (each_remote_masters_master_uuids.empty()) {
    LOG(INFO) << Substitute("No remote masters found from master $0; nothing to join",
                            local_uuid);
    *needs_add = false;
    *needs_retry = false;
    return Status::OK();
  }
  // Ensure the Raft configs from each master match. If not, presumably it's
  // transient and should be retried.
  auto& raft_config_uuids = each_remote_masters_master_uuids[0];
  for (size_t i = 1; i < each_remote_masters_master_uuids.size(); i++) {
    const auto& cur_uuids = each_remote_masters_master_uuids[i];
    if (cur_uuids != raft_config_uuids) {
      set<string> set_diff;
      STLSetDifference(cur_uuids, raft_config_uuids, &set_diff);
      LOG(INFO) << Substitute("Remote masters have differing Raft configurations:"
                              "[$0] vs [$1] (diff: [$2])", JoinStrings(cur_uuids, ","),
                              JoinStrings(raft_config_uuids, ","), JoinStrings(set_diff, ","));
      *needs_retry = true;
      return Status::OK();
    }
  }
  // Ensure that if we need to add this master to the Raft config, it's the
  // only one we need to add.
  if (!ContainsKey(raft_config_uuids, local_uuid)) {
    // Build the tentative config (existing peers + local master) and require
    // it to equal the set of masters that actually answered.
    EmplaceIfNotPresent(&raft_config_uuids, local_uuid);
    if (raft_config_uuids != fetched_uuids) {
      set<string> set_diff;
      STLSetDifference(fetched_uuids, raft_config_uuids, &set_diff);
      return Status::NotSupported(Substitute("Kudu only supports adding one master at a time; "
          "tentative Raft config doesn't match the UUIDs fetched from --master_addresses. "
          "Raft config + local UUID: [$0] vs fetched UUIDs: [$1], diff: [$2]",
          JoinStrings(raft_config_uuids, ","), JoinStrings(fetched_uuids, ","),
          JoinStrings(set_diff, ",")));
    }
    *needs_add = true;
  } else {
    *needs_add = false;
  }
  *needs_retry = false;
  return Status::OK();
}