Status PeerMessageQueue::RequestForPeer()

in src/kudu/consensus/consensus_queue.cc [696:910]


// Builds the next ConsensusRequestPB to send to the peer identified by 'uuid'.
//
// Parameters:
//   uuid              - permanent UUID of the destination peer. Must not be
//                       the local peer (DCHECKed).
//   read_ops          - when false, no ops are read from the log cache and a
//                       status-only request is produced. Callers usually do
//                       this when the leader has detected that the peer is
//                       unhealthy.
//   request           - in/out. Any ops already attached are extracted
//                       WITHOUT being deleted (they may be in use by other
//                       peers). The committed / all-replicated /
//                       region-durable indexes, the leader's last appended
//                       index, the caller term, the proxy destination and
//                       the preceding op id are then filled in from the
//                       queue state.
//   msg_refs          - when ops are read, swapped with the references
//                       backing the ops attached to 'request': either the
//                       log cache messages themselves, or synthesized
//                       PROXY_OP stubs (id, timestamp, op type only) when
//                       routing through a proxy. Not modified otherwise.
//   needs_tablet_copy - set to true, with an early OK return, when the last
//                       exchange with the peer reported TABLET_NOT_FOUND.
//                       In that case 'request' gets no proxy destination
//                       and no preceding id. Set to false otherwise.
//   next_hop_uuid     - set to the next hop on the route to 'uuid' via the
//                       routing table. Falls back to 'uuid' itself (direct
//                       send) when the proxy peer is not tracked or is
//                       considered failed.
//
// Returns:
//   - NotFound if the peer is not tracked or the queue is not in leader mode.
//   - Any error from the routing table's NextHop() lookup.
//   - NotFound if the log entries needed to catch the peer up were garbage
//     collected.
//   - Incomplete if the read went past the head of the log.
//   - OK otherwise.
// Any other log-read error is fatal.
//
// Side effect: when FLAGS_update_peer_health_status is set, every return that
// happens after the initial locked section re-acquires 'queue_lock_'. It then
// records whether WAL catch-up succeeded or failed and refreshes the peer's
// health status.
Status PeerMessageQueue::RequestForPeer(const string& uuid,
                                        bool read_ops,
                                        ConsensusRequestPB* request,
                                        vector<ReplicateRefPtr>* msg_refs,
                                        bool* needs_tablet_copy,
                                        std::string* next_hop_uuid) {
  // Maintain a thread-safe copy of necessary members.
  OpId preceding_id;
  int64_t current_term;
  TrackedPeer peer_copy;
  {
    std::lock_guard<simple_spinlock> lock(queue_lock_);
    DCHECK_EQ(queue_state_.state, kQueueOpen);
    DCHECK_NE(uuid, local_peer_pb_.permanent_uuid());

    TrackedPeer* peer = FindPtrOrNull(peers_map_, uuid);
    if (PREDICT_FALSE(peer == nullptr || queue_state_.mode == NON_LEADER)) {
      return Status::NotFound(Substitute("peer $0 is no longer tracked or "
                                         "queue is not in leader mode", uuid));
    }
    peer_copy = *peer;

    // Clear the requests without deleting the entries, as they may be in use by other peers.
    request->mutable_ops()->ExtractSubrange(0, request->ops_size(), nullptr);

    // This is initialized to the queue's last appended op but gets set to the id of the
    // log entry preceding the first one in 'messages' if messages are found for the peer.
    preceding_id = queue_state_.last_appended;
    current_term = queue_state_.current_term;

    request->set_committed_index(queue_state_.committed_index);
    request->set_all_replicated_index(queue_state_.all_replicated_index);
    request->set_last_idx_appended_to_leader(queue_state_.last_appended.index());
    request->set_caller_term(current_term);
    request->set_region_durable_index(queue_state_.region_durable_index);

    RETURN_NOT_OK(routing_table_container_->NextHop(
          local_peer_pb_.permanent_uuid(), uuid, next_hop_uuid));

    if (*next_hop_uuid != uuid) {
      // If proxy_peer is not healthy, then route directly to the destination
      // TODO: Multi hop proxy support needs better failure and health checks
      // for proxy peer. The current method of detecting unhealthy proxy peer
      // works only on the leader. One solution could be for the leader to
      // periodically exchange the health report of all peers as part of
      // UpdateReplica() call
      TrackedPeer* proxy_peer = FindPtrOrNull(peers_map_, *next_hop_uuid);
      if (proxy_peer == nullptr ||
          HasProxyPeerFailedUnlocked(proxy_peer, peer)) {
        *next_hop_uuid = uuid;
      }
    }
  }

  // Always trigger a health status update check at the end of this function.
  bool wal_catchup_progress = false;
  bool wal_catchup_failure = false;
  // The update can be disabled by flag, because it requires taking the
  // consensus queue lock a second time on every request.
  SCOPED_CLEANUP({
      if (!FLAGS_update_peer_health_status) {
        return;
      }
      std::lock_guard<simple_spinlock> lock(queue_lock_);
      // The peer may have been untracked (or leadership lost) while the lock
      // was released, so look it up again rather than reusing the old pointer.
      TrackedPeer* peer = FindPtrOrNull(peers_map_, uuid);
      if (PREDICT_FALSE(peer == nullptr || queue_state_.mode == NON_LEADER)) {
        VLOG(1) << LogPrefixUnlocked() << "peer " << uuid
                << " is no longer tracked or queue is not in leader mode";
        return;
      }
      if (wal_catchup_progress) peer->wal_catchup_possible = true;
      if (wal_catchup_failure) peer->wal_catchup_possible = false;
      UpdatePeerHealthUnlocked(peer);
    });

  if (peer_copy.last_exchange_status == PeerStatus::TABLET_NOT_FOUND) {
    VLOG(3) << LogPrefixUnlocked() << "Peer " << uuid << " needs tablet copy" << THROTTLE_MSG;
    *needs_tablet_copy = true;
    return Status::OK();
  }
  *needs_tablet_copy = false;

  // If the next hop != the destination, we are sending these messages via a proxy.
  bool route_via_proxy = *next_hop_uuid != uuid;
  if (route_via_proxy) {
    // Set proxy uuid
    request->set_proxy_dest_uuid(*next_hop_uuid);
  } else {
    // Clear proxy uuid to ensure that this message is not rejected by the
    // destination
    request->clear_proxy_dest_uuid();
  }

  // If we've never communicated with the peer, we don't know what messages to
  // send, so we'll send a status-only request. Otherwise, we grab requests
  // from the log starting at the last_received point.
  // If the caller has explicitly indicated to not read the ops (as indicated by
  // 'read_ops'), then we skip reading ops from log-cache/log. The caller
  // usually does this when the leader detects that a peer is unhealthy and
  // hence needs to be degraded to a 'status-only' request
  if (peer_copy.last_exchange_status != PeerStatus::NEW && read_ops) {

    // The batch of messages to send to the peer.
    vector<ReplicateRefPtr> messages;
    // Leave room in the batch for the request fields already populated above.
    int max_batch_size = FLAGS_consensus_max_batch_size_bytes - request->ByteSize();

    ReadContext read_context;
    read_context.for_peer_uuid = &uuid;
    // 'peer_copy' outlives 'read_context', so pointing into it is safe.
    read_context.for_peer_host = &peer_copy.peer_pb.last_known_addr().host();
    read_context.for_peer_port = peer_copy.peer_pb.last_known_addr().port();
    read_context.route_via_proxy = route_via_proxy;

    // We try to get the follower's next_index from our log.
    Status s = log_cache_.ReadOps(peer_copy.next_index - 1,
                                  max_batch_size,
                                  read_context,
                                  &messages,
                                  &preceding_id);
    if (PREDICT_FALSE(!s.ok())) {
      // It's normal to have a NotFound() here if a follower falls behind where
      // the leader has GCed its logs. The follower replica will hang around
      // for a while until it's evicted.
      if (PREDICT_TRUE(s.IsNotFound())) {
        KLOG_EVERY_N_SECS_THROTTLER(INFO, 60, *peer_copy.status_log_throttler, "logs_gced")
            << LogPrefixUnlocked()
            << Substitute("The logs necessary to catch up peer $0 have been "
                          "garbage collected. The follower will never be able "
                          "to catch up ($1)", uuid, s.ToString());
        wal_catchup_failure = true;
        return s;
      }
      if (s.IsIncomplete()) {
        // IsIncomplete() means that we tried to read beyond the head of the log
        // (in the future). See KUDU-1078.
        LOG_WITH_PREFIX_UNLOCKED(ERROR) << "Error trying to read ahead of the log "
                                        << "while preparing peer request: "
                                        << s.ToString() << ". Destination peer: "
                                        << peer_copy.ToString();
        return s;
      }
      LOG_WITH_PREFIX_UNLOCKED(FATAL) << "Error reading the log while preparing peer request: "
                                      << s.ToString() << ". Destination peer: "
                                      << peer_copy.ToString();
    }

    // Since we were able to read ops through the log cache, we know that
    // catchup is possible.
    wal_catchup_progress = true;

    // We use AddAllocated rather than copy, because we pin the log cache at the
    // "all replicated" point. At some point we may want to allow partially loading
    // (and not pinning) earlier messages. At that point we'll need to do something
    // smarter here, like copy or ref-count.
    if (!route_via_proxy) {
      for (const ReplicateRefPtr& msg : messages) {
        request->mutable_ops()->AddAllocated(msg->get());
      }
      msg_refs->swap(messages);
    } else {
      // When routing via a proxy, send lightweight PROXY_OP stubs that carry
      // only the id, timestamp and op type of each message.
      vector<ReplicateRefPtr> proxy_ops;
      for (const ReplicateRefPtr& msg : messages) {
        ReplicateRefPtr proxy_op = make_scoped_refptr_replicate(new ReplicateMsg);
        *proxy_op->get()->mutable_id() = msg->get()->id();
        proxy_op->get()->set_timestamp(msg->get()->timestamp());
        proxy_op->get()->set_op_type(PROXY_OP);
        request->mutable_ops()->AddAllocated(proxy_op->get());
        proxy_ops.emplace_back(std::move(proxy_op));
      }
      msg_refs->swap(proxy_ops);
    }
  }

  DCHECK(preceding_id.IsInitialized());
  request->mutable_preceding_id()->CopyFrom(preceding_id);

  // If we are sending ops to the follower, but the batch doesn't reach the current
  // committed index, we can consider the follower lagging, and it's worth
  // logging this fact periodically.
  if (request->ops_size() > 0) {
    int64_t last_op_sent = request->ops(request->ops_size() - 1).id().index();
    if (last_op_sent < request->committed_index()) {
      // Will use metrics to cover this and alarm on it, otherwise it can overwhelm
      // logs
      VLOG_WITH_PREFIX_UNLOCKED(2)
          << "Peer " << uuid << " is lagging by at least "
          << (request->committed_index() - last_op_sent)
          << " ops behind the committed index " << THROTTLE_MSG;
    }
  // If we're not sending ops to the follower, set the safe time on the request.
  // TODO(dralves) When we have leader leases, send this all the time.
  } else {
    if (PREDICT_TRUE(FLAGS_safe_time_advancement_without_writes)) {
      request->set_safe_timestamp(time_manager_->GetSafeTime().value());
    } else {
      KLOG_EVERY_N_SECS(WARNING, 300) << "Safe time advancement without writes is disabled. "
            "Snapshot reads on non-leader replicas may stall if there are no writes in progress.";
    }
  }

  if (PREDICT_FALSE(VLOG_IS_ON(2))) {
    if (request->ops_size() > 0) {
      VLOG_WITH_PREFIX_UNLOCKED(2) << "Sending request with operations to Peer: " << uuid
          << ". Size: " << request->ops_size()
          << ". From: " << SecureShortDebugString(request->ops(0).id()) << ". To: "
          << SecureShortDebugString(request->ops(request->ops_size() - 1).id());
    } else {
      VLOG_WITH_PREFIX_UNLOCKED(2) << "Sending status only request to Peer: " << uuid
          << ": " << SecureDebugString(*request);
    }
  }

  return Status::OK();
}