in ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java [242:280]
/**
 * Applies committed log entries that have not yet been applied, one index at a time.
 *
 * <p>The committed index is read once on entry. The loop then keeps going while the last applied
 * index is below that value, {@code state} is {@code State.RUNNING} and {@code shouldStop()} is false.
 * For each index the entry is retained from {@code raftLog}, handed to
 * {@code server.applyLogToStateMachine}, and {@code appliedIndex} is incremented (asserted to land
 * exactly on the index just applied). The retained reference is always released before moving on.
 * If the log returns {@code null} for the next index, the loop stops early.
 *
 * @param applyLogFutures the future accumulated so far; each non-null apply future is combined into it
 * @return the accumulated future, which completes once every combined apply future has completed
 *         (apply failures are logged and converted to a {@code null} result before being combined)
 * @throws RaftLogIOException declared by the signature; presumably raised by the raft log access
 */
private CompletableFuture<Void> applyLog(CompletableFuture<Void> applyLogFutures) throws RaftLogIOException {
  // Read the commit index only once: this call works against a fixed upper bound.
  final long committed = raftLog.getLastCommittedIndex();
  while (true) {
    final long applied = getLastAppliedIndex();
    // Same checks, in the same order, as the original loop condition (negated).
    if (applied >= committed || state != State.RUNNING || shouldStop()) {
      break;
    }

    final long nextIndex = applied + 1;
    final ReferenceCountedObject<LogEntryProto> retained = raftLog.retainLog(nextIndex);
    if (retained == null) {
      LOG.debug("{}: logEntry {} is null. There may be snapshot to load. state:{}",
          this, nextIndex, state);
      break;
    }

    try {
      final LogEntryProto proto = retained.get();
      if (!LOG.isTraceEnabled()) {
        LOG.debug("{}: applying nextIndex={}", this, nextIndex);
      } else {
        // The full entry string is only built when trace logging is on.
        LOG.trace("{}: applying nextIndex={}, nextLog={}", this, nextIndex, LogProtoUtils.toLogEntryString(proto));
      }

      // Hand the entry to the state machine first, then advance the applied index.
      final CompletableFuture<Message> applyFuture = server.applyLogToStateMachine(retained);
      final long newApplied = appliedIndex.incrementAndGet(debugIndexChange);
      Preconditions.assertTrue(newApplied == nextIndex);

      if (applyFuture == null) {
        // Nothing asynchronous to wait for: report the new applied index right away.
        notifyAppliedIndex(newApplied);
      } else {
        // Log a failed apply and turn it into a null result so the accumulated future
        // does not complete exceptionally.
        // NOTE(review): this callback reads the entry and may run after retained.release()
        // in the finally block below -- confirm that is safe for the reference-counted entry.
        final CompletableFuture<Message> failureLogged = applyFuture.exceptionally(cause -> {
          LOG.error("Exception while {}: applying txn index={}, nextLog={}", this, nextIndex,
              LogProtoUtils.toLogEntryString(proto), cause);
          return null;
        });
        applyLogFutures = applyLogFutures.thenCombine(failureLogged, (unused, reply) -> null);
        // Chained on the raw apply future, so the notification only happens on normal completion.
        applyFuture.thenAccept(reply -> notifyAppliedIndex(newApplied));
      }
    } finally {
      // Drop the reference taken by retainLog, whether or not applying threw.
      retained.release();
    }
  }
  return applyLogFutures;
}