void cleanupExpiredRemoteLogSegments()

in storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java [1224:1397]


        /**
         * Deletes remote log segments of this task's partition that are no longer needed and, when the retention
         * handler computed one, applies a new log start offset.
         *
         * <p>Steps, in order:
         * <ol>
         *   <li>Return early if the task is cancelled, no {@link UnifiedLog} is available for the partition, or the
         *       remote log metadata manager lists no segments (the last case also resets the remote metadata
         *       count/size metrics to zero).</li>
         *   <li>Record the remote segment count and total size, and collect every leader epoch present in remote
         *       storage.</li>
         *   <li>Walk the epochs of the filtered local leader-epoch cache in order and select segments that breach the
         *       log start offset, retention time or retention size, plus segments left in
         *       {@code DELETE_SEGMENT_STARTED} by an earlier attempt (retried early). The walk stops at the first
         *       segment that is still being copied, or that lies within the leader-epoch lineage and breaches no
         *       retention limit.</li>
         *   <li>Apply the computed log start offset (if any), then delete the selected segments while keeping the
         *       remote-delete-lag metrics up to date.</li>
         *   <li>Offer segments listed under remote epochs earlier than the earliest local leader epoch to
         *       {@code deleteLogSegmentsDueToLeaderEpochCacheTruncation} for removal.</li>
         * </ol>
         *
         * <p>A segment spanning several leader epochs is listed once per epoch; each phase selects such a segment at
         * most once. Cancellation is re-checked while iterating, and is passed to {@code deleteRemoteLogSegment} as a
         * predicate during deletion.
         *
         * @throws RemoteStorageException propagated from the remote metadata / deletion calls made by this method
         * @throws ExecutionException     propagated from the remote metadata / deletion calls made by this method
         * @throws InterruptedException   propagated from the remote metadata / deletion calls made by this method
         */
        void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException {
            if (isCancelled()) {
                logger.info("Returning from remote log segments cleanup as the task state is changed");
                return;
            }

            final Optional<UnifiedLog> logOptional = fetchLog.apply(topicIdPartition.topicPartition());
            if (logOptional.isEmpty()) {
                logger.debug("No UnifiedLog instance available for partition: {}", topicIdPartition);
                return;
            }

            final UnifiedLog log = logOptional.get();

            // Cleanup remote log segments and update the log start offset if applicable.
            final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = remoteLogMetadataManagerPlugin.get().listRemoteLogSegments(topicIdPartition);
            if (!segmentMetadataIter.hasNext()) {
                updateMetadataCountAndLogSizeWith(0, 0);
                logger.debug("No remote log segments available on remote storage for partition: {}", topicIdPartition);
                return;
            }

            final Set<Integer> epochsSet = new HashSet<>();
            int metadataCount = 0;
            long remoteLogSizeBytes = 0;
            // Good to have an API from RLMM to get all the remote leader epochs of all the segments of a partition
            // instead of going through all the segments and building it here.
            while (segmentMetadataIter.hasNext()) {
                RemoteLogSegmentMetadata segmentMetadata = segmentMetadataIter.next();
                epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet());
                metadataCount++;
                remoteLogSizeBytes += segmentMetadata.segmentSizeInBytes();
            }

            updateMetadataCountAndLogSizeWith(metadataCount, remoteLogSizeBytes);

            // All the leader epochs that exist in remote storage, in sorted order.
            final List<Integer> remoteLeaderEpochs = new ArrayList<>(epochsSet);
            Collections.sort(remoteLeaderEpochs);

            LeaderEpochFileCache leaderEpochCache = log.leaderEpochCache();
            // Build the leader epoch map by filtering the epochs that do not have any records.
            NavigableMap<Integer, Long> epochWithOffsets = buildFilteredLeaderEpochMap(leaderEpochCache.epochWithOffsets());

            long logStartOffset = log.logStartOffset();
            long logEndOffset = log.logEndOffset();
            Optional<RetentionSizeData> retentionSizeData = buildRetentionSizeData(log.config().retentionSize,
                    log.onlyLocalLogSegmentsSize(), logEndOffset, epochWithOffsets);
            Optional<RetentionTimeData> retentionTimeData = buildRetentionTimeData(log.config().retentionMs);

            RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(retentionSizeData, retentionTimeData);
            Iterator<Integer> epochIterator = epochWithOffsets.navigableKeySet().iterator();
            boolean canProcess = true;
            List<RemoteLogSegmentMetadata> segmentsToDelete = new ArrayList<>();
            // Segments already in segmentsToDelete. A segment spanning several epochs is listed once per epoch;
            // this set (same equals() semantics as List#contains, but O(1)) keeps it from being queued twice.
            Set<RemoteLogSegmentMetadata> selectedSegments = new HashSet<>();
            long sizeOfDeletableSegmentsBytes = 0L;
            while (canProcess && epochIterator.hasNext()) {
                Integer epoch = epochIterator.next();
                Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManagerPlugin.get().listRemoteLogSegments(topicIdPartition, epoch);
                while (canProcess && segmentsIterator.hasNext()) {
                    if (isCancelled()) {
                        logger.info("Returning from remote log segments cleanup for the remaining segments as the task state is changed.");
                        return;
                    }
                    RemoteLogSegmentMetadata metadata = segmentsIterator.next();

                    if (segmentIdsBeingCopied.contains(metadata.remoteLogSegmentId())) {
                        logger.debug("Copy for the segment {} is currently in process. Skipping cleanup for it and the remaining segments",
                                metadata.remoteLogSegmentId());
                        canProcess = false;
                        continue;
                    }
                    // Already selected while walking an earlier epoch. This must run before the DELETE_SEGMENT_STARTED
                    // retry below, otherwise a dangling multi-epoch segment would be queued (and deleted) once per epoch.
                    if (selectedSegments.contains(metadata)) {
                        continue;
                    }
                    // This works as retry mechanism for dangling remote segments that failed the deletion in previous attempts.
                    // Rather than waiting for the retention to kick in, we cleanup early to avoid polluting the cache and possibly waste remote storage.
                    if (RemoteLogSegmentState.DELETE_SEGMENT_STARTED.equals(metadata.state())) {
                        selectedSegments.add(metadata);
                        segmentsToDelete.add(metadata);
                        // Count its bytes as well: a successful deletion below subtracts them from the byte lag,
                        // which would otherwise go negative.
                        sizeOfDeletableSegmentsBytes += metadata.segmentSizeInBytes();
                        continue;
                    }
                    if (RemoteLogSegmentState.DELETE_SEGMENT_FINISHED.equals(metadata.state())) {
                        continue;
                    }
                    // When the log-start-offset is moved by the user, the leader-epoch-checkpoint file gets truncated
                    // as per the log-start-offset. Until the rlm-cleaner-thread runs in the next iteration, those
                    // remote log segments won't be removed. The `isRemoteSegmentWithinLeaderEpoch` validates whether
                    // the epochs present in the segment lies in the checkpoint file. It will always return false
                    // since the checkpoint file was already truncated.
                    boolean shouldDeleteSegment = remoteLogRetentionHandler.isSegmentBreachByLogStartOffset(
                            metadata, logStartOffset, epochWithOffsets);
                    boolean isValidSegment = false;
                    if (!shouldDeleteSegment) {
                        // Check whether the segment contains the required epoch range within the current leader epoch lineage.
                        isValidSegment = isRemoteSegmentWithinLeaderEpochs(metadata, logEndOffset, epochWithOffsets);
                        if (isValidSegment) {
                            shouldDeleteSegment =
                                    remoteLogRetentionHandler.isSegmentBreachedByRetentionTime(metadata) ||
                                            remoteLogRetentionHandler.isSegmentBreachedByRetentionSize(metadata);
                        }
                    }
                    if (shouldDeleteSegment) {
                        selectedSegments.add(metadata);
                        segmentsToDelete.add(metadata);
                        sizeOfDeletableSegmentsBytes += metadata.segmentSizeInBytes();
                    }
                    // Stop at the first segment that is within the lineage but not (yet) eligible for deletion.
                    canProcess = shouldDeleteSegment || !isValidSegment;
                }
            }

            // Update log start offset with the computed value after retention cleanup is done
            remoteLogRetentionHandler.logStartOffset.ifPresent(offset -> handleLogStartOffsetUpdate(topicIdPartition.topicPartition(), offset));

            // At this point in time we have updated the log start offsets, but not initiated a deletion.
            // Either a follower has picked up the changes to the log start offset, or they have not.
            // If the follower HAS picked up the changes, and they become the leader this replica won't successfully complete
            // the deletion.
            // However, the new leader will correctly pick up all breaching segments as log start offset breaching ones
            // and delete them accordingly.
            // If the follower HAS NOT picked up the changes, and they become the leader then they will go through this process
            // again and delete them with the original deletion reason i.e. size, time or log start offset breach.
            int segmentsLeftToDelete = segmentsToDelete.size();
            updateRemoteDeleteLagWith(segmentsLeftToDelete, sizeOfDeletableSegmentsBytes);
            List<String> undeletedSegments = new ArrayList<>();
            for (RemoteLogSegmentMetadata segmentMetadata : segmentsToDelete) {
                if (!deleteRemoteLogSegment(segmentMetadata, ignored -> !isCancelled())) {
                    undeletedSegments.add(segmentMetadata.remoteLogSegmentId().toString());
                } else {
                    sizeOfDeletableSegmentsBytes -= segmentMetadata.segmentSizeInBytes();
                    segmentsLeftToDelete--;
                    updateRemoteDeleteLagWith(segmentsLeftToDelete, sizeOfDeletableSegmentsBytes);
                }
            }
            if (!undeletedSegments.isEmpty()) {
                logger.info("The following remote segments could not be deleted: {}", String.join(",", undeletedSegments));
            }

            // Remove the remote log segments whose segment-leader-epochs are less than the earliest-epoch known
            // to the leader. This will remove the unreferenced segments in the remote storage. This is needed for
            // unclean leader election scenarios as the remote storage can have epochs earlier to the current leader's
            // earliest leader epoch.
            Optional<EpochEntry> earliestEpochEntryOptional = leaderEpochCache.earliestEntry();
            if (earliestEpochEntryOptional.isPresent()) {
                EpochEntry earliestEpochEntry = earliestEpochEntryOptional.get();
                Iterator<Integer> epochsToClean = remoteLeaderEpochs.stream()
                        .filter(remoteEpoch -> remoteEpoch < earliestEpochEntry.epoch)
                        .iterator();

                List<RemoteLogSegmentMetadata> listOfSegmentsToBeCleaned = new ArrayList<>();
                // Same per-epoch duplicate listing as above: count and offer each segment only once.
                Set<RemoteLogSegmentMetadata> segmentsSeenForCleaning = new HashSet<>();

                while (epochsToClean.hasNext()) {
                    int epoch = epochsToClean.next();
                    Iterator<RemoteLogSegmentMetadata> segmentsToBeCleaned = remoteLogMetadataManagerPlugin.get().listRemoteLogSegments(topicIdPartition, epoch);
                    while (segmentsToBeCleaned.hasNext()) {
                        // Return instead of merely skipping the element: skipping without calling next() never
                        // advances the iterator, so hasNext() would stay true and this loop would spin forever.
                        if (isCancelled()) {
                            logger.info("Returning from remote log segments cleanup for the remaining segments as the task state is changed.");
                            return;
                        }
                        RemoteLogSegmentMetadata nextSegmentMetadata = segmentsToBeCleaned.next();
                        if (segmentsSeenForCleaning.add(nextSegmentMetadata)) {
                            sizeOfDeletableSegmentsBytes += nextSegmentMetadata.segmentSizeInBytes();
                            listOfSegmentsToBeCleaned.add(nextSegmentMetadata);
                        }
                    }
                }

                segmentsLeftToDelete += listOfSegmentsToBeCleaned.size();
                updateRemoteDeleteLagWith(segmentsLeftToDelete, sizeOfDeletableSegmentsBytes);
                for (RemoteLogSegmentMetadata segmentMetadata : listOfSegmentsToBeCleaned) {
                    if (isCancelled()) {
                        logger.info("Returning from remote log segments cleanup for the remaining segments as the task state is changed.");
                        return;
                    }
                    // No need to update the log-start-offset even though the segment is deleted as these epochs/offsets are earlier to that value.
                    if (remoteLogRetentionHandler.deleteLogSegmentsDueToLeaderEpochCacheTruncation(earliestEpochEntry, segmentMetadata)) {
                        sizeOfDeletableSegmentsBytes -= segmentMetadata.segmentSizeInBytes();
                        segmentsLeftToDelete--;
                        updateRemoteDeleteLagWith(segmentsLeftToDelete, sizeOfDeletableSegmentsBytes);
                    }
                }
            }
        }