in src/kudu/consensus/raft_consensus.cc [1375:1691]
Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
ConsensusResponsePB* response) {
TRACE_EVENT2("consensus", "RaftConsensus::UpdateReplica",
"peer", peer_uuid(),
"tablet", options_.tablet_id);
Synchronizer log_synchronizer;
StatusCallback sync_status_cb = log_synchronizer.AsStatusCallback();
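// 'sync_status_cb' will be passed to the log queue when we append operations
// below; the log invokes it once those operations are durable, and we wait on
// 'log_synchronizer' for that in step 5.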
// The ordering of the following operations is crucial, read on for details.
//
// The main requirements explained in more detail below are:
//
// 1) We must enqueue the prepares before we write to our local log.
// 2) If we were able to enqueue a prepare then we must be able to log it.
// 3) If we fail to enqueue a prepare, we must not attempt to enqueue any
// later-indexed prepare or apply.
//
// See below for detailed rationale.
//
// The steps are:
//
// 0 - Split/Dedup
//
// We split the operations into replicates and commits and make sure that
// we don't do anything on operations we've already received in a previous call.
// This essentially makes this method idempotent.
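//
// For example: if a previous call already delivered ops [5-8] and this
// request carries [5-10], deduplication leaves only [9-10] to be newly
// processed, which is what makes leader retries harmless.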
//
// 1 - We mark as many pending ops as committed as we can.
//
// We may have some pending ops that, according to the leader, are now
// committed. We Apply them early, because:
// - Soon (step 2) we may reject the call due to excessive memory pressure. One
// way to relieve the pressure is by flushing the MRS, and applying these
// ops may unblock an in-flight Flush().
// - The Apply and subsequent Prepares (step 2) can take place concurrently.
//
// 2 - We enqueue the Prepare of the ops.
//
// The actual prepares are enqueued in order but happen asynchronously, so that
// decoding and lock acquisition stay off the critical path.
//
// We need to do this now for a number of reasons:
// - Prepares, by themselves, are inconsequential, i.e. they do not mutate the
// state machine so, were we to crash afterwards, having the prepares in-flight
// won't hurt.
// - Prepares depend on factors external to consensus (the op drivers and
// the TabletReplica) so if for some reason they cannot be enqueued we must know
// before we try to write them to the WAL. Once enqueued, we assume that prepares
// will always succeed on replica ops (because the leader already prepared them
// successfully, and thus we know they are valid).
// - The prepares corresponding to every operation that was logged must be in-flight
// first. This is because, should we need to abort certain ops (say, if a new
// leader says they are not committed), we need those prepares in-flight so that
// the ops can be continued (down the abort path).
// - Failure to enqueue prepares is OK, we can continue and let the leader know that
// we only went so far. The leader will re-send the remaining messages.
// - Prepares represent new ops, and ops consume memory. Thus, if the
// overall memory pressure on the server is too high, we will reject the prepares.
//
// 3 - We enqueue the writes to the WAL.
//
// We enqueue writes to the WAL, but only the operations that were successfully
// enqueued for prepare (for the reasons introduced above). This means that even
// if a prepare fails to enqueue, any previous prepares that were successfully
// submitted must still be written to the WAL.
// If writing to the WAL fails, we're in an inconsistent state and we crash. In this
// case, no one will ever know of the ops we previously prepared so those are
// inconsequential.
//
// 4 - We mark the ops as committed.
//
// For each op which has been committed by the leader, we update the
// op state to reflect that. If the logging has already succeeded for that
// op, this will trigger the Apply phase. Otherwise, Apply will be triggered
// when the logging completes. In both cases the Apply phase executes asynchronously.
// This must, of course, happen after the prepares have been triggered as the same batch
// can both replicate/prepare and commit/apply an operation.
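//
// For example: a follower that has fallen behind may receive op 10 in the
// same request that carries committed_index >= 10; that single call both
// prepares op 10 and triggers its apply.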
//
// Currently, if a prepare failed to enqueue we still trigger all applies for operations
// with an id lower than it (if we have them). This is important now as the leader will
// not re-send those commit messages. This will be moot when we move to the
// commitIndex way of doing things, as we can then simply ignore the applies,
// knowing they will be triggered with the next successful batch.
//
// 5 - We wait for the writes to be durable.
//
// Before replying to the leader we wait for the writes to be durable. We then
// just update the last replicated watermark and respond.
//
// TODO - These failure scenarios need to be exercised in a unit
// test. Moreover, we need to add more fault injection spots (and
// actually use them) for each of these steps.
// This will be done in a follow-up patch.
TRACE("Updating replica for $0 ops", request->ops_size());
// The deduplicated request.
LeaderRequest deduped_req;
auto& messages = deduped_req.messages;
{
ThreadRestrictions::AssertWaitAllowed();
std::lock_guard l(lock_);
RETURN_NOT_OK(CheckRunningUnlocked());
if (!cmeta_->IsMemberInConfig(peer_uuid(), ACTIVE_CONFIG)) {
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Allowing update even though not a member of the config";
}
deduped_req.leader_uuid = request->caller_uuid();
RETURN_NOT_OK(CheckLeaderRequestUnlocked(request, response, &deduped_req));
if (response->status().has_error()) {
// Even if we had an error (e.g. an invalid term), we still fill in the response.
FillConsensusResponseOKUnlocked(response);
return Status::OK();
}
// As soon as we decide to accept the message:
// * snooze the failure detector
// * prohibit voting for anyone for the minimum election timeout
// We are guaranteed to be acting as a FOLLOWER at this point by the above
// sanity check.
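// Withholding votes for the minimum election timeout keeps this replica from
// helping a disruptive candidate depose a leader that is demonstrably alive
// (the standard Raft "leader stickiness" measure).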
SnoozeFailureDetector();
WithholdVotes();
last_leader_communication_time_micros_ = GetMonoTimeMicros();
// Reset the 'failed_elections_since_stable_leader' metric now that we've
// accepted an update from the established leader. This is done in addition
// to the reset of the value in SetLeaderUuidUnlocked() because there is
// a potential race between resetting the failed elections count in
// SetLeaderUuidUnlocked() and incrementing after a failed election
// if another replica was elected leader in an election concurrent with
// the one called by this replica.
failed_elections_since_stable_leader_ = 0;
num_failed_elections_metric_->set_value(failed_elections_since_stable_leader_);
// We update the lag metrics here in addition to after appending to the queue so the
// metrics get updated even when the operation is rejected.
queue_->UpdateLastIndexAppendedToLeader(request->last_idx_appended_to_leader());
// 1 - Early commit pending (and committed) ops
// What should we commit?
// 1. As many pending ops as we can, except...
// 2. ...if we commit beyond the preceding index, we'd regress KUDU-639, and...
// 3. ...the leader's committed index is always our upper bound.
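// For example: with a last pending op index of 12, a preceding opid index
// of 10, and a leader committed_index of 11, we early-apply only up to 10
// here; the remainder is handled in step 4 below.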
const int64_t early_apply_up_to = std::min({
pending_->GetLastPendingOpOpId().index(),
deduped_req.preceding_opid->index(),
request->committed_index()});
VLOG_WITH_PREFIX_UNLOCKED(1) << "Early marking committed up to " << early_apply_up_to
<< ", Last pending opid index: "
<< pending_->GetLastPendingOpOpId().index()
<< ", preceding opid index: "
<< deduped_req.preceding_opid->index()
<< ", requested index: " << request->committed_index();
TRACE("Early marking committed up to index $0", early_apply_up_to);
CHECK_OK(pending_->AdvanceCommittedIndex(early_apply_up_to));
// 2 - Enqueue the prepares
TRACE("Triggering prepare for $0 ops", messages.size());
if (PREDICT_TRUE(!messages.empty())) {
// This request contains at least one message, and is likely to increase
// our memory pressure.
double capacity_pct;
if (process_memory::SoftLimitExceeded(&capacity_pct)) {
if (follower_memory_pressure_rejections_) follower_memory_pressure_rejections_->Increment();
string msg = StringPrintf(
"Soft memory limit exceeded (at %.2f%% of capacity)",
capacity_pct);
if (capacity_pct >= FLAGS_memory_limit_warn_threshold_percentage) {
KLOG_EVERY_N_SECS(WARNING, 1) << "Rejecting consensus request: " << msg
<< THROTTLE_MSG;
} else {
KLOG_EVERY_N_SECS(INFO, 1) << "Rejecting consensus request: " << msg
<< THROTTLE_MSG;
}
return Status::ServiceUnavailable(msg);
}
}
Status prepare_status;
auto iter = messages.begin();
while (iter != messages.end()) {
prepare_status = StartFollowerOpUnlocked(*iter);
if (PREDICT_FALSE(!prepare_status.ok())) {
break;
}
// TODO(dralves) Without leader leases this shouldn't be allowed to fail.
// Once we have that functionality we'll have to revisit this.
CHECK_OK(time_manager_->MessageReceivedFromLeader(*(*iter)->get()));
++iter;
}
// If we stopped before reaching the end, we failed to prepare some message(s)
// and need to perform cleanup, namely trimming deduped_req.messages to contain
// only the messages that were actually prepared, and deleting the others, since
// we took ownership of them when we first deduped.
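// For example: if ops [9-12] survived dedup but op 11 fails to prepare, we
// trim the batch to [9-10] and continue; the leader will re-send 11 and 12.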
if (iter != messages.end()) {
LOG_WITH_PREFIX_UNLOCKED(WARNING) << Substitute(
"Could not prepare op '$0' and following $1 ops. "
"Status for this op: $2",
(*iter)->get()->id().ShortDebugString(),
std::distance(iter, messages.end()) - 1,
prepare_status.ToString());
iter = messages.erase(iter, messages.end());
// If this is empty, it means we couldn't prepare a single de-duped message. There is nothing
// else we can do. The leader will detect this and retry later.
if (messages.empty()) {
string msg = Substitute("Rejecting Update request from peer $0 for term $1. "
"Could not prepare a single op due to: $2",
request->caller_uuid(),
request->caller_term(),
prepare_status.ToString());
LOG_WITH_PREFIX_UNLOCKED(INFO) << msg;
FillConsensusResponseError(response, ConsensusErrorPB::CANNOT_PREPARE,
Status::IllegalState(msg));
FillConsensusResponseOKUnlocked(response);
return Status::OK();
}
}
// All ops that are going to be prepared were started, advance the safe timestamp.
// TODO(dralves) This is only correct because the queue only sets safe time when the request is
// an empty heartbeat. If we actually start setting this on a consensus request along with
// actual messages we need to be careful to ignore it if any of the messages fails to prepare.
if (request->has_safe_timestamp()) {
time_manager_->AdvanceSafeTime(Timestamp(request->safe_timestamp()));
}
OpId last_from_leader;
// 3 - Enqueue the writes.
// Now that we've triggered the prepares, enqueue the operations to be written
// to the WAL.
if (PREDICT_TRUE(!messages.empty())) {
last_from_leader = messages.back()->get()->id();
// Trigger the log append ASAP; if fsync() is enabled this might take a while,
// and we can't reply until it is done.
//
// Since we've prepared, we need to be able to append (or we risk trying to apply
// later something that wasn't logged). We crash if we can't.
CHECK_OK(queue_->AppendOperations(messages, sync_status_cb));
} else {
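// No new messages: everything in this request was deduplicated, so the
// effective last op from the leader is the one preceding this batch.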
last_from_leader = *deduped_req.preceding_opid;
}
// 4 - Mark ops as committed
// Choose the last operation to be applied. This will either be 'committed_index', if
// no prepare enqueuing failed, or the minimum between 'committed_index' and the id of
// the last successfully enqueued prepare, if some prepare failed to enqueue.
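// For example: if the last successfully enqueued prepare has index 7 but the
// leader reports committed_index 9, we mark only up to 7 as committed here;
// 8 and 9 are committed once a later request delivers them.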
int64_t apply_up_to;
if (last_from_leader.index() < request->committed_index()) {
// We should never apply anything later than what we received in this request.
apply_up_to = last_from_leader.index();
VLOG_WITH_PREFIX_UNLOCKED(2) << "Received commit index "
<< request->committed_index() << " from the leader but only"
<< " marked up to " << apply_up_to << " as committed.";
} else {
apply_up_to = request->committed_index();
}
VLOG_WITH_PREFIX_UNLOCKED(1) << "Marking committed up to " << apply_up_to;
TRACE("Marking committed up to $0", apply_up_to);
CHECK_OK(pending_->AdvanceCommittedIndex(apply_up_to));
queue_->UpdateFollowerWatermarks(apply_up_to, request->all_replicated_index());
// If any messages failed to be started locally, then we already have removed them
// from 'deduped_req' at this point. So, 'last_from_leader' is the last one that
// we might apply.
last_received_cur_leader_ = last_from_leader;
// Fill the response with the current state. We will not mutate any more state
// until we actually reply to the leader; we'll just wait for the messages to be
// durable.
FillConsensusResponseOKUnlocked(response);
}
// Release the lock while we wait for the log append to finish so that commits can go through.
// We'll re-acquire it before we update the state again.
if (!messages.empty()) {
// 5 - We wait for the writes to be durable.
// Note that this is safe because distributed consensus now only supports a
// single outstanding request at a time, and this way we can allow commits to
// proceed while we wait.
TRACE("Waiting on the replicates to finish logging");
TRACE_EVENT0("consensus", "Wait for log");
Status s;
do {
s = log_synchronizer.WaitFor(
MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms));
// If we're just waiting for our log append to finish, let's snooze the timer.
// We don't want to fire leader election nor accept vote requests because
// we're still processing the Raft message from the leader,
// waiting on our own log.
if (s.IsTimedOut()) {
SnoozeFailureDetector();
WithholdVotes();
}
} while (s.IsTimedOut());
RETURN_NOT_OK(s);
TRACE("finished");
}
VLOG_WITH_PREFIX(2) << "Replica updated. " << ToString()
<< ". Request: " << SecureShortDebugString(*request);
TRACE("UpdateReplicas() finished");
return Status::OK();
}