void RaftConsensus::HandleProxyRequest()

in src/kudu/consensus/raft_consensus.cc [4046:4316]


void RaftConsensus::HandleProxyRequest(const ConsensusRequestPB* request,
                                       ConsensusResponsePB* response,
                                       rpc::RpcContext* context) {
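  // Time budget for waiting on the local log cache while reconstituting
  // proxied ops; the same flag bounds the blocking cache read further below.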
  MonoDelta wal_wait_timeout = MonoDelta::FromMilliseconds(FLAGS_raft_log_cache_proxy_wait_time_ms);
  MonoTime wal_wait_deadline = MonoTime::Now() + wal_wait_timeout;

  // TODO(mpercy): Remove this config lookup when refactoring DRT to return a
  // RaftPeerPB, which will prevent a validation race.
  RaftConfigPB active_config;
  {
    // Snapshot the active Raft config so we know how to route proxied messages.
    ThreadRestrictions::AssertWaitAllowed();
    LockGuard l(lock_);
    RET_RESPOND_ERROR_NOT_OK(CheckRunningUnlocked());
    active_config = cmeta_->ActiveConfig();
  }

  raft_proxy_num_requests_received_->Increment();

  // Initial implementation:
  //
  // Synchronously:
  // 1. Validate that the request is addressed to the local node via 'proxy_dest_uuid'.
  // 2. Reconstitute each message from the local cache.
  //
  // Asynchronously:
  // 3. Deliver the reconstituted request directly to the remote (async).
  // 4. Proxy the response from the remote back to the caller.

  // Validate the request.
  if (request->proxy_dest_uuid() != peer_uuid()) {
    Status s = Status::InvalidArgument(Substitute("Wrong proxy destination UUID requested. "
                                                  "Local UUID: $1. Requested UUID: $2",
                                                  peer_uuid(), request->proxy_dest_uuid()));
    LOG_WITH_PREFIX(WARNING) << s.ToString() << ": from " << context->requestor_string()
                 << ": " << SecureShortDebugString(*request);
    SetupErrorAndRespond(s, ServerErrorPB::WRONG_SERVER_UUID, response, context);
    return;
  }
  if (request->dest_uuid() == peer_uuid()) {
    LOG_WITH_PREFIX(WARNING) << "dest_uuid and proxy_dest_uuid are the same: "
                             << request->proxy_dest_uuid() << ": "
                             << request->ShortDebugString();
    context->RespondFailure(Status::InvalidArgument("proxy and desination must be different"));
    return;
  }

  if (request->proxy_hops_remaining() < 1) {
    LOG_WITH_PREFIX(WARNING) << "Proxy hops remaining exhausted (possible routing loop?) "
                             << "in request to peer "
                             << request->proxy_dest_uuid() << ": "
                             << request->ShortDebugString();
    raft_proxy_num_requests_hops_remaining_exhausted_->Increment();
    context->RespondFailure(Status::Incomplete("proxy hops remaining exhausted",
                                               "possible routing loop"));
    return;
  }

  // Construct the downstream request; copy the relevant fields from the
  // proxied request.
  ConsensusRequestPB downstream_request;
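  // Ops reconstituted from the local log cache are attached to
  // 'downstream_request' via AddAllocated() below but remain referenced by
  // the cache; the cleanup releases them from the protobuf before it is
  // destroyed so they are not freed twice.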
  auto prevent_ops_deletion = MakeScopedCleanup([&]() {
    // Prevent double-deletion of these requests.
    downstream_request.mutable_ops()->ExtractSubrange(
      /*start=*/ 0, /*num=*/ downstream_request.ops_size(), /*elements=*/ nullptr);
  });

  downstream_request.set_dest_uuid(request->dest_uuid());
  downstream_request.set_tablet_id(request->tablet_id());
  downstream_request.set_caller_uuid(request->caller_uuid());
  downstream_request.set_caller_term(request->caller_term());

  // Decrement hops remaining.
  downstream_request.set_proxy_hops_remaining(request->proxy_hops_remaining() - 1);

  if (request->has_preceding_id()) {
    *downstream_request.mutable_preceding_id() = request->preceding_id();
  }
  if (request->has_committed_index()) {
    downstream_request.set_committed_index(request->committed_index());
  }
  if (request->has_all_replicated_index()) {
    downstream_request.set_all_replicated_index(request->all_replicated_index());
  }
  if (request->has_safe_timestamp()) {
    downstream_request.set_safe_timestamp(request->safe_timestamp());
  }
  if (request->has_last_idx_appended_to_leader()) {
    downstream_request.set_last_idx_appended_to_leader(request->last_idx_appended_to_leader());
  }
  if (request->has_region_durable_index()) {
    downstream_request.set_region_durable_index(request->region_durable_index());
  }

  downstream_request.set_proxy_caller_uuid(peer_uuid());

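  // Determine the next hop. With multi-hop routing enabled, consult the
  // routing table for the next peer on the path to the final destination;
  // otherwise send directly to the destination itself.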
  string next_uuid = request->dest_uuid();
  if (FLAGS_raft_enable_multi_hop_proxy_routing) {
    Status s = routing_table_container_->NextHop(
        peer_uuid(), request->dest_uuid(), &next_uuid);
    if (PREDICT_FALSE(!s.ok())) {
      raft_proxy_num_requests_unknown_dest_->Increment();
    }
    RET_RESPOND_ERROR_NOT_OK(s);
  }

  bool degraded_to_heartbeat = false;
  vector<ReplicateRefPtr> messages;

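  // If the next hop is not the final destination, forward the PROXY_OP
  // placeholders unchanged; otherwise reconstitute the full ops from the
  // local log cache before sending.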
  if (request->dest_uuid() != next_uuid) {
    // Multi-hop proxy request.
    downstream_request.set_proxy_dest_uuid(next_uuid);
    // Forward the existing PROXY_OP ops.
    for (int i = 0; i < request->ops_size(); i++) {
      *downstream_request.add_ops() = request->ops(i);
    }
    prevent_ops_deletion.cancel(); // The ops added above are copies, not cache-owned pointers.
  } else {

    // Reconstitute proxied events from the local cache.
    // If the cache does not have all events, we retry up until the specified
    // retry timeout.
    // TODO(mpercy): Switch this from polling to event-triggered.
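    // Attribute the log cache read to the destination peer; its host/port are
    // only available if the queue is currently tracking that peer.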
    PeerMessageQueue::TrackedPeer peer_to_send;
    Status s = queue_->FindPeer(request->dest_uuid(), &peer_to_send);
    ReadContext read_context;
    read_context.for_peer_uuid = &request->dest_uuid();
    if (s.ok()) {
      read_context.for_peer_host =
        &peer_to_send.peer_pb.last_known_addr().host();
      read_context.for_peer_port =
        peer_to_send.peer_pb.last_known_addr().port();
    }

    int64_t first_op_index = -1;
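    // Leave room for the metadata already carried in the incoming request so
    // the reconstituted batch stays within the configured batch size limit.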
    int64_t max_batch_size = FLAGS_consensus_max_batch_size_bytes - request->ByteSizeLong();

    for (int i = 0; i < request->ops_size(); i++) {
      auto& msg = request->ops(i);
      if (PREDICT_FALSE(msg.op_type() != PROXY_OP)) {
        RET_RESPOND_ERROR_NOT_OK(Status::InvalidArgument(Substitute(
            "proxy expected PROXY_OP but received opid {} of type {}",
            OpIdToString(msg.id()),
            OperationType_Name(msg.op_type()))));
      }
      if (i == 0) {
        first_op_index = msg.id().index();
      } else {
        // TODO(mpercy): It would be nice not to require consecutive indexes in the batch.
        // We should see if we can support it without a big perf penalty in IOPS.
        if (PREDICT_FALSE(msg.id().index() != first_op_index + i)) {
          RET_RESPOND_ERROR_NOT_OK(Status::InvalidArgument(Substitute(
              "proxy requires consecutive indexes in batch, but received {} after index {}",
              OpIdToString(msg.id()),
              first_op_index + i - 1)));
        }
      }
    }

    // Now we know that all ops we are reconstituting are consecutive.
    //
    // Block until the required ops are available in the local log. This may
    // time out after FLAGS_raft_log_cache_proxy_wait_time_ms, in which case
    // we degrade to sending a heartbeat (see below).
    OpId preceding_id;
    if (request->ops_size() > 0) {
      queue_->log_cache()->BlockingReadOps(
          first_op_index - 1,
          max_batch_size,
          read_context,
          FLAGS_raft_log_cache_proxy_wait_time_ms,
          &messages,
          &preceding_id);
    }

    if (request->ops_size() > 0 && messages.empty()) {
      // We timed out and got nothing from the log cache. Send a heartbeat to
      // the destination to prevent it from starting a (pre-)election.
      raft_proxy_num_requests_log_read_timeout_->Increment();
      degraded_to_heartbeat = true;
    }

    // Reconstitute the proxied ops. We silently tolerate proxying a subset of
    // the requested batch.
    for (int i = 0; i < request->ops_size() && i < static_cast<int>(messages.size()); i++) {
      // Ensure that the OpIds match. We don't expect a mismatch to ever
      // happen, so we log an error locally before responding to the caller.
      if (!OpIdEquals(request->ops(i).id(), messages[i]->get()->id())) {
        string extra_info;
        if (i > 0) {
          extra_info = Substitute(" (previously received OpId: $0)",
                                  OpIdToString(messages[i-1]->get()->id()));
        }
        Status s = Status::IllegalState(Substitute(
            "log cache returned non-consecutive OpId index for message $0 in request: "
            "requested $1, received $2$3",
            i,
            OpIdToString(request->ops(i).id()),
            OpIdToString(messages[i]->get()->id()),
            extra_info));
        LOG_WITH_PREFIX(ERROR) << s.ToString();
        RET_RESPOND_ERROR_NOT_OK(s);
      }
      downstream_request.mutable_ops()->AddAllocated(messages[i]->get());
    }
  }

  VLOG_WITH_PREFIX(3) << "Downstream proxy request: " << SecureShortDebugString(downstream_request);

  // Asynchronously:
  // Send the request to the remote.
  //
  // Find the address of the remote given our local config.
  RaftPeerPB* next_peer_pb;
  Status s = GetRaftConfigMember(&active_config, next_uuid, &next_peer_pb);
  if (PREDICT_FALSE(!s.ok())) {
    RET_RESPOND_ERROR_NOT_OK(s.CloneAndPrepend(Substitute(
        "unable to proxy to peer {} because it is not in the active config: {}",
        next_uuid,
        SecureShortDebugString(active_config))));
  }
  if (!next_peer_pb->has_last_known_addr()) {
    s = Status::IllegalState("no known address for peer", next_uuid);
    LOG_WITH_PREFIX(ERROR) << s.ToString();
    RET_RESPOND_ERROR_NOT_OK(s);
  }

  // TODO(mpercy): Cache this proxy object (although they are lightweight).
  // We can use a PeerProxyPool, like we do when sending from the leader.
  shared_ptr<PeerProxy> next_proxy;
  RET_RESPOND_ERROR_NOT_OK(peer_proxy_factory_->NewProxy(*next_peer_pb, &next_proxy));

  ConsensusResponsePB downstream_response;
  rpc::RpcController controller;
  controller.set_timeout(
      MonoDelta::FromMilliseconds(FLAGS_consensus_rpc_timeout_ms));

  // Here, we turn an async API into a blocking one with a CountDownLatch.
  // TODO(mpercy): Use an async approach instead.
  CountDownLatch latch(/*count=*/1);
  rpc::ResponseCallback callback = [&latch] { latch.CountDown(); };
  next_proxy->UpdateAsync(&downstream_request, &downstream_response, &controller, callback);
  latch.Wait();
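  // A failed controller status indicates a transport-level RPC error; a
  // Raft-level error instead arrives inside the response's 'error' field and
  // is relayed back to the caller below.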
  if (PREDICT_FALSE(!controller.status().ok())) {
    RET_RESPOND_ERROR_NOT_OK(controller.status().CloneAndPrepend(
        Substitute("Error proxying request from $0 to $1",
                   SecureShortDebugString(local_peer_pb_),
                   SecureShortDebugString(*next_peer_pb))));
  }

  // Proxy the response back to the caller.
  if (downstream_response.has_responder_uuid()) {
    response->set_responder_uuid(downstream_response.responder_uuid());
  }
  if (downstream_response.has_responder_term()) {
    response->set_responder_term(downstream_response.responder_term());
  }
  if (downstream_response.has_status()) {
    *response->mutable_status() = downstream_response.status();
  }
  if (downstream_response.has_error()) {
    *response->mutable_error() = downstream_response.error();
  }

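  // Count the proxy pass as successful only if we did not degrade to an
  // empty heartbeat after a log cache read timeout.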
  if (!degraded_to_heartbeat) {
    raft_proxy_num_requests_success_->Increment();
  }

  context->RespondSuccess();
}