void Peer::SendNextRequest()

in src/kudu/consensus/consensus_peers.cc [209:351]


// Builds the next request for this peer from the leader's message queue and
// ships it asynchronously (via UpdateAsync()) to the appropriate next hop.
//
// At most one request is outstanding per peer, tracked by 'request_pending_'.
// The call returns without sending anything when:
//   - the peer has been closed,
//   - a request is already in flight,
//   - the previous attempt failed and this call is not a heartbeat
//     ('even_if_queue_empty' == false),
//   - building the request from the queue fails, or
//   - there are no new ops / commit index to ship and this call is not a
//     heartbeat.
//
// 'even_if_queue_empty': when true, send a status-only request even if there
// is nothing new to ship. It is forced to true for the very first request
// sent to this peer (the negotiation round).
//
// The response is handled by ProcessResponse(), invoked from the RPC callback.
// A shared_ptr to this Peer is captured in the callback so the object outlives
// the RPC.
void Peer::SendNextRequest(bool even_if_queue_empty) {
  std::unique_lock<simple_spinlock> l(peer_lock_);
  if (PREDICT_FALSE(closed_)) {
    return;
  }

  // Only allow one request at a time.
  if (request_pending_) {
    return;
  }

  // For the first request sent by the peer, we send it even if the queue is empty,
  // which it will always appear to be for the first request, since this is the
  // negotiation round.
  if (!has_sent_first_request_) {
    even_if_queue_empty = true;
    has_sent_first_request_ = true;
  }

  // If our last request generated an error, and this is not a normal
  // heartbeat request, then don't send the "per-op" request. Instead,
  // we'll wait for the heartbeat.
  //
  // TODO(todd): we could consider looking at the number of consecutive failed
  // attempts, and instead of ignoring the signal, ask the heartbeater
  // to "expedite" the next heartbeat in order to achieve something like
  // exponential backoff after an error. As it is implemented today, any
  // transient error will result in a latency blip as long as the heartbeat
  // period.
  if (failed_attempts_ > 0 && !even_if_queue_empty) {
    return;
  }

  // The peer has no pending request nor is sending: send the request.
  bool needs_tablet_copy = false;

  // If this peer is not healthy (as indicated by failed_attempts_), then
  // degrade this peer to a 'status-only' heartbeat request, i.e. do not read
  // any ops from log/log-cache to build the entire batch of ops to be sent to
  // this peer. This ensures that the leader is not doing the expensive
  // operation of reading from log-cache to build the message for a peer that
  // is not reachable.
  bool read_ops = (failed_attempts_ <= 0);

  // From here on a request is considered in flight. Every early-return path
  // below must reset 'request_pending_' to false.
  request_pending_ = true;

  // The next hop to route to in order to ship messages to this peer. This may
  // differ from the peer's permanent uuid when proxying is enabled.
  string next_hop_uuid;
  int64_t commit_index_before = request_.has_committed_index() ?
      request_.committed_index() : kMinimumOpIdIndex;
  Status s = queue_->RequestForPeer(peer_pb_.permanent_uuid(), read_ops, &request_,
                                    &replicate_msg_refs_, &needs_tablet_copy,
                                    &next_hop_uuid);
  int64_t commit_index_after = request_.has_committed_index() ?
      request_.committed_index() : kMinimumOpIdIndex;

  if (PREDICT_FALSE(!s.ok())) {
    // Incrementing failed_attempts_ prevents a RequestForPeer error from
    // continually triggering an error on every actual write. The next attempt
    // to RequestForPeer will now be restricted to heartbeats; because this is
    // a hard failure it will keep failing, but less often.
    // TODO -
    // Make empty heartbeats go through after a failure to send actual messages,
    // without changing the cursor at all on peer, but still maintaining authority on
    // it. Otherwise node keeps asking for votes, destabilizing cluster.
    failed_attempts_++;
    VLOG_WITH_PREFIX_UNLOCKED(1) << s.ToString();
    request_pending_ = false;
    return;
  }

#ifdef FB_DO_NOT_REMOVE
  if (PREDICT_FALSE(needs_tablet_copy)) {
    Status tc_status = PrepareTabletCopyRequest();
    if (tc_status.ok()) {
      controller_.Reset();
      request_pending_ = true;
      l.unlock();
      // Capture a shared_ptr reference into the RPC callback so that we're guaranteed
      // that this object outlives the RPC.
      shared_ptr<Peer> s_this = shared_from_this();
      proxy_->StartTabletCopyAsync(&tc_request_, &tc_response_, &controller_,
                                   [s_this]() {
                                     s_this->ProcessTabletCopyResponse();
                                   });
    } else {
      LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Unable to generate Tablet Copy request for peer: "
                                        << tc_status.ToString();
      request_pending_ = false;
    }
    return;
  }
#endif

  request_.set_tablet_id(tablet_id_);
  request_.set_caller_uuid(leader_uuid_);
  request_.set_dest_uuid(peer_pb_.permanent_uuid());

  // TODO: Refactor this code. Ideally all fields in 'request_' related to
  // proxying should be set inside PeerMessageQueue::RequestForPeer(). Move the
  // setting of 'proxy_hops_remaining' to PeerMessageQueue::RequestForPeer()
  //
  // This is done while still holding 'peer_lock_', like every other write to
  // 'request_' in this function. 'request_' is a member reused across calls,
  // and nothing visible here resets this field between rounds. A hop budget
  // left over from an earlier proxied request is therefore cleared explicitly
  // when this request goes directly to the peer.
  if (next_hop_uuid != peer_pb_.permanent_uuid()) {
    // This is a proxy request: set the hops remaining value.
    request_.set_proxy_hops_remaining(FLAGS_raft_proxy_max_hops);
  } else {
    request_.clear_proxy_hops_remaining();
  }

  // New ops, or an advanced commit index, count as "something to send".
  bool req_has_ops = request_.ops_size() > 0 || (commit_index_after > commit_index_before);
  // If the queue is empty, check if we were told to send a status-only
  // message; if not, just return.
  if (PREDICT_FALSE(!req_has_ops && !even_if_queue_empty)) {
    request_pending_ = false;
    return;
  }

  if (req_has_ops) {
    // If we're actually sending ops there's no need to heartbeat for a while.
    heartbeater_->Snooze();
  }

  MAYBE_FAULT(FLAGS_fault_crash_on_leader_request_fraction);

  VLOG_WITH_PREFIX_UNLOCKED(2) << "Sending to peer " << peer_pb().permanent_uuid() << ": "
      << SecureShortDebugString(request_);
  controller_.Reset();

  l.unlock();
  // Capture a shared_ptr reference into the RPC callback so that we're guaranteed
  // that this object outlives the RPC.
  shared_ptr<Peer> s_this = shared_from_this();

  // A missing proxy for the chosen next hop is treated as a fatal invariant
  // violation.
  shared_ptr<PeerProxy> next_hop_proxy = peer_proxy_pool_->Get(next_hop_uuid);
  if (!next_hop_proxy) {
    LOG_WITH_PREFIX_UNLOCKED(FATAL) << "peer with uuid " << next_hop_uuid
                                    << " not found in peer proxy pool";
  }

  next_hop_proxy->UpdateAsync(&request_, &response_, &controller_,
                              [s_this]() {
                                s_this->ProcessResponse();
                              });
}