in ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java [102:171]
public void installSnapshot(InstallSnapshotRequestProto request, StateMachine stateMachine) throws IOException {
final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = request.getSnapshotChunk();
final long lastIncludedIndex = snapshotChunkRequest.getTermIndex().getIndex();
// create a unique temporary directory
final File tmpDir = new File(this.snapshotTmpDir.get(), "snapshot-" + snapshotChunkRequest.getRequestId());
FileUtils.createDirectories(tmpDir);
tmpDir.deleteOnExit();
LOG.info("Installing snapshot:{}, to tmp dir:{}",
ServerStringUtils.toInstallSnapshotRequestString(request), tmpDir);
// TODO: Make sure that subsequent requests for the same installSnapshot are coming in order,
// and are not lost when whole request cycle is done. Check requestId and requestIndex here
for (FileChunkProto chunk : snapshotChunkRequest.getFileChunksList()) {
SnapshotInfo pi = stateMachine.getLatestSnapshot();
if (pi != null && pi.getTermIndex().getIndex() >= lastIncludedIndex) {
throw new IOException("There exists snapshot file "
+ pi.getFiles() + " in " + selfId
+ " with endIndex >= lastIncludedIndex " + lastIncludedIndex);
}
final File tmpSnapshotFile = new File(tmpDir, getRelativePath.apply(chunk));
FileUtils.createDirectoriesDeleteExistingNonDirectory(tmpSnapshotFile.getParentFile());
try (FileChannel out = open(chunk, tmpSnapshotFile)) {
final ByteBuffer data = chunk.getData().asReadOnlyByteBuffer();
digester.update(data.duplicate());
int written = 0;
for(; data.remaining() > 0; ) {
written += out.write(data);
}
Preconditions.assertSame(chunk.getData().size(), written, "written");
}
// rename the temp snapshot file if this is the last chunk. also verify
// the md5 digest and create the md5 meta-file.
if (chunk.getDone()) {
final MD5Hash expectedDigest =
new MD5Hash(chunk.getFileDigest().toByteArray());
// calculate the checksum of the snapshot file and compare it with the
// file digest in the request
final MD5Hash digest = new MD5Hash(digester.digest());
if (!digest.equals(expectedDigest)) {
LOG.warn("The snapshot md5 digest {} does not match expected {}",
digest, expectedDigest);
// rename the temp snapshot file to .corrupt
String renameMessage;
try {
final File corruptedFile = FileUtils.move(tmpSnapshotFile, CORRUPT + StringUtils.currentDateTime());
renameMessage = "Renamed temporary snapshot file " + tmpSnapshotFile + " to " + corruptedFile;
} catch (IOException e) {
renameMessage = "Tried but failed to rename temporary snapshot file " + tmpSnapshotFile
+ " to a " + CORRUPT + " file";
LOG.warn(renameMessage, e);
renameMessage += ": " + e;
}
throw new CorruptedFileException(tmpSnapshotFile,
"MD5 mismatch for snapshot-" + lastIncludedIndex + " installation. " + renameMessage);
} else {
MD5FileUtil.saveMD5File(tmpSnapshotFile, digest);
}
}
}
if (snapshotChunkRequest.getDone()) {
rename(tmpDir, snapshotDir.get());
}
}