in ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java [242:386]
/**
 * Handles an install-snapshot notification from the leader, i.e. a request whose notification
 * carries the leader's first available log term-index.
 *
 * <p>The first notification for an index hands the work to the state machine via
 * {@code followerEvent().notifyInstallSnapshotFromLeader}. That call completes asynchronously.
 * Its outcome is recorded by a callback and reported by this or a later notification:
 * <ul>
 *   <li>{@code NOT_LEADER} if the leader/term is not recognized;</li>
 *   <li>{@code ALREADY_INSTALLED} if the local snapshot already reaches the leader's first
 *       available index;</li>
 *   <li>{@code SNAPSHOT_UNAVAILABLE} if the state machine completed with a {@code null}
 *       term-index;</li>
 *   <li>{@code SNAPSHOT_INSTALLED} once the state machine installed a snapshot (the state
 *       machine is paused and reloaded first);</li>
 *   <li>{@code IN_PROGRESS} otherwise, including right after a failed attempt.</li>
 * </ul>
 *
 * @param request the install-snapshot request carrying the notification
 * @param leaderId the id of the peer that sent the request
 * @return the reply future; except for {@code NOT_LEADER}, it completes only after the future
 *     returned by {@code changeToFollowerAndPersistMetadata} completes
 * @throws IOException declared for the server/state operations invoked here
 * @throws IllegalArgumentException if the request's last configuration entry does not contain
 *     {@code leaderId}
 */
