private CompletableFuture checkAndInstallSnapshot()

in ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java [166:240]


  private CompletableFuture<InstallSnapshotReplyProto> checkAndInstallSnapshot(InstallSnapshotRequestProto request,
      RaftPeerId leaderId) throws IOException {
    final long currentTerm;
    final long leaderTerm = request.getLeaderTerm();
    final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = request.getSnapshotChunk();
    final TermIndex lastIncluded = TermIndex.valueOf(snapshotChunkRequest.getTermIndex());
    final long lastIncludedIndex = lastIncluded.getIndex();
    final CompletableFuture<Void> future;
    synchronized (server) {
      final boolean recognized = state.recognizeLeader(RaftServerProtocol.Op.INSTALL_SNAPSHOT, leaderId, leaderTerm);
      currentTerm = state.getCurrentTerm();
      if (!recognized) {
        return CompletableFuture.completedFuture(toInstallSnapshotReplyProto(leaderId, getMemberId(),
            currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.NOT_LEADER));
      }
      future = server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot");
      state.setLeader(leaderId, "installSnapshot");

      server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_START);
      long callId = chunk0CallId.get();
      // 1. leaderTerm < currentTerm will never come here
      // 2. leaderTerm == currentTerm && callId == request.getCallId()
      //    means the snapshotRequest is staled with the same leader
      // 3. leaderTerm > currentTerm means this is a new snapshot request from a new leader,
      //    chunk0CallId will be reset when a snapshot request with requestIndex == 0 is received .
      if (callId > request.getServerRequest().getCallId() && currentTerm == leaderTerm) {
        LOG.warn("{}: Snapshot Request Staled: chunk 0 callId is {} but {}", getMemberId(), callId,
            ServerStringUtils.toInstallSnapshotRequestString(request));
        InstallSnapshotReplyProto reply =  toInstallSnapshotReplyProto(leaderId, getMemberId(),
            currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.SNAPSHOT_EXPIRED);
        return future.thenApply(dummy -> reply);
      }
      if (snapshotChunkRequest.getRequestIndex() == 0) {
        nextChunkIndex.set(0);
        chunk0CallId.set(request.getServerRequest().getCallId());
      } else if (nextChunkIndex.get() != snapshotChunkRequest.getRequestIndex()) {
        throw new IOException("Snapshot request already failed at chunk index " + nextChunkIndex.get()
                + "; ignoring request with chunk index " + snapshotChunkRequest.getRequestIndex());
      }
      try {
        // Check and append the snapshot chunk. We simply put this in lock
        // considering a follower peer requiring a snapshot installation does not
        // have a lot of requests
        if (state.getLog().getLastCommittedIndex() >= lastIncludedIndex) {
          nextChunkIndex.set(snapshotChunkRequest.getRequestIndex() + 1);
          final InstallSnapshotReplyProto reply =  toInstallSnapshotReplyProto(leaderId, getMemberId(),
              currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.ALREADY_INSTALLED);
          return future.thenApply(dummy -> reply);
        }

        //TODO: We should only update State with installed snapshot once the request is done.
        state.installSnapshot(request);

        final int expectedChunkIndex = nextChunkIndex.getAndIncrement();
        if (expectedChunkIndex != snapshotChunkRequest.getRequestIndex()) {
          throw new IOException("Unexpected request chunk index: " + snapshotChunkRequest.getRequestIndex()
              + " (the expected index is " + expectedChunkIndex + ")");
        }
        // update the committed index
        // re-load the state machine if this is the last chunk
        if (snapshotChunkRequest.getDone()) {
          state.reloadStateMachine(lastIncluded);
          chunk0CallId.set(-1);
        }
      } finally {
        server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_COMPLETE);
      }
    }
    if (snapshotChunkRequest.getDone()) {
      LOG.info("{}: successfully install the entire snapshot-{}", getMemberId(), lastIncludedIndex);
    }
    final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(),
        currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.SUCCESS);
    return future.thenApply(dummy -> reply);
  }