private CompletableFuture notifyStateMachineToInstallSnapshot()

in ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java [242:386]


  /**
   * Handles an install-snapshot <em>notification</em> from the leader: instead of receiving snapshot chunks,
   * the follower asks its own state machine to obtain and install a snapshot, and reports the progress back.
   *
   * <p>The whole method body runs while holding the {@code server} monitor. Possible results:
   * <ul>
   *   <li>{@code NOT_LEADER} - the sender is not recognized as leader for {@code leaderTerm};</li>
   *   <li>{@code ALREADY_INSTALLED} - the local snapshot index + 1 already reaches the leader's first
   *       available log index;</li>
   *   <li>{@code SNAPSHOT_UNAVAILABLE} - the state machine completed the notification with a null result;</li>
   *   <li>{@code SNAPSHOT_INSTALLED} - the state machine reported an installed snapshot, which is then
   *       reloaded into the server state;</li>
   *   <li>{@code IN_PROGRESS} - an installation has been started (by this or an earlier call) and has
   *       not produced a result yet.</li>
   * </ul>
   *
   * @param request the notification request; its {@code firstAvailableTermIndex} is the target of the installation
   * @param leaderId the id of the peer that sent the request
   * @return a future of the reply; except for {@code NOT_LEADER}, it completes once the future returned by
   *         {@code server.changeToFollowerAndPersistMetadata} completes
   * @throws IOException declared by the signature; presumably propagated from the server/state calls made
   *         here (the throwing call is not identifiable from this method alone)
   */
  private CompletableFuture<InstallSnapshotReplyProto> notifyStateMachineToInstallSnapshot(
      InstallSnapshotRequestProto request, RaftPeerId leaderId) throws IOException {
    final long currentTerm;
    final long leaderTerm = request.getLeaderTerm();
    final TermIndex firstAvailableLogTermIndex = TermIndex.valueOf(
        request.getNotification().getFirstAvailableTermIndex());
    final long firstAvailableLogIndex = firstAvailableLogTermIndex.getIndex();
    final CompletableFuture<Void> future;
    synchronized (server) {
      final boolean recognized = state.recognizeLeader("notifyInstallSnapshot", leaderId, leaderTerm);
      currentTerm = state.getCurrentTerm();
      if (!recognized) {
        return CompletableFuture.completedFuture(toInstallSnapshotReplyProto(leaderId, getMemberId(),
            currentTerm, InstallSnapshotResult.NOT_LEADER));
      }
      future = server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot");
      state.setLeader(leaderId, "installSnapshot");
      server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_NOTIFICATION);

      // Only the caller that wins this CAS starts a new installation; everybody else just reports progress.
      if (inProgressInstallSnapshotIndex.compareAndSet(INVALID_LOG_INDEX, firstAvailableLogIndex)) {
        LOG.info("{}: Received notification to install snapshot at index {}", getMemberId(), firstAvailableLogIndex);
        // Check if snapshot index is already at par or ahead of the first
        // available log index of the Leader.
        final long snapshotIndex = state.getLog().getSnapshotIndex();
        if (snapshotIndex != INVALID_LOG_INDEX && snapshotIndex + 1 >= firstAvailableLogIndex &&
            firstAvailableLogIndex > INVALID_LOG_INDEX) {
          // State Machine has already installed the snapshot. Return the
          // latest snapshot index to the Leader.

          inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, INVALID_LOG_INDEX);
          LOG.info("{}: InstallSnapshot notification result: {}, current snapshot index: {}", getMemberId(),
              InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
          final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm,
              InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
          return future.thenApply(dummy -> reply);
        }

        // The in-progress marker has been claimed above. If anything below throws before the state machine
        // has been notified (e.g. the leader is missing from the configuration entry, or the state machine
        // throws synchronously), release the marker; otherwise every later notification would lose the CAS
        // and answer IN_PROGRESS forever although nothing is being installed.
        boolean notified = false;
        try {
          final RaftPeerProto leaderProto;
          if (!request.hasLastRaftConfigurationLogEntryProto()) {
            leaderProto = null;
          } else {
            leaderProto = request.getLastRaftConfigurationLogEntryProto().getConfigurationEntry().getPeersList()
                .stream()
                .filter(p -> RaftPeerId.valueOf(p.getId()).equals(leaderId))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("Leader " + leaderId
                    + " not found from the last configuration LogEntryProto, request = " + request));
          }

          // For the cases where RaftConf is empty on newly started peer with empty peer list,
          // we retrieve leader info from installSnapShotRequestProto.
          final RoleInfoProto proto = leaderProto == null || server.getRaftConf().getPeer(state.getLeaderId()) != null?
              server.getRoleInfoProto(): getRoleInfoProto(ProtoUtils.toRaftPeer(leaderProto));
          // This is the first installSnapshot notify request for this term and
          // index. Notify the state machine to install the snapshot.
          LOG.info("{}: notifyInstallSnapshot: nextIndex is {} but the leader's first available index is {}.",
              getMemberId(), state.getLog().getNextIndex(), firstAvailableLogIndex);
          // The state machine may complete the returned future asynchronously; in that case this thread simply
          // continues with the code below. The time a user-defined state machine needs is unbounded (e.g. a
          // RocksDB checkpoint may keep growing, so a transfer can always exceed a fixed limit), and the
          // follower should resume work as soon as the snapshot is installed. Until then, the server answers
          // that the snapshot installation is in progress.

          // A separate appendEntries thread may reply INCONSISTENCY with nextIndex and commitIndex to the leader
          // while the installation is in progress. The follower's nextIndex is updated by
          // state.reloadStateMachine. Therefore the callback below only records the result; the reload is done
          // later by the request thread, so that the leader does not learn this follower's new nextIndex from
          // appendEntries before it has received SNAPSHOT_INSTALLED.
          server.getStateMachine().followerEvent().notifyInstallSnapshotFromLeader(proto, firstAvailableLogTermIndex)
              .whenComplete((reply, exception) -> {
                if (exception != null) {
                  // The trailing throwable makes the logger print the stack trace in addition to the message.
                  LOG.error("{}: Failed to notify StateMachine to InstallSnapshot. Exception: {}",
                      getMemberId(), exception.getMessage(), exception);
                  inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, INVALID_LOG_INDEX);
                  return;
                }

                if (reply != null) {
                  LOG.info("{}: StateMachine successfully installed snapshot index {}. Reloading the StateMachine.",
                      getMemberId(), reply.getIndex());
                  installedSnapshotTermIndex.set(reply);
                } else {
                  isSnapshotNull.set(true);
                  if (LOG.isDebugEnabled()) {
                    LOG.debug("{}: StateMachine could not install snapshot as it is not available", getMemberId());
                  }
                }
              });
          notified = true;
        } finally {
          if (!notified) {
            inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, INVALID_LOG_INDEX);
          }
        }

        if (LOG.isDebugEnabled()) {
          LOG.debug("{}: StateMachine is processing Snapshot Installation Request.", getMemberId());
        }
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("{}: StateMachine is already installing a snapshot.", getMemberId());
        }
      }

      // NOTE(review): the failure branch of the callback above resets the in-progress index to
      // INVALID_LOG_INDEX; if that happens before this point (e.g. an already-failed future), the
      // assertion below fails - confirm this is the intended behavior.
      final long inProgressInstallSnapshotIndexValue = getInProgressInstallSnapshotIndex();
      // Report the value that was actually checked rather than re-reading it.
      Preconditions.assertTrue(inProgressInstallSnapshotIndexValue <= firstAvailableLogIndex
              && inProgressInstallSnapshotIndexValue > INVALID_LOG_INDEX,
          "inProgressInstallSnapshotRequest: %s is not eligible, firstAvailableLogIndex: %s",
          inProgressInstallSnapshotIndexValue, firstAvailableLogIndex);

      // If the snapshot is null or unavailable, return SNAPSHOT_UNAVAILABLE.
      // The flag is consumed (reset to false) by the request that reports it.
      if (isSnapshotNull.compareAndSet(true, false)) {
        LOG.info("{}: InstallSnapshot notification result: {}", getMemberId(),
            InstallSnapshotResult.SNAPSHOT_UNAVAILABLE);
        inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX);
        server.getStateMachine().event().notifySnapshotInstalled(
            InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX, server.getPeer());
        final InstallSnapshotReplyProto reply =  toInstallSnapshotReplyProto(leaderId, getMemberId(),
            currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE);
        return future.thenApply(dummy -> reply);
      }

      // If a snapshot has been installed, return SNAPSHOT_INSTALLED with the installed snapshot index and reset
      // installedSnapshotTermIndex to INVALID_TERM_INDEX.
      final TermIndex latestInstalledSnapshotTermIndex = this.installedSnapshotTermIndex
          .getAndSet(INVALID_TERM_INDEX);
      if (latestInstalledSnapshotTermIndex.getIndex() > INVALID_LOG_INDEX) {
        server.getStateMachine().pause();
        state.reloadStateMachine(latestInstalledSnapshotTermIndex);
        LOG.info("{}: InstallSnapshot notification result: {}, at index: {}", getMemberId(),
            InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotTermIndex);
        inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX);
        final long latestInstalledIndex = latestInstalledSnapshotTermIndex.getIndex();
        server.getStateMachine().event().notifySnapshotInstalled(
            InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex, server.getPeer());
        installedIndex.set(latestInstalledIndex);
        final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(),
            currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex);
        return future.thenApply(dummy -> reply);
      }

      // Otherwise, Snapshot installation is in progress.
      if (LOG.isDebugEnabled()) {
        LOG.debug("{}: InstallSnapshot notification result: {}", getMemberId(),
            InstallSnapshotResult.IN_PROGRESS);
      }
      final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(),
          currentTerm, InstallSnapshotResult.IN_PROGRESS);
      return future.thenApply(dummy -> reply);
    }
  }