private CompletableFuture appendEntriesAsync()

in ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java [1604:1692]


  private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(RaftPeerId leaderId, long callId,
      TermIndex previous, ReferenceCountedObject<AppendEntriesRequestProto> requestRef) throws IOException {
    final AppendEntriesRequestProto proto = requestRef.get();
    final List<LogEntryProto> entries = proto.getEntriesList();
    final boolean isHeartbeat = entries.isEmpty();
    logAppendEntries(isHeartbeat, () -> getMemberId() + ": appendEntries* "
        + toAppendEntriesRequestString(proto, stateMachine::toStateMachineLogEntryString));

    final long leaderTerm = proto.getLeaderTerm();
    final long currentTerm;
    final long followerCommit = state.getLog().getLastCommittedIndex();
    final Optional<FollowerState> followerState;
    final Timekeeper.Context timer = raftServerMetrics.getFollowerAppendEntryTimer(isHeartbeat).time();
    final CompletableFuture<Void> future;
    synchronized (this) {
      // Check life cycle state again to avoid the PAUSING/PAUSED state.
      assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
      currentTerm = state.getCurrentTerm();
      final boolean recognized = state.recognizeLeader(Op.APPEND_ENTRIES, leaderId, leaderTerm);
      if (!recognized) {
        return CompletableFuture.completedFuture(toAppendEntriesReplyProto(
            leaderId, getMemberId(), currentTerm, followerCommit, state.getNextIndex(),
            AppendResult.NOT_LEADER, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat));
      }
      try {
        future = changeToFollowerAndPersistMetadata(leaderTerm, true, "appendEntries");
      } catch (IOException e) {
        return JavaUtils.completeExceptionally(e);
      }
      state.setLeader(leaderId, "appendEntries");

      if (!proto.getInitializing() && lifeCycle.compareAndTransition(State.STARTING, State.RUNNING)) {
        role.startFollowerState(this, Op.APPEND_ENTRIES);
      }
      followerState = updateLastRpcTime(FollowerState.UpdateType.APPEND_START);

      // Check that the append entries are not inconsistent. There are 3
      // scenarios which can result in inconsistency:
      //      1. There is a snapshot installation in progress
      //      2. There is an overlap between the snapshot index and the entries
      //      3. There is a gap between the local log and the entries
      // In any of these scenarios, we should return an INCONSISTENCY reply
      // back to leader so that the leader can update this follower's next index.
      final long inconsistencyReplyNextIndex = checkInconsistentAppendEntries(previous, entries);
      if (inconsistencyReplyNextIndex > RaftLog.INVALID_LOG_INDEX) {
        final AppendEntriesReplyProto reply = toAppendEntriesReplyProto(
            leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextIndex,
            AppendResult.INCONSISTENCY, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat);
        LOG.info("{}: appendEntries* reply {}", getMemberId(), toAppendEntriesReplyString(reply));
        followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
        return future.thenApply(dummy -> reply);
      }

      state.updateConfiguration(entries);
    }
    future.join();
    final CompletableFuture<Void> appendLog = entries.isEmpty()? CompletableFuture.completedFuture(null)
        : appendLog(requestRef.delegate(entries));

    proto.getCommitInfosList().forEach(commitInfoCache::update);

    CodeInjectionForTesting.execute(LOG_SYNC, getId(), null);
    if (!isHeartbeat) {
      final long installedIndex = snapshotInstallationHandler.getInstalledIndex();
      if (installedIndex >= RaftLog.LEAST_VALID_LOG_INDEX) {
        LOG.info("{}: Follower has completed install the snapshot {}.", this, installedIndex);
        stateMachine.event().notifySnapshotInstalled(InstallSnapshotResult.SUCCESS, installedIndex, getPeer());
      }
    }

    final long commitIndex = effectiveCommitIndex(proto.getLeaderCommit(), previous, entries.size());
    final long matchIndex = isHeartbeat? RaftLog.INVALID_LOG_INDEX: entries.get(entries.size() - 1).getIndex();
    return appendLog.whenCompleteAsync((r, t) -> {
      followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
      timer.stop();
    }, getServerExecutor()).thenApply(v -> {
      final boolean updated = state.updateCommitIndex(commitIndex, currentTerm, false);
      if (updated) {
        updateCommitInfoCache();
      }
      final long nextIndex = isHeartbeat? state.getNextIndex(): matchIndex + 1;
      final AppendEntriesReplyProto reply = toAppendEntriesReplyProto(leaderId, getMemberId(),
          currentTerm, updated? commitIndex : state.getLog().getLastCommittedIndex(),
          nextIndex, AppendResult.SUCCESS, callId, matchIndex, isHeartbeat);
      logAppendEntries(isHeartbeat, () -> getMemberId()
          + ": appendEntries* reply " + toAppendEntriesReplyString(reply));
      return reply;
    });
  }