in src/kudu/consensus/consensus_queue.cc [696:910]
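// Builds the next ConsensusRequestPB for peer 'uuid': snapshots the queue
// state under the lock, resolves the next hop (which may be a proxy peer),
// and, unless 'read_ops' is false or the peer is still NEW, fills 'request'
// with ops read from the log cache. On return, 'msg_refs' holds refcounted
// messages that must be retained until the request has been sent.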
Status PeerMessageQueue::RequestForPeer(const string& uuid,
bool read_ops,
ConsensusRequestPB* request,
vector<ReplicateRefPtr>* msg_refs,
bool* needs_tablet_copy,
std::string* next_hop_uuid) {
// Maintain a thread-safe copy of necessary members.
OpId preceding_id;
int64_t current_term;
TrackedPeer peer_copy;
MonoDelta unreachable_time;
{
std::lock_guard<simple_spinlock> lock(queue_lock_);
DCHECK_EQ(queue_state_.state, kQueueOpen);
DCHECK_NE(uuid, local_peer_pb_.permanent_uuid());
TrackedPeer* peer = FindPtrOrNull(peers_map_, uuid);
if (PREDICT_FALSE(peer == nullptr || queue_state_.mode == NON_LEADER)) {
return Status::NotFound(Substitute("peer $0 is no longer tracked or "
"queue is not in leader mode", uuid));
}
peer_copy = *peer;
// Clear the requests without deleting the entries, as they may be in use by other peers.
request->mutable_ops()->ExtractSubrange(0, request->ops_size(), nullptr);
// This is initialized to the queue's last appended op but gets set to the id of the
// log entry preceding the first one in 'messages' if messages are found for the peer.
preceding_id = queue_state_.last_appended;
current_term = queue_state_.current_term;
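    // Copy the queue's current watermarks into the request while still
    // holding the lock, so the request carries a consistent snapshot of the
    // queue state.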
request->set_committed_index(queue_state_.committed_index);
request->set_all_replicated_index(queue_state_.all_replicated_index);
request->set_last_idx_appended_to_leader(queue_state_.last_appended.index());
request->set_caller_term(current_term);
request->set_region_durable_index(queue_state_.region_durable_index);
unreachable_time = MonoTime::Now() - peer_copy.last_communication_time;
RETURN_NOT_OK(routing_table_container_->NextHop(
local_peer_pb_.permanent_uuid(), uuid, next_hop_uuid));
if (*next_hop_uuid != uuid) {
      // If the proxy peer is not healthy, route directly to the destination.
      // TODO: Multi-hop proxy support needs better failure and health checks
      // for the proxy peer. The current method of detecting an unhealthy
      // proxy peer works only on the leader. One solution could be for the
      // leader to periodically exchange the health reports of all peers as
      // part of the UpdateReplica() call.
TrackedPeer* proxy_peer = FindPtrOrNull(peers_map_, *next_hop_uuid);
if (proxy_peer == nullptr ||
HasProxyPeerFailedUnlocked(proxy_peer, peer)) {
*next_hop_uuid = uuid;
}
}
}
// Always trigger a health status update check at the end of this function.
bool wal_catchup_progress = false;
bool wal_catchup_failure = false;
  // Skip this when FLAGS_update_peer_health_status is disabled, to avoid the
  // overhead of re-acquiring the consensus queue lock.
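  // Note: SCOPED_CLEANUP runs this lambda on every exit path from the
  // function, including the early error returns below, so the peer's health
  // status is re-evaluated even when reading ops fails.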
SCOPED_CLEANUP({
if (!FLAGS_update_peer_health_status) {
return;
}
std::lock_guard<simple_spinlock> lock(queue_lock_);
TrackedPeer* peer = FindPtrOrNull(peers_map_, uuid);
if (PREDICT_FALSE(peer == nullptr || queue_state_.mode == NON_LEADER)) {
VLOG(1) << LogPrefixUnlocked() << "peer " << uuid
<< " is no longer tracked or queue is not in leader mode";
return;
}
if (wal_catchup_progress) peer->wal_catchup_possible = true;
if (wal_catchup_failure) peer->wal_catchup_possible = false;
UpdatePeerHealthUnlocked(peer);
});
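  // If the last exchange with the peer indicated that it doesn't have the
  // tablet, there is no point in preparing ops: signal the caller to
  // initiate a tablet copy instead.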
if (peer_copy.last_exchange_status == PeerStatus::TABLET_NOT_FOUND) {
VLOG(3) << LogPrefixUnlocked() << "Peer " << uuid << " needs tablet copy" << THROTTLE_MSG;
*needs_tablet_copy = true;
return Status::OK();
}
*needs_tablet_copy = false;
// If the next hop != the destination, we are sending these messages via a proxy.
bool route_via_proxy = *next_hop_uuid != uuid;
if (route_via_proxy) {
    // Set the uuid of the proxy peer that we route through.
request->set_proxy_dest_uuid(*next_hop_uuid);
} else {
    // Clear the proxy uuid to ensure that this message is not rejected by
    // the destination.
request->clear_proxy_dest_uuid();
}
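  // For example, with a route A -> B -> C where we (A) are preparing a
  // request destined for C, NextHop() returns B: 'route_via_proxy' is true
  // and B's uuid is set as the proxy destination. If B is untracked or
  // deemed failed, the fallback above sets *next_hop_uuid = uuid and the
  // request is sent directly to C.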
// If we've never communicated with the peer, we don't know what messages to
// send, so we'll send a status-only request. Otherwise, we grab requests
// from the log starting at the last_received point.
  // If the caller has explicitly asked us not to read ops (via 'read_ops'),
  // we skip reading ops from the log cache / log. The caller usually does
  // this when the leader has detected that a peer is unhealthy and therefore
  // degrades it to status-only requests.
if (peer_copy.last_exchange_status != PeerStatus::NEW && read_ops) {
// The batch of messages to send to the peer.
vector<ReplicateRefPtr> messages;
int max_batch_size = FLAGS_consensus_max_batch_size_bytes - request->ByteSize();
ReadContext read_context;
read_context.for_peer_uuid = &uuid;
read_context.for_peer_host = &peer_copy.peer_pb.last_known_addr().host();
read_context.for_peer_port = peer_copy.peer_pb.last_known_addr().port();
read_context.route_via_proxy = route_via_proxy;
    // Read ops from the log starting at the follower's next_index. Passing
    // 'next_index - 1' lets ReadOps() also resolve 'preceding_id', the id of
    // the op immediately before the first op returned.
Status s = log_cache_.ReadOps(peer_copy.next_index - 1,
max_batch_size,
read_context,
&messages,
&preceding_id);
if (PREDICT_FALSE(!s.ok())) {
      // It's normal to get NotFound() here if a follower has fallen so far
      // behind that the leader has already GCed the logs it needs. The
      // follower replica will hang around for a while until it's evicted.
if (PREDICT_TRUE(s.IsNotFound())) {
KLOG_EVERY_N_SECS_THROTTLER(INFO, 60, *peer_copy.status_log_throttler, "logs_gced")
<< LogPrefixUnlocked()
<< Substitute("The logs necessary to catch up peer $0 have been "
"garbage collected. The follower will never be able "
"to catch up ($1)", uuid, s.ToString());
wal_catchup_failure = true;
return s;
}
if (s.IsIncomplete()) {
// IsIncomplete() means that we tried to read beyond the head of the log
// (in the future). See KUDU-1078.
LOG_WITH_PREFIX_UNLOCKED(ERROR) << "Error trying to read ahead of the log "
<< "while preparing peer request: "
<< s.ToString() << ". Destination peer: "
<< peer_copy.ToString();
return s;
}
LOG_WITH_PREFIX_UNLOCKED(FATAL) << "Error reading the log while preparing peer request: "
<< s.ToString() << ". Destination peer: "
<< peer_copy.ToString();
}
// Since we were able to read ops through the log cache, we know that
// catchup is possible.
wal_catchup_progress = true;
// We use AddAllocated rather than copy, because we pin the log cache at the
// "all replicated" point. At some point we may want to allow partially loading
// (and not pinning) earlier messages. At that point we'll need to do something
// smarter here, like copy or ref-count.
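    // Note that ExtractSubrange(..., nullptr) at the top of this function
    // releases these raw pointers from the previous request without deleting
    // them (see the comment there): ownership effectively stays with the
    // refcounted messages retained in 'msg_refs'.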
if (!route_via_proxy) {
for (const ReplicateRefPtr& msg : messages) {
request->mutable_ops()->AddAllocated(msg->get());
}
msg_refs->swap(messages);
} else {
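      // When routing via a proxy, send lightweight PROXY_OP placeholders that
      // carry only the op id, timestamp, and type; presumably the proxy
      // substitutes the full payloads from its own copy of the log (that
      // protocol is not visible in this function).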
vector<ReplicateRefPtr> proxy_ops;
for (const ReplicateRefPtr& msg : messages) {
ReplicateRefPtr proxy_op = make_scoped_refptr_replicate(new ReplicateMsg);
*proxy_op->get()->mutable_id() = msg->get()->id();
proxy_op->get()->set_timestamp(msg->get()->timestamp());
proxy_op->get()->set_op_type(PROXY_OP);
request->mutable_ops()->AddAllocated(proxy_op->get());
proxy_ops.emplace_back(std::move(proxy_op));
}
msg_refs->swap(proxy_ops);
}
}
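  // At this point 'preceding_id' is either the queue's last appended op (for
  // a status-only request) or the id of the log entry immediately preceding
  // the first op in the batch; the follower uses it for Raft's log-matching
  // check before accepting the ops.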
DCHECK(preceding_id.IsInitialized());
request->mutable_preceding_id()->CopyFrom(preceding_id);
// If we are sending ops to the follower, but the batch doesn't reach the current
// committed index, we can consider the follower lagging, and it's worth
// logging this fact periodically.
if (request->ops_size() > 0) {
int64_t last_op_sent = request->ops(request->ops_size() - 1).id().index();
if (last_op_sent < request->committed_index()) {
      // A metric should be used to cover this and alarm on it; logging it
      // unconditionally could overwhelm the logs.
VLOG_WITH_PREFIX_UNLOCKED(2)
<< "Peer " << uuid << " is lagging by at least "
<< (request->committed_index() - last_op_sent)
<< " ops behind the committed index " << THROTTLE_MSG;
}
  } else {
    // If we're not sending ops to the follower, set the safe time on the
    // request.
    // TODO(dralves) When we have leader leases, send this all the time.
if (PREDICT_TRUE(FLAGS_safe_time_advancement_without_writes)) {
request->set_safe_timestamp(time_manager_->GetSafeTime().value());
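      // Propagating the safe time lets non-leader replicas serve snapshot
      // reads even when no writes are in flight (see the warning below).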
} else {
KLOG_EVERY_N_SECS(WARNING, 300) << "Safe time advancement without writes is disabled. "
"Snapshot reads on non-leader replicas may stall if there are no writes in progress.";
}
}
if (PREDICT_FALSE(VLOG_IS_ON(2))) {
if (request->ops_size() > 0) {
VLOG_WITH_PREFIX_UNLOCKED(2) << "Sending request with operations to Peer: " << uuid
<< ". Size: " << request->ops_size()
<< ". From: " << SecureShortDebugString(request->ops(0).id()) << ". To: "
<< SecureShortDebugString(request->ops(request->ops_size() - 1).id());
} else {
VLOG_WITH_PREFIX_UNLOCKED(2) << "Sending status only request to Peer: " << uuid
<< ": " << SecureDebugString(*request);
}
}
return Status::OK();
}