private CompletableFuture applyLog()

in ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java [242:280]


  private CompletableFuture<Void> applyLog(CompletableFuture<Void> applyLogFutures) throws RaftLogIOException {
    final long committed = raftLog.getLastCommittedIndex();
    for(long applied; (applied = getLastAppliedIndex()) < committed && state == State.RUNNING && !shouldStop(); ) {
      final long nextIndex = applied + 1;
      final ReferenceCountedObject<LogEntryProto> next = raftLog.retainLog(nextIndex);
      if (next == null) {
        LOG.debug("{}: logEntry {} is null. There may be snapshot to load. state:{}",
            this, nextIndex, state);
        break;
      }

      try {
        final LogEntryProto entry = next.get();
        if (LOG.isTraceEnabled()) {
          LOG.trace("{}: applying nextIndex={}, nextLog={}", this, nextIndex, LogProtoUtils.toLogEntryString(entry));
        } else {
          LOG.debug("{}: applying nextIndex={}", this, nextIndex);
        }

        final CompletableFuture<Message> f = server.applyLogToStateMachine(next);
        final long incremented = appliedIndex.incrementAndGet(debugIndexChange);
        Preconditions.assertTrue(incremented == nextIndex);
        if (f != null) {
          CompletableFuture<Message> exceptionHandledFuture = f.exceptionally(ex -> {
            LOG.error("Exception while {}: applying txn index={}, nextLog={}", this, nextIndex,
                    LogProtoUtils.toLogEntryString(entry), ex);
            return null;
          });
          applyLogFutures = applyLogFutures.thenCombine(exceptionHandledFuture, (v, message) -> null);
          f.thenAccept(m -> notifyAppliedIndex(incremented));
        } else {
          notifyAppliedIndex(incremented);
        }
      } finally {
        next.release();
      }
    }
    return applyLogFutures;
  }