in server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java [3201:3496]
private void doSnapshotShard(SnapshotShardContext context) {
if (isReadOnly()) {
context.onFailure(new RepositoryException(metadata.name(), "cannot snapshot shard on a readonly repository"));
return;
}
final Store store = context.store();
final ShardId shardId = store.shardId();
final SnapshotId snapshotId = context.snapshotId();
final IndexShardSnapshotStatus snapshotStatus = context.status();
snapshotStatus.updateStatusDescription("snapshot task runner: setting up shard snapshot");
final long startTime = threadPool.absoluteTimeInMillis();
try {
final ShardGeneration generation = snapshotStatus.generation();
final BlobContainer shardContainer = shardContainer(context.indexId(), shardId);
logger.debug("[{}][{}] snapshot to [{}][{}][{}] ...", shardId, snapshotId, metadata.name(), context.indexId(), generation);
final Set<String> blobs;
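// If no shard generation is known we must list the container to find the existing index-${N} blobs;
// otherwise the single index-${generation} blob can be read directly without a listing.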
if (generation == null) {
snapshotStatus.ensureNotAborted();
snapshotStatus.updateStatusDescription("snapshot task runner: listing blob prefixes");
try {
blobs = shardContainer.listBlobsByPrefix(OperationPurpose.SNAPSHOT_METADATA, SNAPSHOT_INDEX_PREFIX).keySet();
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "failed to list blobs", e);
}
} else {
blobs = Collections.singleton(SNAPSHOT_INDEX_PREFIX + generation);
}
snapshotStatus.ensureNotAborted();
snapshotStatus.updateStatusDescription("snapshot task runner: loading snapshot blobs");
Tuple<BlobStoreIndexShardSnapshots, ShardGeneration> tuple = buildBlobStoreIndexShardSnapshots(
context.indexId(),
shardId.id(),
blobs,
shardContainer,
generation
);
BlobStoreIndexShardSnapshots snapshots = tuple.v1();
ShardGeneration fileListGeneration = tuple.v2();
if (snapshots.snapshots().stream().anyMatch(sf -> sf.snapshot().equals(snapshotId.getName()))) {
throw new IndexShardSnapshotFailedException(
shardId,
"Duplicate snapshot name [" + snapshotId.getName() + "] detected, aborting"
);
}
// First inspect all known SegmentInfos instances to see if we already have an equivalent commit in the repository
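// If a previous snapshot recorded the same shard state identifier we can reuse its file list verbatim and skip
// the per-file comparison against the store below.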
final List<BlobStoreIndexShardSnapshot.FileInfo> filesFromSegmentInfos = Optional.ofNullable(context.stateIdentifier())
.map(id -> {
for (SnapshotFiles snapshotFileSet : snapshots.snapshots()) {
if (id.equals(snapshotFileSet.shardStateIdentifier())) {
return snapshotFileSet.indexFiles();
}
}
return null;
})
.orElse(null);
final List<BlobStoreIndexShardSnapshot.FileInfo> indexCommitPointFiles;
int indexIncrementalFileCount = 0;
int indexTotalNumberOfFiles = 0;
long indexIncrementalSize = 0;
long indexTotalFileSize = 0;
final BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> filesToSnapshot = new LinkedBlockingQueue<>();
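// Files whose contents are equal to their metadata hash are stored inline in the shard level metadata rather than
// uploaded as separate data blobs; these counters track them so they can still be reported as processed.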
int filesInShardMetadataCount = 0;
long filesInShardMetadataSize = 0;
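// Searchable snapshot shards are backed by an existing snapshot, so there are no files to copy into this one.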
if (store.indexSettings().getIndexMetadata().isSearchableSnapshot()) {
indexCommitPointFiles = Collections.emptyList();
} else if (filesFromSegmentInfos == null) {
// If we did not find a set of files equal to the current commit, we determine the files to upload by comparing the
// files in the commit with those already in the repository
indexCommitPointFiles = new ArrayList<>();
final Collection<String> fileNames;
final Store.MetadataSnapshot metadataFromStore;
try (Releasable ignored = context.withCommitRef()) {
// TODO apparently we don't use MetadataSnapshot#recoveryDiff(...) here but we should
try {
final IndexCommit snapshotIndexCommit = context.indexCommit();
logger.trace("[{}] [{}] Loading store metadata using index commit [{}]", shardId, snapshotId, snapshotIndexCommit);
metadataFromStore = store.getMetadata(snapshotIndexCommit);
fileNames = snapshotIndexCommit.getFileNames();
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e);
}
}
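// Compare every file in the commit against what is already in the repository: identical files are reused,
// new files are queued for upload, and files whose hash equals their contents are embedded in the shard metadata.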
for (String fileName : fileNames) {
ensureNotAborted(shardId, snapshotId, snapshotStatus, fileName);
logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName);
final StoreFileMetadata md = metadataFromStore.get(fileName);
BlobStoreIndexShardSnapshot.FileInfo existingFileInfo = snapshots.findPhysicalIndexFile(md);
// We can skip writing blobs whose metadata hash is equal to the blob's contents, because in that case we store the
// hash/contents directly in the shard level metadata
final boolean needsWrite = md.hashEqualsContents() == false;
indexTotalFileSize += md.length();
indexTotalNumberOfFiles++;
if (existingFileInfo == null) {
indexIncrementalFileCount++;
indexIncrementalSize += md.length();
// create a new FileInfo
BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = new BlobStoreIndexShardSnapshot.FileInfo(
(needsWrite ? UPLOADED_DATA_BLOB_PREFIX : VIRTUAL_DATA_BLOB_PREFIX) + UUIDs.randomBase64UUID(),
md,
chunkSize()
);
indexCommitPointFiles.add(snapshotFileInfo);
if (needsWrite) {
filesToSnapshot.add(snapshotFileInfo);
} else {
assert assertFileContentsMatchHash(snapshotStatus, snapshotFileInfo, store);
filesInShardMetadataCount += 1;
filesInShardMetadataSize += md.length();
}
} else {
// a commit point file with the same name, size and checksum was already copied to the repository,
// so we will reuse it for this snapshot
indexCommitPointFiles.add(existingFileInfo);
}
}
} else {
for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : filesFromSegmentInfos) {
indexTotalNumberOfFiles++;
indexTotalFileSize += fileInfo.length();
}
indexCommitPointFiles = filesFromSegmentInfos;
}
snapshotStatus.updateStatusDescription("snapshot task runner: starting shard snapshot");
snapshotStatus.moveToStarted(
startTime,
indexIncrementalFileCount,
indexTotalNumberOfFiles,
indexIncrementalSize,
indexTotalFileSize
);
final ShardGeneration indexGeneration;
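// Repository format features are gated on the repository metadata version of this snapshot: whether UUID-based
// shard generations are used and whether file info blobs record the writer UUID.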
final boolean writeShardGens = SnapshotsService.useShardGenerations(context.getRepositoryMetaVersion());
final boolean writeFileInfoWriterUUID = SnapshotsService.includeFileInfoWriterUUID(context.getRepositoryMetaVersion());
// build a new BlobStoreIndexShardSnapshots instance that includes this snapshot and all previously saved ones
final BlobStoreIndexShardSnapshots updatedBlobStoreIndexShardSnapshots = snapshots.withAddedSnapshot(
new SnapshotFiles(snapshotId.getName(), indexCommitPointFiles, context.stateIdentifier())
);
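// Work that must run after the per-snapshot shard metadata blob has been written; it differs depending on
// whether shard generations are in use (see below).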
final Runnable afterWriteSnapBlob;
if (writeShardGens) {
// When using shard generations we can safely write the index-${uuid} blob before writing out any of the actual data
// for this shard since the uuid-named blob will simply not be referenced in case of error and thus we will never
// reference a generation that has not had all its files fully uploaded.
indexGeneration = ShardGeneration.newGeneration();
try {
final Map<String, String> serializationParams = Collections.singletonMap(
BlobStoreIndexShardSnapshot.FileInfo.SERIALIZE_WRITER_UUID,
Boolean.toString(writeFileInfoWriterUUID)
);
snapshotStatus.updateStatusDescription("snapshot task runner: updating blob store with new shard generation");
INDEX_SHARD_SNAPSHOTS_FORMAT.write(
updatedBlobStoreIndexShardSnapshots,
shardContainer,
indexGeneration.getGenerationUUID(),
compress,
serializationParams
);
snapshotStatus.addProcessedFiles(filesInShardMetadataCount, filesInShardMetadataSize);
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(
shardId,
"Failed to write shard level snapshot metadata for ["
+ snapshotId
+ "] to ["
+ INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(indexGeneration.getGenerationUUID())
+ "]",
e
);
}
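// With shard generations the index-${uuid} blob has already been written above, so nothing remains to be done
// after the per-snapshot blob is written.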
afterWriteSnapBlob = () -> {};
} else {
// When not using shard generations we can only write the index-${N} blob after all other work for this shard has
// completed.
// Also, in the case of numeric shard generations the data node has to take care of deleting old shard generations.
final long newGen = Long.parseLong(fileListGeneration.getGenerationUUID()) + 1;
indexGeneration = new ShardGeneration(newGen);
// Delete all previous index-N blobs
final List<String> blobsToDelete = blobs.stream().filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX)).toList();
assert blobsToDelete.stream()
.mapToLong(b -> Long.parseLong(b.replaceFirst(SNAPSHOT_INDEX_PREFIX, "")))
.max()
.orElse(-1L) < Long.parseLong(indexGeneration.toString())
: "Tried to delete an index-N blob newer than the current generation ["
+ indexGeneration
+ "] when deleting index-N blobs "
+ blobsToDelete;
final var finalFilesInShardMetadataCount = filesInShardMetadataCount;
final var finalFilesInShardMetadataSize = filesInShardMetadataSize;
afterWriteSnapBlob = () -> {
try {
final Map<String, String> serializationParams = Collections.singletonMap(
BlobStoreIndexShardSnapshot.FileInfo.SERIALIZE_WRITER_UUID,
Boolean.toString(writeFileInfoWriterUUID)
);
snapshotStatus.updateStatusDescription("no shard generations: writing new index-${N} file");
writeShardIndexBlobAtomic(shardContainer, newGen, updatedBlobStoreIndexShardSnapshots, serializationParams);
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(
shardId,
"Failed to finalize snapshot creation ["
+ snapshotId
+ "] with shard index ["
+ INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(indexGeneration.getGenerationUUID())
+ "]",
e
);
}
snapshotStatus.addProcessedFiles(finalFilesInShardMetadataCount, finalFilesInShardMetadataSize);
try {
snapshotStatus.updateStatusDescription("no shard generations: deleting blobs");
deleteFromContainer(OperationPurpose.SNAPSHOT_METADATA, shardContainer, blobsToDelete.iterator());
} catch (IOException e) {
logger.warn(
() -> format("[%s][%s] failed to delete old index-N blobs during finalization", snapshotId, shardId),
e
);
}
};
}
// filesToSnapshot will be emptied while snapshotting the files. We make a copy here for cleanup purposes in case of failure.
final AtomicReference<List<FileInfo>> fileToCleanUp = new AtomicReference<>(List.copyOf(filesToSnapshot));
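// Invoked once every queued file upload has completed (or immediately if nothing needs uploading): writes the
// per-snapshot shard metadata blob, runs the post-write work and reports the result; on failure it deletes any
// data blobs written so far.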
final ActionListener<Collection<Void>> allFilesUploadedListener = ActionListener.assertOnce(ActionListener.wrap(ignore -> {
snapshotStatus.updateStatusDescription("all files uploaded: finalizing");
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize();
// now create and write the commit point
logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
final BlobStoreIndexShardSnapshot blobStoreIndexShardSnapshot = new BlobStoreIndexShardSnapshot(
snapshotId.getName(),
indexCommitPointFiles,
lastSnapshotStatus.getStartTime(),
threadPool.absoluteTimeInMillis() - lastSnapshotStatus.getStartTime(),
lastSnapshotStatus.getIncrementalFileCount(),
lastSnapshotStatus.getIncrementalSize()
);
// Once we start writing the shard level snapshot file, no cleanup will be performed because it is possible that
// written files are referenced by another concurrent process.
fileToCleanUp.set(List.of());
try {
final String snapshotUUID = snapshotId.getUUID();
final Map<String, String> serializationParams = Collections.singletonMap(
BlobStoreIndexShardSnapshot.FileInfo.SERIALIZE_WRITER_UUID,
Boolean.toString(writeFileInfoWriterUUID)
);
snapshotStatus.updateStatusDescription("all files uploaded: writing to index shard file");
INDEX_SHARD_SNAPSHOT_FORMAT.write(
blobStoreIndexShardSnapshot,
shardContainer,
snapshotUUID,
compress,
serializationParams
);
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
}
afterWriteSnapBlob.run();
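// Report the new shard generation together with the snapshot's total size and segment info file count.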
final ShardSnapshotResult shardSnapshotResult = new ShardSnapshotResult(
indexGeneration,
ByteSizeValue.ofBytes(blobStoreIndexShardSnapshot.totalSize()),
getSegmentInfoFileCount(blobStoreIndexShardSnapshot.indexFiles())
);
snapshotStatus.updateStatusDescription("all files uploaded: done");
snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis(), shardSnapshotResult);
context.onResponse(shardSnapshotResult);
}, e -> {
try {
snapshotStatus.updateStatusDescription("all files uploaded: cleaning up data files, exception while finalizing: " + e);
shardContainer.deleteBlobsIgnoringIfNotExists(
OperationPurpose.SNAPSHOT_DATA,
Iterators.flatMap(fileToCleanUp.get().iterator(), f -> Iterators.forRange(0, f.numberOfParts(), f::partName))
);
} catch (Exception innerException) {
e.addSuppressed(innerException);
}
context.onFailure(e);
}));
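// Nothing to upload: either every file is already present in the repository or its contents are stored inline
// in the shard level metadata, so the listener can be completed right away.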
if (indexIncrementalFileCount == 0 || filesToSnapshot.isEmpty()) {
allFilesUploadedListener.onResponse(Collections.emptyList());
return;
}
snapshotFiles(context, filesToSnapshot, allFilesUploadedListener);
} catch (Exception e) {
context.onFailure(e);
}
}