in src/kudu/consensus/consensus_queue.cc [1690:1916]
// Processes a consensus response received from the peer identified by
// 'peer_uuid' and updates the queue's bookkeeping for that peer.
//
// While holding 'queue_lock_' this:
//   1. Looks up the tracked peer and bails out if the queue is not open or the
//      peer is not tracked.
//   2. Refreshes the peer's exchange status via UpdateExchangeStatus().
//   3. Recomputes the peer's 'last_received' and 'next_index' from the status
//      carried in 'response'.
//   4. If the queue is in LEADER mode, advances the majority-replicated and
//      all-replicated watermarks, possibly advances the committed index, and
//      calls AdvanceQueueRegionDurableIndex().
//   5. Decides whether more data should be sent right away, evicts ops from
//      the log cache when applicable, and updates metrics.
// After the lock is released, observers are notified if the committed index
// changed while in LEADER mode.
//
// Parameters:
//   peer_uuid: key used to look the peer up in 'peers_map_'.
//   response:  the peer's response; expected (DCHECK'd) to be initialized and
//              to carry a status with last_received,
//              last_received_current_leader and last_committed_idx.
//
// Returns true if the caller should immediately send another request to this
// peer. Returns false when the queue is not open or the peer is untracked.
// When the peer's last exchange status is not OK, returns whatever
// UpdateExchangeStatus() stored in 'send_more_immediately'.
bool PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
const ConsensusResponsePB& response) {
DCHECK(response.IsInitialized()) << "Error: Uninitialized: "
<< response.InitializationErrorString() << ". Response: " << SecureShortDebugString(response);
// This check is only compiled in when FB_DO_NOT_REMOVE is defined.
#ifdef FB_DO_NOT_REMOVE
CHECK(!response.has_error());
#endif
// Return value; stays false on the "queue closed / untracked peer" path.
bool send_more_immediately = false;
// Set only if the committed index changes while processing this response.
boost::optional<int64_t> updated_commit_index;
// Snapshot of 'queue_state_.mode', assigned under the lock below so that it
// can still be consulted after the lock is released. It is left unassigned on
// the early-return paths, none of which read it.
Mode mode_copy;
// The braces below bound the scope of 'scoped_lock'; everything inside runs
// with 'queue_lock_' held.
{
std::lock_guard<simple_spinlock> scoped_lock(queue_lock_);
// TODO(mpercy): Handle response from proxy on behalf of another peer.
// For now, we'll try to ignore proxying here, but we may need to
// eventually handle that here for better health status and error logging.
TrackedPeer* peer = FindPtrOrNull(peers_map_, peer_uuid);
if (PREDICT_FALSE(queue_state_.state != kQueueOpen || peer == nullptr)) {
LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Queue is closed or peer was untracked, disregarding "
"peer response. Response: " << SecureShortDebugString(response);
// 'send_more_immediately' is still false here.
return send_more_immediately;
}
// Sanity checks.
// Some of these can be eventually removed, but they are handy for now.
DCHECK(response.status().IsInitialized())
<< "Error: Uninitialized: " << response.InitializationErrorString()
<< ". Response: "<< SecureShortDebugString(response);
// TODO(mpercy): Include uuid in error messages as well.
DCHECK(response.has_responder_uuid() && !response.responder_uuid().empty())
<< "Got response from peer with empty UUID";
#ifdef FB_DO_NOT_REMOVE
DCHECK(!response.has_error()); // Application-level errors should be handled elsewhere.
#endif
DCHECK(response.has_status()); // Responses should always have a status.
// The status must always have a last received op id and a last committed index.
const ConsensusStatusPB& status = response.status();
DCHECK(status.has_last_received());
DCHECK(status.has_last_received_current_leader());
DCHECK(status.has_last_committed_idx());
// Take a snapshot of the previously-recorded peer state.
// This is a by-value copy, so the updates made to '*peer' below do not affect
// it; its 'last_received' is later passed as the "replicated_before" value
// when advancing the watermarks.
const TrackedPeer prev_peer_state = *peer;
// Update the peer's last exchange status based on the response.
// In this case, if there is a log matching property (LMP) mismatch, we
// want to immediately send another request as we attempt to sync the log
// offset between the local leader and the remote peer.
UpdateExchangeStatus(peer, prev_peer_state, response, &send_more_immediately);
// If the reported last-received op for the replica is in our local log,
// then resume sending entries from that point onward. Otherwise, resume
// after the last op they received from us. If we've never successfully
// sent them anything, start after the last-committed op in their log, which
// is guaranteed by the Raft protocol to be a valid op.
bool peer_has_prefix_of_log = IsOpInLog(status.last_received());
if (peer_has_prefix_of_log) {
// If the latest thing in their log is in our log, we are in sync.
peer->last_received = status.last_received();
peer->next_index = peer->last_received.index() + 1;
// Check if the peer is a NON_VOTER candidate ready for promotion.
PromoteIfNeeded(peer, prev_peer_state, status);
TransferLeadershipIfNeeded(*peer, status);
} else if (!OpIdEquals(status.last_received_current_leader(), MinimumOpId())) {
// Their log may have diverged from ours, however we are in the process
// of replicating our ops to them, so continue doing so. Eventually, we
// will cause the divergent entry in their log to be overwritten.
peer->last_received = status.last_received_current_leader();
peer->next_index = peer->last_received.index() + 1;
} else {
// The peer is divergent and they have not (successfully) received
// anything from us yet. Start sending from their last committed index.
// This logic differs from the Raft spec slightly because instead of
// stepping back one-by-one from the end until we no longer have an LMP
// error, we jump back to the last committed op indicated by the peer with
// the hope that doing so will result in a faster catch-up process.
// Note that 'peer->last_received' is left unchanged on this path; only
// 'next_index' is reset.
DCHECK_GE(peer->last_known_committed_index, 0);
peer->next_index = peer->last_known_committed_index + 1;
LOG_WITH_PREFIX_UNLOCKED(INFO)
<< "Peer " << peer_uuid << " log is divergent from this leader: "
<< "its last log entry " << OpIdToString(status.last_received()) << " is not in "
<< "this leader's log and it has not received anything from this leader yet. "
<< "Falling back to committed index " << peer->last_known_committed_index;
}
// Bail out before any watermark or commit index processing if the exchange
// was not successful. 'peer->next_index' has already been updated above.
if (peer->last_exchange_status != PeerStatus::OK) {
// In this case, 'send_more_immediately' has already been set by
// UpdateExchangeStatus() to true in the case of an LMP mismatch, false
// otherwise.
return send_more_immediately;
}
if (response.has_responder_term()) {
// The peer must have responded with a term that is greater than or equal to
// the last known term for that peer.
peer->CheckMonotonicTerms(response.responder_term());
// If the responder didn't send an error back that must mean that it has
// a term that is the same or lower than ours.
CHECK_LE(response.responder_term(), queue_state_.current_term);
}
// Guarded by VLOG_IS_ON so that the peer and response strings are only built
// when verbose logging at level 2 is enabled.
if (PREDICT_FALSE(VLOG_IS_ON(2))) {
VLOG_WITH_PREFIX_UNLOCKED(2) << "Received Response from Peer (" << peer->ToString() << "). "
<< "Response: " << SecureShortDebugString(response);
}
mode_copy = queue_state_.mode;
// If we're the leader, we can compute the new watermarks based on the progress
// of our followers.
// NOTE: it's possible this node might have lost its leadership (and the notification
// is just pending behind the lock we're holding), but any future leader will observe
// the same watermarks and make the same advancement, so this is safe.
// Values of 'all_replicated_index' before and after the watermark update.
// Both remain 0 when not in LEADER mode; they are compared further down to
// decide whether the log cache should be evicted.
int64_t old_all_replicated_index = 0;
int64_t new_all_replicated_index = 0;
if (mode_copy == LEADER) {
// Advance the majority replicated index.
if (!FLAGS_enable_flexi_raft) {
AdvanceQueueWatermark("majority_replicated",
&queue_state_.majority_replicated_index,
/*replicated_before=*/ prev_peer_state.last_received,
/*replicated_after=*/ peer->last_received,
/*num_peers_required=*/ queue_state_.majority_size_,
VOTER_REPLICAS,
peer);
// NOTE(review): because of the early return above when
// 'last_exchange_status' is not OK, the second clause of the condition below
// looks unreachable at this point (unless CheckMonotonicTerms() can change
// the status) -- confirm.
} else if (peer->last_received.index() > queue_state_.majority_replicated_index ||
peer->last_exchange_status != PeerStatus::OK) {
// This method is expensive. The 'watermark' can change only if this
// peer's last received index is higher than the current
// majority_replicated_index. We also call this method when the
// last_exchange_status of the peer indicates an error. This is because
// 'majority_replicated_index' can go down. It should be safe to
// completely skip calling this method when 'last_exchange_status' is an
// error, but we do not want to introduce a behavior change at this
// point. Check AdvanceQueueWatermark() for more comments
AdvanceMajorityReplicatedWatermarkFlexiRaft(
&queue_state_.majority_replicated_index,
/*replicated_before=*/ prev_peer_state.last_received,
/*replicated_after=*/ peer->last_received,
peer);
}
old_all_replicated_index = queue_state_.all_replicated_index;
// Advance the all replicated index.
// Every peer in 'peers_map_' is required for this watermark.
AdvanceQueueWatermark("all_replicated",
&queue_state_.all_replicated_index,
/*replicated_before=*/ prev_peer_state.last_received,
/*replicated_after=*/ peer->last_received,
/*num_peers_required=*/ peers_map_.size(),
ALL_REPLICAS,
peer);
new_all_replicated_index = queue_state_.all_replicated_index;
// If the majority-replicated index is in our current term,
// and it is above our current committed index, then
// we can advance the committed index.
//
// It would seem that the "it is above our current committed index"
// check is redundant (and could be a CHECK), but in fact the
// majority-replicated index can currently go down, since we don't
// consider peers whose last contact was an error in the watermark
// calculation. See the TODO in AdvanceQueueWatermark() for more details.
int64_t commit_index_before = queue_state_.committed_index;
if (queue_state_.first_index_in_current_term != boost::none &&
queue_state_.majority_replicated_index >= queue_state_.first_index_in_current_term &&
queue_state_.majority_replicated_index > queue_state_.committed_index) {
queue_state_.committed_index = queue_state_.majority_replicated_index;
} else {
VLOG_WITH_PREFIX_UNLOCKED(2) << "Cannot advance commit index, waiting for > "
<< "first index in current leader term: "
<< queue_state_.first_index_in_current_term << ". "
<< "current majority_replicated_index: "
<< queue_state_.majority_replicated_index << ", "
<< "current committed_index: "
<< queue_state_.committed_index;
}
// Once the commit index has been updated, go ahead and update the
// region_durable_index
AdvanceQueueRegionDurableIndex();
// Only notify observers if the commit index actually changed.
// The new value is recorded here and the notification itself is issued after
// the lock has been released.
if (queue_state_.committed_index != commit_index_before) {
DCHECK_GT(queue_state_.committed_index, commit_index_before);
updated_commit_index = queue_state_.committed_index;
VLOG_WITH_PREFIX_UNLOCKED(2) << "Commit index advanced from "
<< commit_index_before << " to "
<< *updated_commit_index;
}
}
// If the peer's committed index is lower than our own, or if our log has
// the next request for the peer, set 'send_more_immediately' to true.
// This overwrites any value previously stored by UpdateExchangeStatus().
send_more_immediately = peer->last_known_committed_index < queue_state_.committed_index ||
log_cache_.HasOpBeenWritten(peer->next_index);
// Evict ops from log_cache only if:
// 1. This is not a leader node OR
// 2. 'all_replicated_index' has changed after processing this response
if (mode_copy != LEADER ||
(old_all_replicated_index != new_all_replicated_index)) {
log_cache_.EvictThroughOp(queue_state_.all_replicated_index);
}
UpdateMetricsUnlocked();
}
// 'queue_lock_' is no longer held here. Observers are notified only if the
// committed index was advanced above while in LEADER mode.
if (mode_copy == LEADER && updated_commit_index != boost::none) {
NotifyObserversOfCommitIndexChange(*updated_commit_index);
}
return send_more_immediately;
}