in paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java [137:273]
/**
 * Expires the snapshots with ids in {@code [beginInclusiveId, endExclusiveId)} and removes the
 * files those snapshots no longer need.
 *
 * <p>{@code beginInclusiveId} starts at {@code earliestId}. The initial scan moves it to just after
 * the highest snapshot id below {@code endExclusiveId} that does not exist. Each later phase moves
 * it again whenever a snapshot in its range cannot be read. Subsequent phases and the returned
 * count use the advanced value.
 *
 * <p>Phases, in order:
 *
 * <ol>
 *   <li>data (merge tree) files, for ids in {@code (beginInclusiveId, endExclusiveId]};
 *   <li>changelog files, only when the changelog is not decoupled;
 *   <li>empty bucket directories;
 *   <li>manifests and index files, skipping those still referenced by the "skipping" snapshots;
 *   <li>the snapshot files themselves. When the changelog is decoupled, each snapshot is committed
 *       as a {@code Changelog} before its file is deleted.
 * </ol>
 *
 * <p>Finally, the earliest-snapshot hint is written as {@code endExclusiveId}.
 *
 * @param earliestId lowest snapshot id that may be expired
 * @param endExclusiveId first snapshot id that is not expired by this call. Snapshot
 *     {@code endExclusiveId} is itself read and used to protect manifests from deletion.
 * @return {@code endExclusiveId - beginInclusiveId} after all phases. Returns {@code 0} when
 *     {@code endExclusiveId <= earliestId} or when snapshot {@code endExclusiveId} can no longer
 *     be read.
 */
public int expireUntil(long earliestId, long endExclusiveId) {
if (endExclusiveId <= earliestId) {
// No expire happens:
// write the hint file in order to see the earliest snapshot directly next time
// should avoid duplicate writes when the file exists
if (snapshotManager.earliestFileNotExists()) {
writeEarliestHint(earliestId);
}
// fast exit
return 0;
}
// find first snapshot to expire
// Scan downward from endExclusiveId - 1. The range starts right after the highest missing id,
// or at earliestId if every id in [earliestId, endExclusiveId) exists.
long beginInclusiveId = earliestId;
for (long id = endExclusiveId - 1; id >= earliestId; id--) {
if (!snapshotManager.snapshotExists(id)) {
// only latest snapshots are retained, as we cannot find this snapshot, we can
// assume that all snapshots preceding it have been removed
beginInclusiveId = id + 1;
break;
}
}
if (LOG.isDebugEnabled()) {
LOG.debug(
"Snapshot expire range is [" + beginInclusiveId + ", " + endExclusiveId + ")");
}
// Tagged snapshots are used below to protect files they still reference.
List<Snapshot> taggedSnapshots = tagManager.taggedSnapshots();
// delete merge tree files
// deleted merge tree files in a snapshot are not used by the next snapshot, so the range of
// id should be (beginInclusiveId, endExclusiveId]
for (long id = beginInclusiveId + 1; id <= endExclusiveId; id++) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ready to delete merge tree files not used by snapshot #" + id);
}
Snapshot snapshot;
try {
snapshot = snapshotManager.tryGetSnapshot(id);
} catch (FileNotFoundException e) {
// The snapshot can no longer be read. NOTE(review): presumably it was removed
// concurrently by another expiration; confirm. Advance the start of the range past it
// so the later phases and the returned count ignore it.
beginInclusiveId = id + 1;
continue;
}
// expire merge tree files and collect changed buckets
Predicate<ExpireFileEntry> skipper;
try {
skipper = snapshotDeletion.createDataFileSkipperForTags(taggedSnapshots, id);
} catch (Exception e) {
// Best-effort: if the tag-based skipper cannot be built, leave this snapshot's data
// files in place instead of deleting them without tag protection.
LOG.info(
String.format(
"Skip cleaning data files of snapshot '%s' due to failed to build skipping set.",
id),
e);
continue;
}
snapshotDeletion.cleanUnusedDataFiles(snapshot, skipper);
}
// delete changelog files
// When the changelog is decoupled, changelog files are not deleted here. Instead, each
// expired snapshot is committed as a Changelog in the final loop below.
if (!expireConfig.isChangelogDecoupled()) {
for (long id = beginInclusiveId; id < endExclusiveId; id++) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ready to delete changelog files from snapshot #" + id);
}
Snapshot snapshot;
try {
snapshot = snapshotManager.tryGetSnapshot(id);
} catch (FileNotFoundException e) {
// Snapshot vanished; advance the start of the range past it.
beginInclusiveId = id + 1;
continue;
}
if (snapshot.changelogManifestList() != null) {
snapshotDeletion.deleteAddedDataFiles(snapshot.changelogManifestList());
}
}
}
// data files and changelog files in bucket directories have been deleted
// then delete changed bucket directories if they are empty
snapshotDeletion.cleanEmptyDirectories();
// delete manifests and indexFiles
// Manifests referenced by any snapshot in skippingSnapshots are kept. That list holds the
// result of findSkippingTags (NOTE(review): presumably the tagged snapshots inside the expire
// range; confirm) plus snapshot endExclusiveId.
List<Snapshot> skippingSnapshots =
findSkippingTags(taggedSnapshots, beginInclusiveId, endExclusiveId);
try {
skippingSnapshots.add(snapshotManager.tryGetSnapshot(endExclusiveId));
} catch (FileNotFoundException e) {
// the end exclusive snapshot is gone
// there is no need to proceed
// Data/changelog files may already have been cleaned above. Manifests, snapshot files and
// the earliest hint are left untouched on this path.
return 0;
}
Set<String> skippingSet = null;
try {
skippingSet = new HashSet<>(snapshotDeletion.manifestSkippingSet(skippingSnapshots));
} catch (Exception e) {
// Best-effort: without a skipping set, manifest cleanup is skipped entirely.
LOG.info("Skip cleaning manifest files due to failed to build skipping set.", e);
}
if (skippingSet != null) {
for (long id = beginInclusiveId; id < endExclusiveId; id++) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ready to delete manifests in snapshot #" + id);
}
Snapshot snapshot;
try {
snapshot = snapshotManager.tryGetSnapshot(id);
} catch (FileNotFoundException e) {
// Snapshot vanished; advance the start of the range past it.
beginInclusiveId = id + 1;
continue;
}
snapshotDeletion.cleanUnusedManifests(snapshot, skippingSet);
}
}
// delete snapshot file finally
// Snapshot files go last, after the files they referenced have been processed above.
for (long id = beginInclusiveId; id < endExclusiveId; id++) {
Snapshot snapshot;
try {
snapshot = snapshotManager.tryGetSnapshot(id);
} catch (FileNotFoundException e) {
// Snapshot vanished; advance the start of the range past it.
// NOTE(review): ids before this one may already have been deleted by this loop, so the
// returned count below can be smaller than the number of snapshot files actually
// deleted. Confirm whether callers rely on an exact count.
beginInclusiveId = id + 1;
continue;
}
if (expireConfig.isChangelogDecoupled()) {
commitChangelog(new Changelog(snapshot));
}
snapshotManager.deleteSnapshot(id);
}
// Record endExclusiveId as the earliest snapshot so the next read can find it directly.
writeEarliestHint(endExclusiveId);
return (int) (endExclusiveId - beginInclusiveId);
}