private void doSnapshotShard()

in server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java [3201:3496]


    private void doSnapshotShard(SnapshotShardContext context) {
        if (isReadOnly()) {
            context.onFailure(new RepositoryException(metadata.name(), "cannot snapshot shard on a readonly repository"));
            return;
        }
        final Store store = context.store();
        final ShardId shardId = store.shardId();
        final SnapshotId snapshotId = context.snapshotId();
        final IndexShardSnapshotStatus snapshotStatus = context.status();
        snapshotStatus.updateStatusDescription("snapshot task runner: setting up shard snapshot");
        final long startTime = threadPool.absoluteTimeInMillis();
        try {
            final ShardGeneration generation = snapshotStatus.generation();
            final BlobContainer shardContainer = shardContainer(context.indexId(), shardId);
            logger.debug("[{}][{}] snapshot to [{}][{}][{}] ...", shardId, snapshotId, metadata.name(), context.indexId(), generation);
            final Set<String> blobs;
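            // A null generation means shard generations are not tracked for this shard, so the latest index-N blob
            // has to be found by listing; otherwise the generation UUID is known from the cluster state and the
            // single matching blob name can be constructed directly.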
            if (generation == null) {
                snapshotStatus.ensureNotAborted();
                snapshotStatus.updateStatusDescription("snapshot task runner: listing blob prefixes");
                try {
                    blobs = shardContainer.listBlobsByPrefix(OperationPurpose.SNAPSHOT_METADATA, SNAPSHOT_INDEX_PREFIX).keySet();
                } catch (IOException e) {
                    throw new IndexShardSnapshotFailedException(shardId, "failed to list blobs", e);
                }
            } else {
                blobs = Collections.singleton(SNAPSHOT_INDEX_PREFIX + generation);
            }

            snapshotStatus.ensureNotAborted();
            snapshotStatus.updateStatusDescription("snapshot task runner: loading snapshot blobs");
            Tuple<BlobStoreIndexShardSnapshots, ShardGeneration> tuple = buildBlobStoreIndexShardSnapshots(
                context.indexId(),
                shardId.id(),
                blobs,
                shardContainer,
                generation
            );
            BlobStoreIndexShardSnapshots snapshots = tuple.v1();
            ShardGeneration fileListGeneration = tuple.v2();

            if (snapshots.snapshots().stream().anyMatch(sf -> sf.snapshot().equals(snapshotId.getName()))) {
                throw new IndexShardSnapshotFailedException(
                    shardId,
                    "Duplicate snapshot name [" + snapshotId.getName() + "] detected, aborting"
                );
            }
            // First inspect all known SegmentInfos instances to see if we already have an equivalent commit in the repository
            final List<BlobStoreIndexShardSnapshot.FileInfo> filesFromSegmentInfos = Optional.ofNullable(context.stateIdentifier())
                .map(id -> {
                    for (SnapshotFiles snapshotFileSet : snapshots.snapshots()) {
                        if (id.equals(snapshotFileSet.shardStateIdentifier())) {
                            return snapshotFileSet.indexFiles();
                        }
                    }
                    return null;
                })
                .orElse(null);
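            // A non-null result means an equivalent commit was already snapshotted; its file list is reused verbatim
            // below, skipping the per-file comparison against the store metadata.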

            final List<BlobStoreIndexShardSnapshot.FileInfo> indexCommitPointFiles;
            int indexIncrementalFileCount = 0;
            int indexTotalNumberOfFiles = 0;
            long indexIncrementalSize = 0;
            long indexTotalFileSize = 0;
            final BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> filesToSnapshot = new LinkedBlockingQueue<>();
            int filesInShardMetadataCount = 0;
            long filesInShardMetadataSize = 0;

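            // Shards of searchable snapshot indices are already backed by data held in a snapshot, so there is nothing to copy.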
            if (store.indexSettings().getIndexMetadata().isSearchableSnapshot()) {
                indexCommitPointFiles = Collections.emptyList();
            } else if (filesFromSegmentInfos == null) {
                // If we did not find a set of files equal to the current commit, we determine the files to upload by comparing the
                // files in the commit with those already in the repository
                indexCommitPointFiles = new ArrayList<>();
                final Collection<String> fileNames;
                final Store.MetadataSnapshot metadataFromStore;
                try (Releasable ignored = context.withCommitRef()) {
                    // TODO apparently we don't use MetadataSnapshot#recoveryDiff(...) here but we should
                    try {
                        final IndexCommit snapshotIndexCommit = context.indexCommit();
                        logger.trace("[{}] [{}] Loading store metadata using index commit [{}]", shardId, snapshotId, snapshotIndexCommit);
                        metadataFromStore = store.getMetadata(snapshotIndexCommit);
                        fileNames = snapshotIndexCommit.getFileNames();
                    } catch (IOException e) {
                        throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e);
                    }
                }
                for (String fileName : fileNames) {
                    ensureNotAborted(shardId, snapshotId, snapshotStatus, fileName);

                    logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName);
                    final StoreFileMetadata md = metadataFromStore.get(fileName);
                    BlobStoreIndexShardSnapshot.FileInfo existingFileInfo = snapshots.findPhysicalIndexFile(md);

                    // We can skip writing blobs whose metadata hash equals the blob's contents, because in that case we store the
                    // hash/contents directly in the shard-level metadata
                    final boolean needsWrite = md.hashEqualsContents() == false;
                    indexTotalFileSize += md.length();
                    indexTotalNumberOfFiles++;

                    if (existingFileInfo == null) {
                        indexIncrementalFileCount++;
                        indexIncrementalSize += md.length();
                        // create a new FileInfo
                        BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = new BlobStoreIndexShardSnapshot.FileInfo(
                            (needsWrite ? UPLOADED_DATA_BLOB_PREFIX : VIRTUAL_DATA_BLOB_PREFIX) + UUIDs.randomBase64UUID(),
                            md,
                            chunkSize()
                        );
                        indexCommitPointFiles.add(snapshotFileInfo);
                        if (needsWrite) {
                            filesToSnapshot.add(snapshotFileInfo);
                        } else {
                            assert assertFileContentsMatchHash(snapshotStatus, snapshotFileInfo, store);
                            filesInShardMetadataCount += 1;
                            filesInShardMetadataSize += md.length();
                        }
                    } else {
                        // A commit point file with the same name, size and checksum was already copied to the repository,
                        // so we reuse it for this snapshot
                        indexCommitPointFiles.add(existingFileInfo);
                    }
                }
            } else {
                for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : filesFromSegmentInfos) {
                    indexTotalNumberOfFiles++;
                    indexTotalFileSize += fileInfo.length();
                }
                indexCommitPointFiles = filesFromSegmentInfos;
            }

            snapshotStatus.updateStatusDescription("snapshot task runner: starting shard snapshot");
            snapshotStatus.moveToStarted(
                startTime,
                indexIncrementalFileCount,
                indexTotalNumberOfFiles,
                indexIncrementalSize,
                indexTotalFileSize
            );

            final ShardGeneration indexGeneration;
            final boolean writeShardGens = SnapshotsService.useShardGenerations(context.getRepositoryMetaVersion());
            final boolean writeFileInfoWriterUUID = SnapshotsService.includeFileInfoWriterUUID(context.getRepositoryMetaVersion());
            // build a new BlobStoreIndexShardSnapshots instance that includes this snapshot and all previously saved ones
            final BlobStoreIndexShardSnapshots updatedBlobStoreIndexShardSnapshots = snapshots.withAddedSnapshot(
                new SnapshotFiles(snapshotId.getName(), indexCommitPointFiles, context.stateIdentifier())
            );
            final Runnable afterWriteSnapBlob;
            if (writeShardGens) {
                // When using shard generations we can safely write the index-${uuid} blob before writing out any of the actual data
                // for this shard, since in case of an error the UUID-named blob will simply never be referenced and thus we will
                // never reference a generation whose files have not all been fully uploaded.
                indexGeneration = ShardGeneration.newGeneration();
                try {
                    final Map<String, String> serializationParams = Collections.singletonMap(
                        BlobStoreIndexShardSnapshot.FileInfo.SERIALIZE_WRITER_UUID,
                        Boolean.toString(writeFileInfoWriterUUID)
                    );
                    snapshotStatus.updateStatusDescription("snapshot task runner: updating blob store with new shard generation");
                    INDEX_SHARD_SNAPSHOTS_FORMAT.write(
                        updatedBlobStoreIndexShardSnapshots,
                        shardContainer,
                        indexGeneration.getGenerationUUID(),
                        compress,
                        serializationParams
                    );
                    snapshotStatus.addProcessedFiles(filesInShardMetadataCount, filesInShardMetadataSize);
                } catch (IOException e) {
                    throw new IndexShardSnapshotFailedException(
                        shardId,
                        "Failed to write shard level snapshot metadata for ["
                            + snapshotId
                            + "] to ["
                            + INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(indexGeneration.getGenerationUUID())
                            + "]",
                        e
                    );
                }
                afterWriteSnapBlob = () -> {};
            } else {
                // When not using shard generations we can only write the index-${N} blob after all other work for this shard has
                // completed.
                // Also, in the case of numeric shard generations, the data node has to take care of deleting old shard generations.
                final long newGen = Long.parseLong(fileListGeneration.getGenerationUUID()) + 1;
                indexGeneration = new ShardGeneration(newGen);
                // Collect all previous index-N blobs; they are deleted once the new index-N blob has been written
                final List<String> blobsToDelete = blobs.stream().filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX)).toList();
                assert blobsToDelete.stream()
                    .mapToLong(b -> Long.parseLong(b.replaceFirst(SNAPSHOT_INDEX_PREFIX, "")))
                    .max()
                    .orElse(-1L) < Long.parseLong(indexGeneration.toString())
                    : "Tried to delete an index-N blob newer than the current generation ["
                        + indexGeneration
                        + "] when deleting index-N blobs "
                        + blobsToDelete;
                final var finalFilesInShardMetadataCount = filesInShardMetadataCount;
                final var finalFilesInShardMetadataSize = filesInShardMetadataSize;

                afterWriteSnapBlob = () -> {
                    try {
                        final Map<String, String> serializationParams = Collections.singletonMap(
                            BlobStoreIndexShardSnapshot.FileInfo.SERIALIZE_WRITER_UUID,
                            Boolean.toString(writeFileInfoWriterUUID)
                        );
                        snapshotStatus.updateStatusDescription("no shard generations: writing new index-${N} file");
                        writeShardIndexBlobAtomic(shardContainer, newGen, updatedBlobStoreIndexShardSnapshots, serializationParams);
                    } catch (IOException e) {
                        throw new IndexShardSnapshotFailedException(
                            shardId,
                            "Failed to finalize snapshot creation ["
                                + snapshotId
                                + "] with shard index ["
                                + INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(indexGeneration.getGenerationUUID())
                                + "]",
                            e
                        );
                    }
                    snapshotStatus.addProcessedFiles(finalFilesInShardMetadataCount, finalFilesInShardMetadataSize);
                    try {
                        snapshotStatus.updateStatusDescription("no shard generations: deleting blobs");
                        deleteFromContainer(OperationPurpose.SNAPSHOT_METADATA, shardContainer, blobsToDelete.iterator());
                    } catch (IOException e) {
                        logger.warn(
                            () -> format("[%s][%s] failed to delete old index-N blobs during finalization", snapshotId, shardId),
                            e
                        );
                    }
                };
            }

            // filesToSnapshot will be emptied while snapshotting the files. We make a copy here for cleanup purposes in case of failure.
            final AtomicReference<List<FileInfo>> fileToCleanUp = new AtomicReference<>(List.copyOf(filesToSnapshot));
            final ActionListener<Collection<Void>> allFilesUploadedListener = ActionListener.assertOnce(ActionListener.wrap(ignore -> {
                snapshotStatus.updateStatusDescription("all files uploaded: finalizing");
                final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize();

                // now create and write the commit point
                logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
                final BlobStoreIndexShardSnapshot blobStoreIndexShardSnapshot = new BlobStoreIndexShardSnapshot(
                    snapshotId.getName(),
                    indexCommitPointFiles,
                    lastSnapshotStatus.getStartTime(),
                    threadPool.absoluteTimeInMillis() - lastSnapshotStatus.getStartTime(),
                    lastSnapshotStatus.getIncrementalFileCount(),
                    lastSnapshotStatus.getIncrementalSize()
                );
                // Once we start writing the shard level snapshot file, no cleanup will be performed because it is possible that
                // written files are referenced by another concurrent process.
                fileToCleanUp.set(List.of());
                try {
                    final String snapshotUUID = snapshotId.getUUID();
                    final Map<String, String> serializationParams = Collections.singletonMap(
                        BlobStoreIndexShardSnapshot.FileInfo.SERIALIZE_WRITER_UUID,
                        Boolean.toString(writeFileInfoWriterUUID)
                    );
                    snapshotStatus.updateStatusDescription("all files uploaded: writing to index shard file");
                    INDEX_SHARD_SNAPSHOT_FORMAT.write(
                        blobStoreIndexShardSnapshot,
                        shardContainer,
                        snapshotUUID,
                        compress,
                        serializationParams
                    );
                } catch (IOException e) {
                    throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
                }
                afterWriteSnapBlob.run();
                final ShardSnapshotResult shardSnapshotResult = new ShardSnapshotResult(
                    indexGeneration,
                    ByteSizeValue.ofBytes(blobStoreIndexShardSnapshot.totalSize()),
                    getSegmentInfoFileCount(blobStoreIndexShardSnapshot.indexFiles())
                );
                snapshotStatus.updateStatusDescription("all files uploaded: done");
                snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis(), shardSnapshotResult);
                context.onResponse(shardSnapshotResult);
            }, e -> {
                try {
                    snapshotStatus.updateStatusDescription("all files uploaded: cleaning up data files, exception while finalizing: " + e);
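                    // Each file may have been chunked into multiple part blobs, so every part name is enumerated for deletion.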
                    shardContainer.deleteBlobsIgnoringIfNotExists(
                        OperationPurpose.SNAPSHOT_DATA,
                        Iterators.flatMap(fileToCleanUp.get().iterator(), f -> Iterators.forRange(0, f.numberOfParts(), f::partName))
                    );
                } catch (Exception innerException) {
                    e.addSuppressed(innerException);
                }
                context.onFailure(e);
            }));

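            // If every file was either reused from an earlier snapshot or inlined into the shard-level metadata, there is
            // nothing to upload and the listener can be completed right away.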
            if (indexIncrementalFileCount == 0 || filesToSnapshot.isEmpty()) {
                allFilesUploadedListener.onResponse(Collections.emptyList());
                return;
            }
            snapshotFiles(context, filesToSnapshot, allFilesUploadedListener);
        } catch (Exception e) {
            context.onFailure(e);
        }
    }
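
The per-file decision in the diff branch above reduces to three outcomes: reuse an identical blob already present in the repository, inline a tiny file whose hash equals its contents into the shard-level metadata, or queue the file for upload. The following is a minimal, self-contained sketch of that logic; the names (SnapshotPlanner, FileMeta, SnapshotPlan, planShardSnapshot, repoLookup) are hypothetical stand-ins, with repoLookup playing the role of BlobStoreIndexShardSnapshots#findPhysicalIndexFile, and none of this is the actual Elasticsearch API:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.UnaryOperator;

    final class SnapshotPlanner {

        // name/length/checksum identify a Lucene file; hashEqualsContents marks small files
        // whose complete contents are already carried by the metadata hash.
        record FileMeta(String name, long length, String checksum, boolean hashEqualsContents) {}

        record SnapshotPlan(List<FileMeta> reused, List<FileMeta> inlined, List<FileMeta> toUpload) {}

        // repoLookup returns a previously uploaded file with identical name/length/checksum, or null.
        static SnapshotPlan planShardSnapshot(List<FileMeta> commitFiles, UnaryOperator<FileMeta> repoLookup) {
            var reused = new ArrayList<FileMeta>();
            var inlined = new ArrayList<FileMeta>();
            var toUpload = new ArrayList<FileMeta>();
            for (FileMeta md : commitFiles) {
                FileMeta existing = repoLookup.apply(md); // identical blob already in the repository?
                if (existing != null) {
                    reused.add(existing);        // reuse: no bytes are re-uploaded for this file
                } else if (md.hashEqualsContents()) {
                    inlined.add(md);             // "virtual" blob: contents live in the shard metadata
                } else {
                    toUpload.add(md);            // genuinely new data: queued for upload
                }
            }
            return new SnapshotPlan(reused, inlined, toUpload);
        }
    }

Only the toUpload list corresponds to filesToSnapshot in the real method; reused files still count towards the total file statistics, and inlined files are charged to the processed-file counters once the shard-level metadata blob has been written.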