in ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java [166:240]
/**
 * Handles one chunk of an install-snapshot request sent by a leader.
 *
 * <p>While holding the {@code server} monitor, this method checks that the sender is
 * recognized as leader, switches this server to follower, validates the chunk ordering
 * against {@code chunk0CallId} / {@code nextChunkIndex}, hands the chunk to
 * {@code state.installSnapshot(request)} and, when the chunk is flagged as done,
 * calls {@code state.reloadStateMachine(lastIncluded)}.
 *
 * @param request the install-snapshot request; its leader term, snapshot chunk
 *                (term-index, request index, done flag) and server-request call id are read
 * @param leaderId the id of the peer that sent the request; used for leader recognition,
 *                 for {@code state.setLeader} and when building the reply
 * @return a future of the reply. If the leader is not recognized, an already-completed
 *         future carrying {@code NOT_LEADER}. Otherwise the reply ({@code SNAPSHOT_EXPIRED},
 *         {@code ALREADY_INSTALLED} or {@code SUCCESS}) is chained onto the future returned by
 *         {@code server.changeToFollowerAndPersistMetadata}, so it becomes available only
 *         after that future completes.
 * @throws IOException if a chunk with a non-zero request index does not match
 *         {@code nextChunkIndex}, or if the index obtained from
 *         {@code nextChunkIndex.getAndIncrement()} differs from the request index.
 *         NOTE(review): the helpers called here may also throw it -- their signatures
 *         are not visible in this chunk.
 */
private CompletableFuture<InstallSnapshotReplyProto> checkAndInstallSnapshot(InstallSnapshotRequestProto request,
RaftPeerId leaderId) throws IOException {
final long currentTerm;
final long leaderTerm = request.getLeaderTerm();
final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = request.getSnapshotChunk();
final TermIndex lastIncluded = TermIndex.valueOf(snapshotChunkRequest.getTermIndex());
final long lastIncludedIndex = lastIncluded.getIndex();
final CompletableFuture<Void> future;
// All checks and state changes below are performed while holding the server monitor.
synchronized (server) {
final boolean recognized = state.recognizeLeader(RaftServerProtocol.Op.INSTALL_SNAPSHOT, leaderId, leaderTerm);
// The term is read BEFORE changeToFollowerAndPersistMetadata is invoked below;
// this captured value is what the stale-request check and every reply use.
currentTerm = state.getCurrentTerm();
if (!recognized) {
// Rejected immediately: nothing has been changed yet, so the reply is already complete.
return CompletableFuture.completedFuture(toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.NOT_LEADER));
}
// Every reply returned from here on is chained onto this future.
future = server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot");
state.setLeader(leaderId, "installSnapshot");
server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_START);
long callId = chunk0CallId.get();
// 1. leaderTerm < currentTerm will never come here
// 2. leaderTerm == currentTerm && callId > request.getServerRequest().getCallId()
// means the snapshot request is stale with the same leader
// 3. leaderTerm > currentTerm means this is a new snapshot request from a new leader,
// chunk0CallId will be reset when a snapshot request with requestIndex == 0 is received.
if (callId > request.getServerRequest().getCallId() && currentTerm == leaderTerm) {
LOG.warn("{}: Snapshot Request Staled: chunk 0 callId is {} but {}", getMemberId(), callId,
ServerStringUtils.toInstallSnapshotRequestString(request));
InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.SNAPSHOT_EXPIRED);
return future.thenApply(dummy -> reply);
}
if (snapshotChunkRequest.getRequestIndex() == 0) {
// Chunk 0 starts a new transfer: restart the expected chunk counter and
// remember the call id of this first chunk for the stale check above.
nextChunkIndex.set(0);
chunk0CallId.set(request.getServerRequest().getCallId());
} else if (nextChunkIndex.get() != snapshotChunkRequest.getRequestIndex()) {
// Out-of-order chunk. This throw happens before the try block below, so the
// INSTALL_SNAPSHOT_COMPLETE update in its finally clause does not run on this path.
throw new IOException("Snapshot request already failed at chunk index " + nextChunkIndex.get()
+ "; ignoring request with chunk index " + snapshotChunkRequest.getRequestIndex());
}
try {
// Check and append the snapshot chunk. We simply put this in lock
// considering a follower peer requiring a snapshot installation does not
// have a lot of requests
if (state.getLog().getLastCommittedIndex() >= lastIncludedIndex) {
// The log has already committed up to (or past) the snapshot's last included index:
// skip the installation, but still advance the expected chunk index past this chunk.
nextChunkIndex.set(snapshotChunkRequest.getRequestIndex() + 1);
final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.ALREADY_INSTALLED);
return future.thenApply(dummy -> reply);
}
//TODO: We should only update State with installed snapshot once the request is done.
state.installSnapshot(request);
// getAndIncrement returns the value held before the increment, i.e. the index
// that was expected for this chunk; the counter is advanced even if the check fails.
final int expectedChunkIndex = nextChunkIndex.getAndIncrement();
if (expectedChunkIndex != snapshotChunkRequest.getRequestIndex()) {
throw new IOException("Unexpected request chunk index: " + snapshotChunkRequest.getRequestIndex()
+ " (the expected index is " + expectedChunkIndex + ")");
}
// update the committed index
// re-load the state machine if this is the last chunk
if (snapshotChunkRequest.getDone()) {
state.reloadStateMachine(lastIncluded);
// NOTE(review): -1 presumably marks "no transfer in progress", so the stale check
// above cannot match a non-negative call id -- confirm against chunk0CallId's declaration.
chunk0CallId.set(-1);
}
} finally {
// Runs on every exit from the try block: normal completion, the ALREADY_INSTALLED
// early return, and any exception thrown inside it.
server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_COMPLETE);
}
}
// The success log and the SUCCESS reply are built after the server monitor is released.
if (snapshotChunkRequest.getDone()) {
LOG.info("{}: successfully install the entire snapshot-{}", getMemberId(), lastIncludedIndex);
}
final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.SUCCESS);
return future.thenApply(dummy -> reply);
}