Status RaftConsensus::UpdateReplica()

in src/kudu/consensus/raft_consensus.cc [1375:1691]


Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
                                    ConsensusResponsePB* response) {
  TRACE_EVENT2("consensus", "RaftConsensus::UpdateReplica",
               "peer", peer_uuid(),
               "tablet", options_.tablet_id);
  Synchronizer log_synchronizer;
  StatusCallback sync_status_cb = log_synchronizer.AsStatusCallback();


  // The ordering of the following operations is crucial; read on for details.
  //
  // The main requirements explained in more detail below are:
  //
  //   1) We must enqueue the prepares before we write to our local log.
  //   2) If we were able to enqueue a prepare then we must be able to log it.
  //   3) If we fail to enqueue a prepare, we must not attempt to enqueue any
  //      later-indexed prepare or apply.
  //
  // See below for detailed rationale.
  //
  // The steps are:
  //
  // 0 - Split/Dedup
  //
  // We split the operations into replicates and commits and make sure that we
  // don't do anything on operations we've already received in a previous call.
  // This essentially makes this method idempotent.
  //
  // 1 - We mark as many pending ops as committed as we can.
  //
  // We may have some pending ops that, according to the leader, are now
  // committed. We Apply them early, because:
  // - Soon (step 2) we may reject the call due to excessive memory pressure. One
  //   way to relieve the pressure is by flushing the MRS, and applying these
  //   ops may unblock an in-flight Flush().
  // - The Apply and subsequent Prepares (step 2) can take place concurrently.
  //
  // 2 - We enqueue the Prepare of the ops.
  //
  // The actual prepares are enqueued in order but happen asynchronously so we don't
  // have decoding/acquiring locks on the critical path.
  //
  // We need to do this now for a number of reasons:
  // - Prepares, by themselves, are inconsequential, i.e. they do not mutate the
  //   state machine so, were we to crash afterwards, having the prepares in-flight
  //   won't hurt.
  // - Prepares depend on factors external to consensus (the op drivers and
  //   the TabletReplica) so if for some reason they cannot be enqueued we must know
  //   before we try to write them to the WAL. Once enqueued, we assume that prepare will
  //   always succeed on a replica op (because the leader already prepared them
  //   successfully, and thus we know they are valid).
  // - The prepares corresponding to every operation that was logged must be in-flight
  //   first. This is because, should we need to abort certain ops (say a new leader
  //   says they are not committed) we need to have those prepares in-flight so that
  //   the ops can be continued (in the abort path).
  // - Failure to enqueue prepares is OK; we can continue and let the leader know that
  //   we only went so far. The leader will re-send the remaining messages.
  // - Prepares represent new ops, and ops consume memory. Thus, if the
  //   overall memory pressure on the server is too high, we will reject the prepares.
  //
  // 3 - We enqueue the writes to the WAL.
  //
  // We enqueue writes to the WAL, but only the operations that were successfully
  // enqueued for prepare (for the reasons introduced above). This means that even
  // if a prepare fails to enqueue, if any of the previous prepares were successfully
  // submitted they must be written to the WAL.
  // If writing to the WAL fails, we're in an inconsistent state and we crash. In this
  // case, no one will ever know of the ops we previously prepared so those are
  // inconsequential.
  //
  // 4 - We mark the ops as committed.
  //
  // For each op which has been committed by the leader, we update the
  // op state to reflect that. If the logging has already succeeded for that
  // op, this will trigger the Apply phase. Otherwise, Apply will be triggered
  // when the logging completes. In both cases the Apply phase executes asynchronously.
  // This must, of course, happen after the prepares have been triggered as the same batch
  // can both replicate/prepare and commit/apply an operation.
  //
  // Currently, if a prepare failed to enqueue we still trigger all applies for operations
  // with an id lower than it (if we have them). This is important now as the leader will
  // not re-send those commit messages. This will be moot when we move to the
  // commitIndex way of doing things as we can simply ignore the applies as we know
  // they will be triggered with the next successful batch.
  //
  // 5 - We wait for the writes to be durable.
  //
  // Before replying to the leader we wait for the writes to be durable. We then
  // just update the last replicated watermark and respond.
  //
  // TODO - These failure scenarios need to be exercised in a unit
  //        test. Moreover, we need to add more fault injection spots (and
  //        actually use them) for each of these steps.
  //        This will be done in a follow up patch.
  TRACE("Updating replica for $0 ops", request->ops_size());

  // The deduplicated request.
  LeaderRequest deduped_req;
  auto& messages = deduped_req.messages;
  {
    ThreadRestrictions::AssertWaitAllowed();
    std::lock_guard l(lock_);
    RETURN_NOT_OK(CheckRunningUnlocked());
    if (!cmeta_->IsMemberInConfig(peer_uuid(), ACTIVE_CONFIG)) {
      LOG_WITH_PREFIX_UNLOCKED(INFO) << "Allowing update even though not a member of the config";
    }

    deduped_req.leader_uuid = request->caller_uuid();

    RETURN_NOT_OK(CheckLeaderRequestUnlocked(request, response, &deduped_req));
    if (response->status().has_error()) {
      // We had an error, such as an invalid term. We still fill the response.
      FillConsensusResponseOKUnlocked(response);
      return Status::OK();
    }

    // As soon as we decide to accept the message:
    //   * snooze the failure detector
    //   * prohibit voting for anyone for the minimum election timeout
    // We are guaranteed to be acting as a FOLLOWER at this point by the above
    // sanity check.
    SnoozeFailureDetector();
    WithholdVotes();

    last_leader_communication_time_micros_ = GetMonoTimeMicros();

    // Reset the 'failed_elections_since_stable_leader' metric now that we've
    // accepted an update from the established leader. This is done in addition
    // to the reset of the value in SetLeaderUuidUnlocked() because there is
    // a potential race between resetting the failed elections count in
    // SetLeaderUuidUnlocked() and incrementing after a failed election
    // if another replica was elected leader in an election concurrent with
    // the one called by this replica.
    failed_elections_since_stable_leader_ = 0;
    num_failed_elections_metric_->set_value(failed_elections_since_stable_leader_);

    // We update the lag metrics here in addition to after appending to the queue so the
    // metrics get updated even when the operation is rejected.
    queue_->UpdateLastIndexAppendedToLeader(request->last_idx_appended_to_leader());

    // 1 - Early commit pending (and committed) ops

    // What should we commit?
    // 1. As many pending ops as we can, except...
    // 2. ...if we commit beyond the preceding index, we'd regress KUDU-639, and...
    // 3. ...the leader's committed index is always our upper bound.
    const int64_t early_apply_up_to = std::min({
        pending_->GetLastPendingOpOpId().index(),
        deduped_req.preceding_opid->index(),
        request->committed_index()});

    VLOG_WITH_PREFIX_UNLOCKED(1) << "Early marking committed up to " << early_apply_up_to
                                 << ", Last pending opid index: "
                                 << pending_->GetLastPendingOpOpId().index()
                                 << ", preceding opid index: "
                                 << deduped_req.preceding_opid->index()
                                 << ", requested index: " << request->committed_index();
    TRACE("Early marking committed up to index $0", early_apply_up_to);
    CHECK_OK(pending_->AdvanceCommittedIndex(early_apply_up_to));

    // 2 - Enqueue the prepares

    TRACE("Triggering prepare for $0 ops", messages.size());

    if (PREDICT_TRUE(!messages.empty())) {
      // This request contains at least one message, and is likely to increase
      // our memory pressure.
      double capacity_pct;
      if (process_memory::SoftLimitExceeded(&capacity_pct)) {
        if (follower_memory_pressure_rejections_) follower_memory_pressure_rejections_->Increment();
        string msg = StringPrintf(
            "Soft memory limit exceeded (at %.2f%% of capacity)",
            capacity_pct);
        if (capacity_pct >= FLAGS_memory_limit_warn_threshold_percentage) {
          KLOG_EVERY_N_SECS(WARNING, 1) << "Rejecting consensus request: " << msg
                                        << THROTTLE_MSG;
        } else {
          KLOG_EVERY_N_SECS(INFO, 1) << "Rejecting consensus request: " << msg
                                     << THROTTLE_MSG;
        }
        return Status::ServiceUnavailable(msg);
      }
    }

    Status prepare_status;
    auto iter = messages.begin();
    while (iter != messages.end()) {
      prepare_status = StartFollowerOpUnlocked(*iter);
      if (PREDICT_FALSE(!prepare_status.ok())) {
        break;
      }
      // TODO(dralves) Without leader leases this shouldn't be allowed to fail.
      // Once we have that functionality we'll have to revisit this.
      CHECK_OK(time_manager_->MessageReceivedFromLeader(*(*iter)->get()));
      ++iter;
    }

    // If we stopped before reaching the end we failed to prepare some message(s) and need
    // to perform cleanup, namely trimming deduped_req.messages to only contain the messages
    // that were actually prepared, and deleting the other ones since we've taken ownership
    // when we first deduped.
    if (iter != messages.end()) {
      LOG_WITH_PREFIX_UNLOCKED(WARNING) << Substitute(
          "Could not prepare op '$0' and following $1 ops. "
          "Status for this op: $2",
          (*iter)->get()->id().ShortDebugString(),
          std::distance(iter, messages.end()) - 1,
          prepare_status.ToString());
      iter = messages.erase(iter, messages.end());

      // If this is empty, it means we couldn't prepare a single de-duped message. There is nothing
      // else we can do. The leader will detect this and retry later.
      if (messages.empty()) {
        string msg = Substitute("Rejecting Update request from peer $0 for term $1. "
                                "Could not prepare a single op due to: $2",
                                request->caller_uuid(),
                                request->caller_term(),
                                prepare_status.ToString());
        LOG_WITH_PREFIX_UNLOCKED(INFO) << msg;
        FillConsensusResponseError(response, ConsensusErrorPB::CANNOT_PREPARE,
                                   Status::IllegalState(msg));
        FillConsensusResponseOKUnlocked(response);
        return Status::OK();
      }
    }

    // All ops that are going to be prepared were started, advance the safe timestamp.
    // TODO(dralves) This is only correct because the queue only sets safe time when the request is
    // an empty heartbeat. If we actually start setting this on a consensus request along with
    // actual messages we need to be careful to ignore it if any of the messages fails to prepare.
    if (request->has_safe_timestamp()) {
      time_manager_->AdvanceSafeTime(Timestamp(request->safe_timestamp()));
    }

    OpId last_from_leader;
    // 3 - Enqueue the writes.
    // Now that we've triggered the prepares, enqueue the operations to be written
    // to the WAL.
    if (PREDICT_TRUE(!messages.empty())) {
      last_from_leader = messages.back()->get()->id();
      // Trigger the log append ASAP. If fsync() is on, this might take a while
      // and we can't reply until this is done.
      //
      // Since we've prepared, we need to be able to append (or we risk trying to apply
      // later something that wasn't logged). We crash if we can't.
      CHECK_OK(queue_->AppendOperations(messages, sync_status_cb));
    } else {
      last_from_leader = *deduped_req.preceding_opid;
    }

    // 4 - Mark ops as committed

    // Choose the last operation to be applied. This will either be 'committed_index', if
    // no prepare enqueuing failed, or the minimum between 'committed_index' and the id of
    // the last successfully enqueued prepare, if some prepare failed to enqueue.
    int64_t apply_up_to;
    if (last_from_leader.index() < request->committed_index()) {
      // We should never apply anything later than what we received in this request.
      apply_up_to = last_from_leader.index();

      VLOG_WITH_PREFIX_UNLOCKED(2) << "Received commit index "
          << request->committed_index() << " from the leader but only"
          << " marked up to " << apply_up_to << " as committed.";
    } else {
      apply_up_to = request->committed_index();
    }

    VLOG_WITH_PREFIX_UNLOCKED(1) << "Marking committed up to " << apply_up_to;
    TRACE("Marking committed up to $0", apply_up_to);
    CHECK_OK(pending_->AdvanceCommittedIndex(apply_up_to));
    queue_->UpdateFollowerWatermarks(apply_up_to, request->all_replicated_index());

    // If any messages failed to be started locally, then we already have removed them
    // from 'deduped_req' at this point. So, 'last_from_leader' is the last one that
    // we might apply.
    last_received_cur_leader_ = last_from_leader;

    // Fill the response with the current state. We will not mutate anymore state until
    // we actually reply to the leader, we'll just wait for the messages to be durable.
    FillConsensusResponseOKUnlocked(response);
  }
  // Release the lock while we wait for the log append to finish so that commits can go through.
  // We'll re-acquire it before we update the state again.

  // Update the last replicated op id
  if (!messages.empty()) {

    // 5 - We wait for the writes to be durable.

    // Note that this is safe because dist consensus now only supports a single outstanding
    // request at a time and this way we can allow commits to proceed while we wait.
    TRACE("Waiting on the replicates to finish logging");
    TRACE_EVENT0("consensus", "Wait for log");
    Status s;
    do {
      s = log_synchronizer.WaitFor(
          MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms));
      // If we're just waiting for our log append to finish, let's snooze the timer.
      // We don't want to fire leader election nor accept vote requests because
      // we're still processing the Raft message from the leader,
      // waiting on our own log.
      if (s.IsTimedOut()) {
        SnoozeFailureDetector();
        WithholdVotes();
      }
    } while (s.IsTimedOut());
    RETURN_NOT_OK(s);

    TRACE("finished");
  }

  VLOG_WITH_PREFIX(2) << "Replica updated. " << ToString()
                      << ". Request: " << SecureShortDebugString(*request);

  TRACE("UpdateReplicas() finished");
  return Status::OK();
}
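
The two commit bounds computed in steps 1 and 4 above can be restated as standalone functions. This is a minimal sketch for illustration only: EarlyApplyUpTo and ApplyUpTo are hypothetical helper names that do not exist in raft_consensus.cc, and they take plain indexes instead of OpId or request objects.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical helper (not part of Kudu): mirrors the step 1 bound.
// The early commit may not pass the last pending op, the op preceding
// this request, or the leader's committed index.
int64_t EarlyApplyUpTo(int64_t last_pending_index,
                       int64_t preceding_index,
                       int64_t leader_committed_index) {
  return std::min({last_pending_index, preceding_index, leader_committed_index});
}

// Hypothetical helper (not part of Kudu): mirrors the step 4 bound.
// We never apply anything later than the last op received from the
// leader in this request, even if the leader has committed further.
int64_t ApplyUpTo(int64_t last_from_leader_index,
                  int64_t leader_committed_index) {
  return std::min(last_from_leader_index, leader_committed_index);
}
```

With a leader committed index of 9, a follower whose last successfully enqueued op has index 5 marks ops committed only up to 5. This is the situation the VLOG message in step 4 reports.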