private CompletableFuture<InstallSnapshotReplyProto> notifyStateMachineToInstallSnapshot(
    InstallSnapshotRequestProto request, RaftPeerId leaderId) throws IOException {
  final long currentTerm;
  final long leaderTerm = request.getLeaderTerm();
  final TermIndex firstAvailableLogTermIndex = TermIndex.valueOf(
      request.getNotification().getFirstAvailableTermIndex());
  final long firstAvailableLogIndex = firstAvailableLogTermIndex.getIndex();
  final CompletableFuture<Void> future;
  synchronized (server) {
    final boolean recognized = state.recognizeLeader("notifyInstallSnapshot", leaderId, leaderTerm);
    currentTerm = state.getCurrentTerm();
    if (!recognized) {
      return CompletableFuture.completedFuture(toInstallSnapshotReplyProto(leaderId, getMemberId(),
          currentTerm, InstallSnapshotResult.NOT_LEADER));
    }
    future = server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot");
    state.setLeader(leaderId, "installSnapshot");
    server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_NOTIFICATION);

    if (inProgressInstallSnapshotIndex.compareAndSet(INVALID_LOG_INDEX, firstAvailableLogIndex)) {
      LOG.info("{}: Received notification to install snapshot at index {}", getMemberId(), firstAvailableLogIndex);
      // Check whether the local snapshot index is already at par with, or ahead of, the leader's
      // first available log index.
      final long snapshotIndex = state.getLog().getSnapshotIndex();
      if (snapshotIndex != INVALID_LOG_INDEX && snapshotIndex + 1 >= firstAvailableLogIndex &&
          firstAvailableLogIndex > INVALID_LOG_INDEX) {
        // The snapshot is already installed: return the latest snapshot index to the leader.
        inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, INVALID_LOG_INDEX);
        LOG.info("{}: InstallSnapshot notification result: {}, current snapshot index: {}", getMemberId(),
            InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
        final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm,
            InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
        return future.thenApply(dummy -> reply);
      }

      // Any step below may throw before the state machine call is handed off. Examples: the
      // orElseThrow for a configuration without the leader, or a state machine that throws
      // instead of returning a failed future. No callback would then ever clear the marker set by
      // the compareAndSet above, and every later notification would be stuck in the "already
      // installing" branch. So clear the marker on that path.
      boolean handedOff = false;
      try {
        final RaftPeerProto leaderProto;
        if (!request.hasLastRaftConfigurationLogEntryProto()) {
          leaderProto = null;
        } else {
          leaderProto = request.getLastRaftConfigurationLogEntryProto().getConfigurationEntry().getPeersList()
              .stream()
              .filter(p -> RaftPeerId.valueOf(p.getId()).equals(leaderId))
              .findFirst()
              .orElseThrow(() -> new IllegalArgumentException("Leader " + leaderId
                  + " not found from the last configuration LogEntryProto, request = " + request));
        }
        // A newly started peer may have an empty RaftConf (empty peer list). In that case, take
        // the leader info from the InstallSnapshotRequestProto instead.
        final RoleInfoProto proto = leaderProto == null || server.getRaftConf().getPeer(state.getLeaderId()) != null?
            server.getRoleInfoProto(): getRoleInfoProto(ProtoUtils.toRaftPeer(leaderProto));
        // This is the first install-snapshot notification for this term and index:
        // notify the state machine to install the snapshot.
        LOG.info("{}: notifyInstallSnapshot: nextIndex is {} but the leader's first available index is {}.",
            getMemberId(), state.getLog().getNextIndex(), firstAvailableLogIndex);
        // The state machine runs asynchronously, so this thread does not wait for it. Its duration
        // is controlled by the user-defined state machine and may be unbounded; for example, a
        // RocksDB checkpoint may keep growing. Until the callback below records an outcome, later
        // notifications are answered IN_PROGRESS.
        //
        // The follower's nextIndex is only advanced by state.reloadStateMachine. That call is
        // deliberately made on the notification path below, under the server lock, and not from
        // this callback. Meanwhile appendEntries may report the follower's nextIndex/commitIndex
        // to the leader as an inconsistency. This ordering makes the leader learn the new
        // nextIndex from the SNAPSHOT_INSTALLED reply, not from appendEntries.
        server.getStateMachine().followerEvent().notifyInstallSnapshotFromLeader(proto, firstAvailableLogTermIndex)
            .whenComplete((installedTermIndex, exception) -> {
              if (exception != null) {
                // Pass the Throwable itself so that the stack trace is logged.
                LOG.error("{}: Failed to notify StateMachine to InstallSnapshot.", getMemberId(), exception);
                inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, INVALID_LOG_INDEX);
                return;
              }
              if (installedTermIndex != null) {
                LOG.info("{}: StateMachine successfully installed snapshot index {}. Reloading the StateMachine.",
                    getMemberId(), installedTermIndex.getIndex());
                installedSnapshotTermIndex.set(installedTermIndex);
              } else {
                isSnapshotNull.set(true);
                if (LOG.isDebugEnabled()) {
                  LOG.debug("{}: StateMachine could not install snapshot as it is not available", getMemberId());
                }
              }
            });
        handedOff = true;
      } finally {
        if (!handedOff) {
          inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, INVALID_LOG_INDEX);
        }
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("{}: StateMachine is processing Snapshot Installation Request.", getMemberId());
      }
    } else {
      if (LOG.isDebugEnabled()) {
        LOG.debug("{}: StateMachine is already installing a snapshot.", getMemberId());
      }
    }

    final long inProgressInstallSnapshotIndexValue = getInProgressInstallSnapshotIndex();
    if (inProgressInstallSnapshotIndexValue == INVALID_LOG_INDEX) {
      // Within this method, only the failure callback above clears the marker without holding
      // the server lock. It runs inline when the state machine's future is already complete, and
      // it can also run concurrently on another thread. Treat this as a failed attempt, not a
      // broken invariant. Reply IN_PROGRESS; the next notification's compareAndSet then starts a
      // fresh attempt (presumably the leader keeps re-sending notifications — confirm).
      LOG.info("{}: InstallSnapshot notification result: {}, the previous installation attempt failed",
          getMemberId(), InstallSnapshotResult.IN_PROGRESS);
      final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(),
          currentTerm, InstallSnapshotResult.IN_PROGRESS);
      return future.thenApply(dummy -> reply);
    }
    Preconditions.assertTrue(inProgressInstallSnapshotIndexValue <= firstAvailableLogIndex
            && inProgressInstallSnapshotIndexValue > INVALID_LOG_INDEX,
        "inProgressInstallSnapshotRequest: %s is not eligible, firstAvailableLogIndex: %s",
        inProgressInstallSnapshotIndexValue, firstAvailableLogIndex);

    // If the snapshot is null or unavailable, return SNAPSHOT_UNAVAILABLE.
    if (isSnapshotNull.compareAndSet(true, false)) {
      LOG.info("{}: InstallSnapshot notification result: {}", getMemberId(),
          InstallSnapshotResult.SNAPSHOT_UNAVAILABLE);
      inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX);
      server.getStateMachine().event().notifySnapshotInstalled(
          InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX, server.getPeer());
      final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(),
          currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE);
      return future.thenApply(dummy -> reply);
    }

    // If a snapshot has been installed, reload the state machine and return SNAPSHOT_INSTALLED
    // with the installed snapshot index. getAndSet also resets installedSnapshotTermIndex to
    // INVALID_TERM_INDEX.
    final TermIndex latestInstalledSnapshotTermIndex = this.installedSnapshotTermIndex
        .getAndSet(INVALID_TERM_INDEX);
    if (latestInstalledSnapshotTermIndex.getIndex() > INVALID_LOG_INDEX) {
      server.getStateMachine().pause();
      state.reloadStateMachine(latestInstalledSnapshotTermIndex);
      LOG.info("{}: InstallSnapshot notification result: {}, at index: {}", getMemberId(),
          InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotTermIndex);
      inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX);
      final long latestInstalledIndex = latestInstalledSnapshotTermIndex.getIndex();
      server.getStateMachine().event().notifySnapshotInstalled(
          InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex, server.getPeer());
      installedIndex.set(latestInstalledIndex);
      final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(),
          currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex);
      return future.thenApply(dummy -> reply);
    }

    // Otherwise, snapshot installation is still in progress.
    if (LOG.isDebugEnabled()) {
      LOG.debug("{}: InstallSnapshot notification result: {}", getMemberId(),
          InstallSnapshotResult.IN_PROGRESS);
    }
    final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(),
        currentTerm, InstallSnapshotResult.IN_PROGRESS);
    return future.thenApply(dummy -> reply);
  }
}