in src/kudu/consensus/raft_consensus.cc [4046:4316]
void RaftConsensus::HandleProxyRequest(const ConsensusRequestPB* request,
                                       ConsensusResponsePB* response,
                                       rpc::RpcContext* context) {
  MonoDelta wal_wait_timeout =
      MonoDelta::FromMilliseconds(FLAGS_raft_log_cache_proxy_wait_time_ms);
  MonoTime wal_wait_deadline = MonoTime::Now() + wal_wait_timeout;

  // TODO(mpercy): Remove this config lookup when refactoring DRT to return a
  // RaftPeerPB, which will prevent a validation race.
  RaftConfigPB active_config;
  {
    // Snapshot the active Raft config so we know how to route proxied messages.
    ThreadRestrictions::AssertWaitAllowed();
    LockGuard l(lock_);
    RET_RESPOND_ERROR_NOT_OK(CheckRunningUnlocked());
    active_config = cmeta_->ActiveConfig();
  }
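
  // Count the proxy request as received before doing any further validation.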
  raft_proxy_num_requests_received_->Increment();

  // Initial implementation:
  //
  // Synchronously:
  // 1. Validate that the request is addressed to the local node via 'proxy_dest_uuid'.
  // 2. Reconstitute each message from the local cache.
  //
  // Asynchronously:
  // 3. Deliver the reconstituted request directly to the remote (async).
  // 4. Proxy the response from the remote back to the caller.

  // Validate the request.
  if (request->proxy_dest_uuid() != peer_uuid()) {
    Status s = Status::InvalidArgument(Substitute("Wrong proxy destination UUID requested. "
                                                  "Local UUID: $0. Requested UUID: $1",
                                                  peer_uuid(), request->proxy_dest_uuid()));
    LOG_WITH_PREFIX(WARNING) << s.ToString() << ": from " << context->requestor_string()
                             << ": " << SecureShortDebugString(*request);
    SetupErrorAndRespond(s, ServerErrorPB::WRONG_SERVER_UUID, response, context);
    return;
  }
  if (request->dest_uuid() == peer_uuid()) {
    LOG_WITH_PREFIX(WARNING) << "dest_uuid and proxy_dest_uuid are the same: "
                             << request->proxy_dest_uuid() << ": "
                             << request->ShortDebugString();
    context->RespondFailure(Status::InvalidArgument("proxy and destination must be different"));
    return;
  }
  if (request->proxy_hops_remaining() < 1) {
    LOG_WITH_PREFIX(WARNING) << "Proxy hops remaining exhausted (possible routing loop?) "
                             << "in request to peer "
                             << request->proxy_dest_uuid() << ": "
                             << request->ShortDebugString();
    raft_proxy_num_requests_hops_remaining_exhausted_->Increment();
    context->RespondFailure(Status::Incomplete("proxy hops remaining exhausted",
                                               "possible routing loop"));
    return;
  }

  // Construct the downstream request; copy the relevant fields from the
  // proxied request.
  ConsensusRequestPB downstream_request;
  auto prevent_ops_deletion = MakeScopedCleanup([&]() {
    // The ops attached below via AddAllocated() are not owned by
    // 'downstream_request'; detach them before it is destroyed so they are not
    // deleted twice.
    downstream_request.mutable_ops()->ExtractSubrange(
        /*start=*/ 0, /*num=*/ downstream_request.ops_size(), /*elements=*/ nullptr);
  });
  downstream_request.set_dest_uuid(request->dest_uuid());
  downstream_request.set_tablet_id(request->tablet_id());
  downstream_request.set_caller_uuid(request->caller_uuid());
  downstream_request.set_caller_term(request->caller_term());
  // Decrement hops remaining.
  downstream_request.set_proxy_hops_remaining(request->proxy_hops_remaining() - 1);
  if (request->has_preceding_id()) {
    *downstream_request.mutable_preceding_id() = request->preceding_id();
  }
  if (request->has_committed_index()) {
    downstream_request.set_committed_index(request->committed_index());
  }
  if (request->has_all_replicated_index()) {
    downstream_request.set_all_replicated_index(request->all_replicated_index());
  }
  if (request->has_safe_timestamp()) {
    downstream_request.set_safe_timestamp(request->safe_timestamp());
  }
  if (request->has_last_idx_appended_to_leader()) {
    downstream_request.set_last_idx_appended_to_leader(request->last_idx_appended_to_leader());
  }
  if (request->has_region_durable_index()) {
    downstream_request.set_region_durable_index(request->region_durable_index());
  }
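
  // Record this node as the proxying peer on the downstream request.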
  downstream_request.set_proxy_caller_uuid(peer_uuid());
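
  // Determine the next hop. By default we forward directly to the final
  // destination; if multi-hop routing is enabled, the routing table may
  // return an intermediate peer instead.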
  string next_uuid = request->dest_uuid();
  if (FLAGS_raft_enable_multi_hop_proxy_routing) {
    Status s = routing_table_container_->NextHop(
        peer_uuid(), request->dest_uuid(), &next_uuid);
    if (PREDICT_FALSE(!s.ok())) {
      raft_proxy_num_requests_unknown_dest_->Increment();
    }
    RET_RESPOND_ERROR_NOT_OK(s);
  }
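
  // If the log cache read below times out and yields no ops, we still forward
  // an op-free request so the destination sees a heartbeat; track that case so
  // it is not counted as a full success.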
  bool degraded_to_heartbeat = false;
  vector<ReplicateRefPtr> messages;
  if (request->dest_uuid() != next_uuid) {
    // Multi-hop proxy request: forward the existing PROXY_OP ops to the next
    // proxy unchanged.
    downstream_request.set_proxy_dest_uuid(next_uuid);
    for (int i = 0; i < request->ops_size(); i++) {
      *downstream_request.add_ops() = request->ops(i);
    }
    // The ops copied here are owned by 'downstream_request', so it is safe for
    // it to delete them.
    prevent_ops_deletion.cancel();
  } else {
    // Reconstitute proxied events from the local cache.
    // If the cache does not have all of the events, we wait for them, up to
    // the specified timeout.
    // TODO(mpercy): Switch this from polling to event-triggered.
    PeerMessageQueue::TrackedPeer peer_to_send;
    Status s = queue_->FindPeer(request->dest_uuid(), &peer_to_send);
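
    // Identify the destination peer to the log cache read below, including its
    // address if the queue is currently tracking it.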
    ReadContext read_context;
    read_context.for_peer_uuid = &request->dest_uuid();
    if (s.ok()) {
      read_context.for_peer_host =
          &peer_to_send.peer_pb.last_known_addr().host();
      read_context.for_peer_port =
          peer_to_send.peer_pb.last_known_addr().port();
    }

    int64_t first_op_index = -1;
    int64_t max_batch_size = FLAGS_consensus_max_batch_size_bytes - request->ByteSizeLong();
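
    // Validate the batch before reading from the cache: every op must be a
    // PROXY_OP and the op indexes must be consecutive, so that a single
    // contiguous cache read can reconstitute them.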
    for (int i = 0; i < request->ops_size(); i++) {
      auto& msg = request->ops(i);
      if (PREDICT_FALSE(msg.op_type() != PROXY_OP)) {
        RET_RESPOND_ERROR_NOT_OK(Status::InvalidArgument(Substitute(
            "proxy expected PROXY_OP but received opid $0 of type $1",
            OpIdToString(msg.id()),
            OperationType_Name(msg.op_type()))));
      }
      if (i == 0) {
        first_op_index = msg.id().index();
      } else {
        // TODO(mpercy): It would be nice not to require consecutive indexes in the batch.
        // We should see if we can support it without a big perf penalty in IOPS.
        if (PREDICT_FALSE(msg.id().index() != first_op_index + i)) {
          RET_RESPOND_ERROR_NOT_OK(Status::InvalidArgument(Substitute(
              "proxy requires consecutive indexes in batch, but received $0 after index $1",
              OpIdToString(msg.id()),
              first_op_index + i - 1)));
        }
      }
    }

    // Now we know that all ops we are reconstituting are consecutive.
    //
    // Block until the required ops are available in the local log cache. This
    // may time out based on FLAGS_raft_log_cache_proxy_wait_time_ms, in which
    // case we fall back to sending a heartbeat below.
    OpId preceding_id;
    if (request->ops_size() > 0) {
      queue_->log_cache()->BlockingReadOps(
          first_op_index - 1,
          max_batch_size,
          read_context,
          FLAGS_raft_log_cache_proxy_wait_time_ms,
          &messages,
          &preceding_id);
    }
    if (request->ops_size() > 0 && messages.empty()) {
      // We timed out and got nothing from the log cache. Send a heartbeat to
      // the destination to prevent it from starting a (pre-)election.
      raft_proxy_num_requests_log_read_timeout_->Increment();
      degraded_to_heartbeat = true;
    }

    // Reconstitute the proxied ops. We silently tolerate proxying a subset of
    // the requested batch.
    for (int i = 0; i < request->ops_size() && i < static_cast<int>(messages.size()); i++) {
      // Ensure that the OpIds match. We don't expect a mismatch to ever
      // happen, so we log an error locally before responding to the caller.
      if (!OpIdEquals(request->ops(i).id(), messages[i]->get()->id())) {
        string extra_info;
        if (i > 0) {
          extra_info = Substitute(" (previously received OpId: $0)",
                                  OpIdToString(messages[i-1]->get()->id()));
        }
        Status s = Status::IllegalState(Substitute(
            "log cache returned non-consecutive OpId index for message $0 in request: "
            "requested $1, received $2$3",
            i,
            OpIdToString(request->ops(i).id()),
            OpIdToString(messages[i]->get()->id()),
            extra_info));
        LOG_WITH_PREFIX(ERROR) << s.ToString();
        RET_RESPOND_ERROR_NOT_OK(s);
      }
      downstream_request.mutable_ops()->AddAllocated(messages[i]->get());
    }
  }

  VLOG_WITH_PREFIX(3) << "Downstream proxy request: "
                      << SecureShortDebugString(downstream_request);

  // Asynchronously:
  // Send the request to the remote.
  //
  // Find the address of the remote given our local config.
  RaftPeerPB* next_peer_pb;
  Status s = GetRaftConfigMember(&active_config, next_uuid, &next_peer_pb);
  if (PREDICT_FALSE(!s.ok())) {
    RET_RESPOND_ERROR_NOT_OK(s.CloneAndPrepend(Substitute(
        "unable to proxy to peer $0 because it is not in the active config: $1",
        next_uuid,
        SecureShortDebugString(active_config))));
  }
  if (!next_peer_pb->has_last_known_addr()) {
    s = Status::IllegalState("no known address for peer", next_uuid);
    LOG_WITH_PREFIX(ERROR) << s.ToString();
    RET_RESPOND_ERROR_NOT_OK(s);
  }

  // TODO(mpercy): Cache this proxy object (although they are lightweight).
  // We can use a PeerProxyPool, like we do when sending from the leader.
  shared_ptr<PeerProxy> next_proxy;
  RET_RESPOND_ERROR_NOT_OK(peer_proxy_factory_->NewProxy(*next_peer_pb, &next_proxy));

  ConsensusResponsePB downstream_response;
  rpc::RpcController controller;
  controller.set_timeout(
      MonoDelta::FromMilliseconds(FLAGS_consensus_rpc_timeout_ms));

  // Here, we turn an async API into a blocking one with a CountDownLatch.
  // TODO(mpercy): Use an async approach instead.
  CountDownLatch latch(/*count=*/1);
  rpc::ResponseCallback callback = [&latch] { latch.CountDown(); };
  next_proxy->UpdateAsync(&downstream_request, &downstream_response, &controller, callback);
  latch.Wait();
  if (PREDICT_FALSE(!controller.status().ok())) {
    RET_RESPOND_ERROR_NOT_OK(controller.status().CloneAndPrepend(
        Substitute("Error proxying request from $0 to $1",
                   SecureShortDebugString(local_peer_pb_),
                   SecureShortDebugString(*next_peer_pb))));
  }

  // Proxy the response back to the caller.
  if (downstream_response.has_responder_uuid()) {
    response->set_responder_uuid(downstream_response.responder_uuid());
  }
  if (downstream_response.has_responder_term()) {
    response->set_responder_term(downstream_response.responder_term());
  }
  if (downstream_response.has_status()) {
    *response->mutable_status() = downstream_response.status();
  }
  if (downstream_response.has_error()) {
    *response->mutable_error() = downstream_response.error();
  }
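
  // Only count full successes here; a request that degraded to a heartbeat is
  // tracked by the log-read-timeout counter above.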
  if (!degraded_to_heartbeat) {
    raft_proxy_num_requests_success_->Increment();
  }
  context->RespondSuccess();
}