in storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java [1224:1397]
void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException {
if (isCancelled()) {
logger.info("Returning from remote log segments cleanup as the task state is changed");
return;
}
final Optional<UnifiedLog> logOptional = fetchLog.apply(topicIdPartition.topicPartition());
if (logOptional.isEmpty()) {
logger.debug("No UnifiedLog instance available for partition: {}", topicIdPartition);
return;
}
final UnifiedLog log = logOptional.get();
// Clean up remote log segments and update the log start offset if applicable.
final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = remoteLogMetadataManagerPlugin.get().listRemoteLogSegments(topicIdPartition);
if (!segmentMetadataIter.hasNext()) {
updateMetadataCountAndLogSizeWith(0, 0);
logger.debug("No remote log segments available on remote storage for partition: {}", topicIdPartition);
return;
}
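// Single pass over all segment metadata: collect the distinct leader epochs (used later to find
// unreferenced segments) and accumulate the count and total size reported via the remote-storage metrics.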
final Set<Integer> epochsSet = new HashSet<>();
int metadataCount = 0;
long remoteLogSizeBytes = 0;
// It would be good to have an RLMM API that returns all the remote leader epochs of a partition's
// segments instead of iterating over every segment and building the set here.
while (segmentMetadataIter.hasNext()) {
RemoteLogSegmentMetadata segmentMetadata = segmentMetadataIter.next();
epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet());
metadataCount++;
remoteLogSizeBytes += segmentMetadata.segmentSizeInBytes();
}
updateMetadataCountAndLogSizeWith(metadataCount, remoteLogSizeBytes);
// All the leader epochs that exist in remote storage, in sorted order
final List<Integer> remoteLeaderEpochs = new ArrayList<>(epochsSet);
Collections.sort(remoteLeaderEpochs);
LeaderEpochFileCache leaderEpochCache = log.leaderEpochCache();
// Build the leader epoch map by filtering out the epochs that do not have any records.
NavigableMap<Integer, Long> epochWithOffsets = buildFilteredLeaderEpochMap(leaderEpochCache.epochWithOffsets());
long logStartOffset = log.logStartOffset();
long logEndOffset = log.logEndOffset();
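// Compute the retention thresholds once per run: retention.bytes is checked against the combined
// remote and local log size, and retention.ms against each segment's max timestamp during the scan below.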
Optional<RetentionSizeData> retentionSizeData = buildRetentionSizeData(log.config().retentionSize,
log.onlyLocalLogSegmentsSize(), logEndOffset, epochWithOffsets);
Optional<RetentionTimeData> retentionTimeData = buildRetentionTimeData(log.config().retentionMs);
RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(retentionSizeData, retentionTimeData);
Iterator<Integer> epochIterator = epochWithOffsets.navigableKeySet().iterator();
boolean canProcess = true;
List<RemoteLogSegmentMetadata> segmentsToDelete = new ArrayList<>();
long sizeOfDeletableSegmentsBytes = 0L;
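// Walk the remote segments in leader-epoch order, oldest data first. The scan stops at the first
// segment that is both within the current epoch lineage and not deletable, since every segment
// after it holds newer data and cannot breach retention either.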
while (canProcess && epochIterator.hasNext()) {
Integer epoch = epochIterator.next();
Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManagerPlugin.get().listRemoteLogSegments(topicIdPartition, epoch);
while (canProcess && segmentsIterator.hasNext()) {
if (isCancelled()) {
logger.info("Returning from remote log segments cleanup for the remaining segments as the task state is changed.");
return;
}
RemoteLogSegmentMetadata metadata = segmentsIterator.next();
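// Stop at a segment whose copy is still in flight: deleting it mid-upload could leave an orphaned
// upload, and the segments after it are newer still.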
if (segmentIdsBeingCopied.contains(metadata.remoteLogSegmentId())) {
logger.debug("Copy for the segment {} is currently in process. Skipping cleanup for it and the remaining segments",
metadata.remoteLogSegmentId());
canProcess = false;
continue;
}
// This acts as a retry mechanism for dangling remote segments whose deletion failed in previous attempts.
// Rather than waiting for retention to kick in, we clean them up early to avoid polluting the cache and wasting remote storage.
if (RemoteLogSegmentState.DELETE_SEGMENT_STARTED.equals(metadata.state())) {
segmentsToDelete.add(metadata);
continue;
}
if (RemoteLogSegmentState.DELETE_SEGMENT_FINISHED.equals(metadata.state())) {
continue;
}
if (segmentsToDelete.contains(metadata)) {
continue;
}
// When the log-start-offset is moved by the user, the leader-epoch-checkpoint file gets truncated
// to the log-start-offset. Until the rlm-cleaner-thread runs in the next iteration, those
// remote log segments won't be removed. The `isRemoteSegmentWithinLeaderEpochs` check validates
// whether the epochs present in the segment lie within the checkpoint file; it will always return
// false for them since the checkpoint file was already truncated.
boolean shouldDeleteSegment = remoteLogRetentionHandler.isSegmentBreachByLogStartOffset(
metadata, logStartOffset, epochWithOffsets);
boolean isValidSegment = false;
if (!shouldDeleteSegment) {
// Check whether the segment contains the required epoch range within the current leader epoch lineage.
isValidSegment = isRemoteSegmentWithinLeaderEpochs(metadata, logEndOffset, epochWithOffsets);
if (isValidSegment) {
shouldDeleteSegment =
remoteLogRetentionHandler.isSegmentBreachedByRetentionTime(metadata) ||
remoteLogRetentionHandler.isSegmentBreachedByRetentionSize(metadata);
}
}
if (shouldDeleteSegment) {
segmentsToDelete.add(metadata);
sizeOfDeletableSegmentsBytes += metadata.segmentSizeInBytes();
}
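// Keep scanning only while segments are deletable or fall outside the leader epoch lineage.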
canProcess = shouldDeleteSegment || !isValidSegment;
}
}
// Update log start offset with the computed value after retention cleanup is done
remoteLogRetentionHandler.logStartOffset.ifPresent(offset -> handleLogStartOffsetUpdate(topicIdPartition.topicPartition(), offset));
// At this point we have updated the log start offset, but not yet initiated any deletion.
// Either a follower has picked up the change to the log start offset, or it has not.
// If the follower HAS picked up the change and then becomes the leader, this replica won't
// successfully complete the deletion; however, the new leader will correctly pick up all
// breaching segments as log-start-offset breaches and delete them accordingly.
// If the follower HAS NOT picked up the change and then becomes the leader, it will go through
// this process again and delete the segments with the original deletion reason, i.e. size, time,
// or log-start-offset breach.
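// Publish the delete lag (segment count and bytes) before starting deletions; it is decremented
// below as each deletion succeeds.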
int segmentsLeftToDelete = segmentsToDelete.size();
updateRemoteDeleteLagWith(segmentsLeftToDelete, sizeOfDeletableSegmentsBytes);
List<String> undeletedSegments = new ArrayList<>();
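// Attempt each deletion unless the task has been cancelled; failures are collected and logged
// afterwards instead of aborting the run.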
for (RemoteLogSegmentMetadata segmentMetadata : segmentsToDelete) {
if (!deleteRemoteLogSegment(segmentMetadata, ignored -> !isCancelled())) {
undeletedSegments.add(segmentMetadata.remoteLogSegmentId().toString());
} else {
sizeOfDeletableSegmentsBytes -= segmentMetadata.segmentSizeInBytes();
segmentsLeftToDelete--;
updateRemoteDeleteLagWith(segmentsLeftToDelete, sizeOfDeletableSegmentsBytes);
}
}
if (!undeletedSegments.isEmpty()) {
logger.info("The following remote segments could not be deleted: {}", String.join(",", undeletedSegments));
}
// Remove the remote log segments whose segment-leader-epochs are less than the earliest epoch known
// to the leader. This removes the unreferenced segments in remote storage, which is needed for
// unclean leader election scenarios, as remote storage can contain epochs earlier than the current
// leader's earliest leader epoch.
Optional<EpochEntry> earliestEpochEntryOptional = leaderEpochCache.earliestEntry();
if (earliestEpochEntryOptional.isPresent()) {
EpochEntry earliestEpochEntry = earliestEpochEntryOptional.get();
Iterator<Integer> epochsToClean = remoteLeaderEpochs.stream()
.filter(remoteEpoch -> remoteEpoch < earliestEpochEntry.epoch)
.iterator();
List<RemoteLogSegmentMetadata> listOfSegmentsToBeCleaned = new ArrayList<>();
while (epochsToClean.hasNext()) {
int epoch = epochsToClean.next();
Iterator<RemoteLogSegmentMetadata> segmentsToBeCleaned = remoteLogMetadataManagerPlugin.get().listRemoteLogSegments(topicIdPartition, epoch);
// Check cancellation in the loop condition rather than inside the body: skipping next() while
// cancelled would leave hasNext() true forever and busy-spin instead of exiting.
while (!isCancelled() && segmentsToBeCleaned.hasNext()) {
RemoteLogSegmentMetadata nextSegmentMetadata = segmentsToBeCleaned.next();
sizeOfDeletableSegmentsBytes += nextSegmentMetadata.segmentSizeInBytes();
listOfSegmentsToBeCleaned.add(nextSegmentMetadata);
}
}
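// Fold the unreferenced segments into the delete-lag metrics before removing them.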
segmentsLeftToDelete += listOfSegmentsToBeCleaned.size();
updateRemoteDeleteLagWith(segmentsLeftToDelete, sizeOfDeletableSegmentsBytes);
for (RemoteLogSegmentMetadata segmentMetadata : listOfSegmentsToBeCleaned) {
if (!isCancelled()) {
// No need to update the log-start-offset even though the segment is deleted, as these epochs/offsets are earlier than that value.
if (remoteLogRetentionHandler.deleteLogSegmentsDueToLeaderEpochCacheTruncation(earliestEpochEntry, segmentMetadata)) {
sizeOfDeletableSegmentsBytes -= segmentMetadata.segmentSizeInBytes();
segmentsLeftToDelete--;
updateRemoteDeleteLagWith(segmentsLeftToDelete, sizeOfDeletableSegmentsBytes);
}
}
}
}
}