in ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java [1604:1692]
private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(RaftPeerId leaderId, long callId,
TermIndex previous, ReferenceCountedObject<AppendEntriesRequestProto> requestRef) throws IOException {
final AppendEntriesRequestProto proto = requestRef.get();
final List<LogEntryProto> entries = proto.getEntriesList();
final boolean isHeartbeat = entries.isEmpty();
logAppendEntries(isHeartbeat, () -> getMemberId() + ": appendEntries* "
+ toAppendEntriesRequestString(proto, stateMachine::toStateMachineLogEntryString));
final long leaderTerm = proto.getLeaderTerm();
final long currentTerm;
final long followerCommit = state.getLog().getLastCommittedIndex();
final Optional<FollowerState> followerState;
final Timekeeper.Context timer = raftServerMetrics.getFollowerAppendEntryTimer(isHeartbeat).time();
final CompletableFuture<Void> future;
synchronized (this) {
// Check life cycle state again to avoid the PAUSING/PAUSED state.
assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
currentTerm = state.getCurrentTerm();
final boolean recognized = state.recognizeLeader(Op.APPEND_ENTRIES, leaderId, leaderTerm);
if (!recognized) {
return CompletableFuture.completedFuture(toAppendEntriesReplyProto(
leaderId, getMemberId(), currentTerm, followerCommit, state.getNextIndex(),
AppendResult.NOT_LEADER, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat));
}
try {
future = changeToFollowerAndPersistMetadata(leaderTerm, true, "appendEntries");
} catch (IOException e) {
return JavaUtils.completeExceptionally(e);
}
state.setLeader(leaderId, "appendEntries");
if (!proto.getInitializing() && lifeCycle.compareAndTransition(State.STARTING, State.RUNNING)) {
role.startFollowerState(this, Op.APPEND_ENTRIES);
}
followerState = updateLastRpcTime(FollowerState.UpdateType.APPEND_START);
// Check that the append entries are not inconsistent. There are 3
// scenarios which can result in inconsistency:
// 1. There is a snapshot installation in progress
// 2. There is an overlap between the snapshot index and the entries
// 3. There is a gap between the local log and the entries
// In any of these scenarios, we should return an INCONSISTENCY reply
// back to leader so that the leader can update this follower's next index.
final long inconsistencyReplyNextIndex = checkInconsistentAppendEntries(previous, entries);
if (inconsistencyReplyNextIndex > RaftLog.INVALID_LOG_INDEX) {
final AppendEntriesReplyProto reply = toAppendEntriesReplyProto(
leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextIndex,
AppendResult.INCONSISTENCY, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat);
LOG.info("{}: appendEntries* reply {}", getMemberId(), toAppendEntriesReplyString(reply));
followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
return future.thenApply(dummy -> reply);
}
state.updateConfiguration(entries);
}
future.join();
final CompletableFuture<Void> appendLog = entries.isEmpty()? CompletableFuture.completedFuture(null)
: appendLog(requestRef.delegate(entries));
proto.getCommitInfosList().forEach(commitInfoCache::update);
CodeInjectionForTesting.execute(LOG_SYNC, getId(), null);
if (!isHeartbeat) {
final long installedIndex = snapshotInstallationHandler.getInstalledIndex();
if (installedIndex >= RaftLog.LEAST_VALID_LOG_INDEX) {
LOG.info("{}: Follower has completed install the snapshot {}.", this, installedIndex);
stateMachine.event().notifySnapshotInstalled(InstallSnapshotResult.SUCCESS, installedIndex, getPeer());
}
}
final long commitIndex = effectiveCommitIndex(proto.getLeaderCommit(), previous, entries.size());
final long matchIndex = isHeartbeat? RaftLog.INVALID_LOG_INDEX: entries.get(entries.size() - 1).getIndex();
return appendLog.whenCompleteAsync((r, t) -> {
followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
timer.stop();
}, getServerExecutor()).thenApply(v -> {
final boolean updated = state.updateCommitIndex(commitIndex, currentTerm, false);
if (updated) {
updateCommitInfoCache();
}
final long nextIndex = isHeartbeat? state.getNextIndex(): matchIndex + 1;
final AppendEntriesReplyProto reply = toAppendEntriesReplyProto(leaderId, getMemberId(),
currentTerm, updated? commitIndex : state.getLog().getLastCommittedIndex(),
nextIndex, AppendResult.SUCCESS, callId, matchIndex, isHeartbeat);
logAppendEntries(isHeartbeat, () -> getMemberId()
+ ": appendEntries* reply " + toAppendEntriesReplyString(reply));
return reply;
});
}