bool PeerMessageQueue::ResponseFromPeer()

in src/kudu/consensus/consensus_queue.cc [1690:1916]
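
Handles a tracked peer's response to a previously sent request: it updates
the peer's replication state (last received op, next index to send) and,
when this node is the leader, recomputes the replication watermarks and
possibly advances the commit index. The return value indicates whether
another request should be sent to this peer immediately.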


bool PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
                                        const ConsensusResponsePB& response) {
  DCHECK(response.IsInitialized()) << "Error: Uninitialized: "
      << response.InitializationErrorString() << ". Response: " << SecureShortDebugString(response);
#ifdef FB_DO_NOT_REMOVE
  CHECK(!response.has_error());
#endif

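  // Computed under 'queue_lock_' below and consumed after the lock is
  // released (see the observer notification at the end of this method).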
  bool send_more_immediately = false;
  boost::optional<int64_t> updated_commit_index;
  Mode mode_copy;
  {
    std::lock_guard<simple_spinlock> scoped_lock(queue_lock_);

    // TODO(mpercy): Handle response from proxy on behalf of another peer.
    // For now, we'll try to ignore proxying here, but we may need to
    // eventually handle that here for better health status and error logging.

    TrackedPeer* peer = FindPtrOrNull(peers_map_, peer_uuid);
    if (PREDICT_FALSE(queue_state_.state != kQueueOpen || peer == nullptr)) {
      LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Queue is closed or peer was untracked, disregarding "
          "peer response. Response: " << SecureShortDebugString(response);
      return send_more_immediately;
    }

    // Sanity checks.
    // Some of these can be eventually removed, but they are handy for now.
    DCHECK(response.status().IsInitialized())
        << "Error: Uninitialized: " << response.InitializationErrorString()
        << ". Response: "<< SecureShortDebugString(response);
    // TODO(mpercy): Include uuid in error messages as well.
    DCHECK(response.has_responder_uuid() && !response.responder_uuid().empty())
        << "Got response from peer with empty UUID";

#ifdef FB_DO_NOT_REMOVE
    DCHECK(!response.has_error()); // Application-level errors should be handled elsewhere.
#endif

    DCHECK(response.has_status()); // Responses should always have a status.
    // The status must always have a last received op id and a last committed index.
    const ConsensusStatusPB& status = response.status();
    DCHECK(status.has_last_received());
    DCHECK(status.has_last_received_current_leader());
    DCHECK(status.has_last_committed_idx());

    // Take a snapshot of the previously-recorded peer state.
    const TrackedPeer prev_peer_state = *peer;

    // Update the peer's last exchange status based on the response.
    // In this case, if there is a log matching property (LMP) mismatch, we
    // want to immediately send another request as we attempt to sync the log
    // offset between the local leader and the remote peer.
    UpdateExchangeStatus(peer, prev_peer_state, response, &send_more_immediately);

    // If the reported last-received op for the replica is in our local log,
    // then resume sending entries from that point onward. Otherwise, resume
    // after the last op they received from us. If we've never successfully
    // sent them anything, start after the last-committed op in their log, which
    // is guaranteed by the Raft protocol to be a valid op.

    bool peer_has_prefix_of_log = IsOpInLog(status.last_received());
    if (peer_has_prefix_of_log) {
      // If the latest thing in their log is in our log, we are in sync.
      peer->last_received = status.last_received();
      peer->next_index = peer->last_received.index() + 1;

      // Check if the peer is a NON_VOTER candidate ready for promotion.
      PromoteIfNeeded(peer, prev_peer_state, status);

      TransferLeadershipIfNeeded(*peer, status);
    } else if (!OpIdEquals(status.last_received_current_leader(), MinimumOpId())) {
      // Their log may have diverged from ours, however we are in the process
      // of replicating our ops to them, so continue doing so. Eventually, we
      // will cause the divergent entry in their log to be overwritten.
      peer->last_received = status.last_received_current_leader();
      peer->next_index = peer->last_received.index() + 1;

    } else {
      // The peer is divergent and they have not (successfully) received
      // anything from us yet. Start sending from their last committed index.
      // This logic differs from the Raft spec slightly because instead of
      // stepping back one-by-one from the end until we no longer have an LMP
      // error, we jump back to the last committed op indicated by the peer with
      // the hope that doing so will result in a faster catch-up process.
      DCHECK_GE(peer->last_known_committed_index, 0);
      peer->next_index = peer->last_known_committed_index + 1;
      LOG_WITH_PREFIX_UNLOCKED(INFO)
          << "Peer " << peer_uuid << " log is divergent from this leader: "
          << "its last log entry " << OpIdToString(status.last_received()) << " is not in "
          << "this leader's log and it has not received anything from this leader yet. "
          << "Falling back to committed index " << peer->last_known_committed_index;
    }

    if (peer->last_exchange_status != PeerStatus::OK) {
      // In this case, 'send_more_immediately' has already been set by
      // UpdateExchangeStatus() to true in the case of an LMP mismatch, false
      // otherwise.
      return send_more_immediately;
    }

    if (response.has_responder_term()) {
      // The peer must have responded with a term that is greater than or equal to
      // the last known term for that peer.
      peer->CheckMonotonicTerms(response.responder_term());

      // If the responder didn't send an error back, that must mean its term
      // is the same as or lower than ours.
      CHECK_LE(response.responder_term(), queue_state_.current_term);
    }

    if (PREDICT_FALSE(VLOG_IS_ON(2))) {
      VLOG_WITH_PREFIX_UNLOCKED(2) << "Received Response from Peer (" << peer->ToString() << "). "
          << "Response: " << SecureShortDebugString(response);
    }

    mode_copy = queue_state_.mode;

    // If we're the leader, we can compute the new watermarks based on the progress
    // of our followers.
    // NOTE: it's possible this node might have lost its leadership (and the notification
    // is just pending behind the lock we're holding), but any future leader will observe
    // the same watermarks and make the same advancement, so this is safe.
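    // Track whether the all-replicated watermark moves as a result of this
    // response; on the leader, the log cache eviction below is skipped when
    // it does not.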
    int64_t old_all_replicated_index = 0;
    int64_t new_all_replicated_index = 0;

    if (mode_copy == LEADER) {
      // Advance the majority replicated index.
      if (!FLAGS_enable_flexi_raft) {
        AdvanceQueueWatermark("majority_replicated",
                              &queue_state_.majority_replicated_index,
                              /*replicated_before=*/ prev_peer_state.last_received,
                              /*replicated_after=*/ peer->last_received,
                              /*num_peers_required=*/ queue_state_.majority_size_,
                              VOTER_REPLICAS,
                              peer);
      } else if (peer->last_received.index() > queue_state_.majority_replicated_index ||
          peer->last_exchange_status != PeerStatus::OK) {
        // This method is expensive. The 'watermark' can change only if this
        // peer's last received index is higher than the current
        // majority_replicated_index. We also call this method when the
        // last_exchange_status of the peer indicates an error. This is because
        // 'majority_replicated_index' can go down. It should be safe to
        // completely skip calling this method when 'last_exchange_status' is an
        // error, but we do not want to introduce a behavior change at this
        // point. See AdvanceQueueWatermark() for more comments.
        AdvanceMajorityReplicatedWatermarkFlexiRaft(
            &queue_state_.majority_replicated_index,
            /*replicated_before=*/ prev_peer_state.last_received,
            /*replicated_after=*/ peer->last_received,
            peer);
      }

      old_all_replicated_index = queue_state_.all_replicated_index;

      // Advance the all replicated index.
      AdvanceQueueWatermark("all_replicated",
                            &queue_state_.all_replicated_index,
                            /*replicated_before=*/ prev_peer_state.last_received,
                            /*replicated_after=*/ peer->last_received,
                            /*num_peers_required=*/ peers_map_.size(),
                            ALL_REPLICAS,
                            peer);

      new_all_replicated_index = queue_state_.all_replicated_index;

      // If the majority-replicated index is in our current term,
      // and it is above our current committed index, then
      // we can advance the committed index.
      //
      // It would seem that the "it is above our current committed index"
      // check is redundant (and could be a CHECK), but in fact the
      // majority-replicated index can currently go down, since we don't
      // consider peers whose last contact was an error in the watermark
      // calculation. See the TODO in AdvanceQueueWatermark() for more details.
      int64_t commit_index_before = queue_state_.committed_index;
      if (queue_state_.first_index_in_current_term != boost::none &&
          queue_state_.majority_replicated_index >= queue_state_.first_index_in_current_term &&
          queue_state_.majority_replicated_index > queue_state_.committed_index) {
        queue_state_.committed_index = queue_state_.majority_replicated_index;
      } else {
        VLOG_WITH_PREFIX_UNLOCKED(2) << "Cannot advance commit index, waiting for > "
                                     << "first index in current leader term: "
                                     << queue_state_.first_index_in_current_term << ". "
                                     << "current majority_replicated_index: "
                                     << queue_state_.majority_replicated_index << ", "
                                     << "current committed_index: "
                                     << queue_state_.committed_index;
      }

      // Now that the commit index may have been updated, also update the
      // region_durable_index.
      AdvanceQueueRegionDurableIndex();

      // Only notify observers if the commit index actually changed.
      if (queue_state_.committed_index != commit_index_before) {
        DCHECK_GT(queue_state_.committed_index, commit_index_before);
        updated_commit_index = queue_state_.committed_index;
        VLOG_WITH_PREFIX_UNLOCKED(2) << "Commit index advanced from "
                                     << commit_index_before << " to "
                                     << *updated_commit_index;
      }
    }

    // If the peer's committed index is lower than our own, or if our log has
    // the next request for the peer, set 'send_more_immediately' to true.
    send_more_immediately = peer->last_known_committed_index < queue_state_.committed_index ||
                            log_cache_.HasOpBeenWritten(peer->next_index);

    // Evict ops from log_cache only if:
    // 1. This is not a leader node OR
    // 2. 'all_replicated_index' has changed after processing this response
    if (mode_copy != LEADER ||
        (old_all_replicated_index != new_all_replicated_index)) {
      log_cache_.EvictThroughOp(queue_state_.all_replicated_index);
    }

    UpdateMetricsUnlocked();
  }

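  // This notification runs outside 'queue_lock_', using the mode and commit
  // index captured while the lock was held.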
  if (mode_copy == LEADER && updated_commit_index != boost::none) {
    NotifyObserversOfCommitIndexChange(*updated_commit_index);
  }

  return send_more_immediately;
}
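
The leader-side commit rule above can be read in isolation: the committed
index only advances once the majority-replicated index has reached an op in
the leader's own current term, which is Raft's guard against committing
entries from earlier terms by counting replicas. Below is a minimal sketch
of that rule, plus an assumed model of how a majority watermark could be
derived from per-peer last-received indexes; AdvanceQueueWatermark()'s body
is not shown in this listing, so the helper is illustrative, not the actual
implementation.

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>
#include <boost/optional.hpp>

// Hypothetical standalone stand-in for the relevant queue_state_ fields.
struct QueueStateModel {
  boost::optional<int64_t> first_index_in_current_term;
  int64_t majority_replicated_index = 0;
  int64_t committed_index = 0;
};

// Mirrors the commit-advancement condition in the LEADER branch above. The
// '>' check matters because the majority watermark can transiently go down
// when a peer's last exchange was an error.
bool MaybeAdvanceCommitIndex(QueueStateModel* qs) {
  if (qs->first_index_in_current_term != boost::none &&
      qs->majority_replicated_index >= *qs->first_index_in_current_term &&
      qs->majority_replicated_index > qs->committed_index) {
    qs->committed_index = qs->majority_replicated_index;
    return true;
  }
  return false;
}

// Illustrative (assumed) watermark computation: the highest index known to
// be replicated on at least 'num_peers_required' replicas.
int64_t MajorityWatermark(std::vector<int64_t> last_received_indexes,
                          std::size_t num_peers_required) {
  std::sort(last_received_indexes.begin(), last_received_indexes.end());
  return last_received_indexes[last_received_indexes.size() - num_peers_required];
}

For example, with three voters whose last received indexes are {5, 7, 9}
and a majority size of 2, MajorityWatermark() yields 7, and
MaybeAdvanceCommitIndex() moves the committed index to 7 only once index 7
is at or past the first index of the current term.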