in src/kudu/consensus/consensus_peers.cc [209:351]
// Builds the next request for this peer and hands it to the proxy
// asynchronously, unless the peer is closed, a request is already in flight,
// or there is nothing worth sending.
//
// 'even_if_queue_empty' asks for a status-only request to go out even when
// the queue yields no ops. It is forced to true for the first request this
// peer sends.
//
// peer_lock_ is taken on entry and explicitly released right before the RPC
// is issued; every early return releases it through the unique_lock.
void Peer::SendNextRequest(bool even_if_queue_empty) {
  std::unique_lock<simple_spinlock> l(peer_lock_);

  // A closed peer sends nothing, and only one request may be outstanding at
  // a time.
  if (PREDICT_FALSE(closed_) || request_pending_) {
    return;
  }

  // The first request is the negotiation round. The queue always appears
  // empty for it, so it must be sent regardless of what the caller asked for.
  if (!has_sent_first_request_) {
    has_sent_first_request_ = true;
    even_if_queue_empty = true;
  }

  // A positive failed_attempts_ is what marks this peer as unhealthy.
  const bool peer_unhealthy = failed_attempts_ > 0;

  // When the previous request produced an error and this call is not a normal
  // heartbeat, skip the "per-op" request and wait for the heartbeat instead.
  //
  // TODO(todd): we could consider looking at the number of consecutive failed
  // attempts, and instead of ignoring the signal, ask the heartbeater
  // to "expedite" the next heartbeat in order to achieve something like
  // exponential backoff after an error. As it is implemented today, any
  // transient error will result in a latency blip as long as the heartbeat
  // period.
  if (peer_unhealthy && !even_if_queue_empty) {
    return;
  }

  // Nothing is pending and nothing is being sent, so claim the request slot.
  bool needs_tablet_copy = false;

  // An unhealthy peer is degraded to a 'status-only' heartbeat request: no
  // ops are read from the log / log-cache to build a full batch for it. This
  // keeps the leader from doing that expensive read on behalf of a peer that
  // is not reachable.
  bool read_ops = !peer_unhealthy;
  request_pending_ = true;

  // The next hop used to ship messages to this peer. It can differ from the
  // peer's own uuid when proxying is enabled.
  string next_hop_uuid;

  // Record the committed index carried by request_ before the queue refills
  // it, so an advance can be detected afterwards.
  int64_t commit_index_before = kMinimumOpIdIndex;
  if (request_.has_committed_index()) {
    commit_index_before = request_.committed_index();
  }

  Status s = queue_->RequestForPeer(peer_pb_.permanent_uuid(), read_ops, &request_,
                                    &replicate_msg_refs_, &needs_tablet_copy,
                                    &next_hop_uuid);

  int64_t commit_index_after = kMinimumOpIdIndex;
  if (request_.has_committed_index()) {
    commit_index_after = request_.committed_index();
  }

  if (PREDICT_FALSE(!s.ok())) {
    // Bumping failed_attempts_ keeps a RequestForPeer error from being
    // re-triggered by every actual write. Subsequent attempts are restricted
    // to heartbeats; because this is a hard failure they will keep failing,
    // only less often.
    // TODO -
    // Make empty heartbeats go through after a failure to send actual messages,
    // without changing the cursor at all on peer, but still maintaining authority on
    // it. Otherwise node keeps asking for votes, destabilizing cluster.
    ++failed_attempts_;
    VLOG_WITH_PREFIX_UNLOCKED(1) << s.ToString();
    request_pending_ = false;
    return;
  }

#ifdef FB_DO_NOT_REMOVE
  if (PREDICT_FALSE(needs_tablet_copy)) {
    Status prepare_status = PrepareTabletCopyRequest();
    if (!prepare_status.ok()) {
      LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Unable to generate Tablet Copy request for peer: "
                                        << prepare_status.ToString();
      request_pending_ = false;
      return;
    }

    controller_.Reset();
    request_pending_ = true;
    l.unlock();

    // The callback holds a shared_ptr to this object, which guarantees that
    // the object outlives the RPC.
    shared_ptr<Peer> s_this = shared_from_this();
    proxy_->StartTabletCopyAsync(&tc_request_, &tc_response_, &controller_,
                                 [s_this]() {
                                   s_this->ProcessTabletCopyResponse();
                                 });
    return;
  }
#endif

  request_.set_tablet_id(tablet_id_);
  request_.set_caller_uuid(leader_uuid_);
  request_.set_dest_uuid(peer_pb_.permanent_uuid());

  // The request counts as having ops when it carries any, or when the
  // committed index moved forward while the queue filled it in.
  const bool carries_ops = request_.ops_size() > 0;
  const bool commit_index_advanced = commit_index_after > commit_index_before;
  const bool req_has_ops = carries_ops || commit_index_advanced;

  // With nothing to send, only proceed if a status-only message was asked for.
  if (PREDICT_FALSE(!(req_has_ops || even_if_queue_empty))) {
    request_pending_ = false;
    return;
  }

  if (req_has_ops) {
    // Actual ops are going out, so there is no need to heartbeat for a while.
    heartbeater_->Snooze();
  }

  MAYBE_FAULT(FLAGS_fault_crash_on_leader_request_fraction);

  VLOG_WITH_PREFIX_UNLOCKED(2) << "Sending to peer " << peer_pb().permanent_uuid() << ": "
                               << SecureShortDebugString(request_);
  controller_.Reset();

  // Everything from here on runs without peer_lock_.
  l.unlock();

  // The callback holds a shared_ptr to this object, which guarantees that the
  // object outlives the RPC.
  shared_ptr<Peer> s_this = shared_from_this();

  // TODO: Refactor this code. Ideally all fields in 'request_' related to
  // proxying should be set inside PeerMessageQueue::RequestForPeer(). Move the
  // setting of 'proxy_hops_remaining' to PeerMessageQueue::RequestForPeer()
  const bool routed_via_proxy = next_hop_uuid != peer_pb().permanent_uuid();
  if (routed_via_proxy) {
    // Proxied requests carry the number of hops they may still take.
    request_.set_proxy_hops_remaining(FLAGS_raft_proxy_max_hops);
  }

  shared_ptr<PeerProxy> next_hop_proxy = peer_proxy_pool_->Get(next_hop_uuid);
  if (!next_hop_proxy) {
    LOG_WITH_PREFIX_UNLOCKED(FATAL) << "peer with uuid " << next_hop_uuid
                                    << " not found in peer proxy pool";
  }

  next_hop_proxy->UpdateAsync(&request_, &response_, &controller_,
                              [s_this]() {
                                s_this->ProcessResponse();
                              });
}