public int expireUntil(long earliestId, long endExclusiveId)

in paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java [137:273]


    /**
     * Expires the snapshots whose ids fall in {@code [earliestId, endExclusiveId)}, narrowed to the
     * trailing run of ids whose snapshot files still exist.
     *
     * <p>Work is performed in this order: unused data (merge tree) files, changelog files (only when
     * the changelog is not decoupled), empty bucket directories, unused manifests, and finally the
     * snapshot files themselves. After that the earliest hint is rewritten to {@code
     * endExclusiveId}. The snapshot {@code endExclusiveId} itself is never deleted here.
     *
     * <p>If a snapshot in the range turns out to be missing during any pass, {@code
     * beginInclusiveId} is advanced past it, so later passes and the returned count skip it.
     * NOTE(review): presumably this tolerates a concurrent expiration removing snapshots; confirm.
     *
     * @param earliestId id of the earliest snapshot to consider for expiration
     * @param endExclusiveId exclusive upper bound of the ids to expire; this snapshot is kept and is
     *     included when building the manifest skipping set
     * @return {@code endExclusiveId - beginInclusiveId} for the final (possibly advanced) start id,
     *     or 0 when {@code endExclusiveId <= earliestId} or when snapshot {@code endExclusiveId}
     *     cannot be read
     */
    public int expireUntil(long earliestId, long endExclusiveId) {
        if (endExclusiveId <= earliestId) {
            // No expire happens:
            // write the hint file in order to see the earliest snapshot directly next time
            // should avoid duplicate writes when the file exists
            if (snapshotManager.earliestFileNotExists()) {
                writeEarliestHint(earliestId);
            }

            // fast exit
            return 0;
        }

        // find first snapshot to expire
        // Scans downward from endExclusiveId - 1; the first missing id marks the start of the
        // contiguous range of still-existing snapshots.
        long beginInclusiveId = earliestId;
        for (long id = endExclusiveId - 1; id >= earliestId; id--) {
            if (!snapshotManager.snapshotExists(id)) {
                // only latest snapshots are retained, as we cannot find this snapshot, we can
                // assume that all snapshots preceding it have been removed
                beginInclusiveId = id + 1;
                break;
            }
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    "Snapshot expire range is [" + beginInclusiveId + ", " + endExclusiveId + ")");
        }

        // Loaded once and reused by both the data-file pass and the manifest pass below.
        List<Snapshot> taggedSnapshots = tagManager.taggedSnapshots();

        // delete merge tree files
        // deleted merge tree files in a snapshot are not used by the next snapshot, so the range of
        // id should be (beginInclusiveId, endExclusiveId]
        for (long id = beginInclusiveId + 1; id <= endExclusiveId; id++) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Ready to delete merge tree files not used by snapshot #" + id);
            }
            Snapshot snapshot;
            try {
                snapshot = snapshotManager.tryGetSnapshot(id);
            } catch (FileNotFoundException e) {
                // Snapshot already gone: start the remaining passes after it.
                beginInclusiveId = id + 1;
                continue;
            }
            // expire merge tree files and collect changed buckets
            Predicate<ExpireFileEntry> skipper;
            try {
                skipper = snapshotDeletion.createDataFileSkipperForTags(taggedSnapshots, id);
            } catch (Exception e) {
                // Best effort: if the tag-based skipper cannot be built, leave this snapshot's
                // data files in place instead of failing the whole expiration.
                LOG.info(
                        String.format(
                                "Skip cleaning data files of snapshot '%s' due to failed to build skipping set.",
                                id),
                        e);
                continue;
            }

            snapshotDeletion.cleanUnusedDataFiles(snapshot, skipper);
        }

        // delete changelog files
        // Only done here when the changelog is not decoupled; otherwise snapshots are handed to
        // commitChangelog in the final pass instead.
        if (!expireConfig.isChangelogDecoupled()) {
            for (long id = beginInclusiveId; id < endExclusiveId; id++) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Ready to delete changelog files from snapshot #" + id);
                }
                Snapshot snapshot;
                try {
                    snapshot = snapshotManager.tryGetSnapshot(id);
                } catch (FileNotFoundException e) {
                    // Snapshot already gone: start the remaining passes after it.
                    beginInclusiveId = id + 1;
                    continue;
                }
                if (snapshot.changelogManifestList() != null) {
                    snapshotDeletion.deleteAddedDataFiles(snapshot.changelogManifestList());
                }
            }
        }

        // data files and changelog files in bucket directories has been deleted
        // then delete changed bucket directories if they are empty
        snapshotDeletion.cleanEmptyDirectories();

        // delete manifests and indexFiles
        List<Snapshot> skippingSnapshots =
                findSkippingTags(taggedSnapshots, beginInclusiveId, endExclusiveId);

        try {
            // The retained end snapshot joins the skipping list, presumably so manifests it still
            // references are not deleted.
            skippingSnapshots.add(snapshotManager.tryGetSnapshot(endExclusiveId));
        } catch (FileNotFoundException e) {
            // the end exclusive snapshot is gone
            // there is no need to proceed
            // NOTE(review): data/changelog cleanup above has already run at this point, but no
            // manifests or snapshot files are deleted and the earliest hint is not rewritten.
            return 0;
        }

        // Stays null when the skipping set cannot be built; manifest cleanup is then skipped
        // entirely (best effort), while snapshot deletion below still proceeds.
        Set<String> skippingSet = null;
        try {
            skippingSet = new HashSet<>(snapshotDeletion.manifestSkippingSet(skippingSnapshots));
        } catch (Exception e) {
            LOG.info("Skip cleaning manifest files due to failed to build skipping set.", e);
        }
        if (skippingSet != null) {
            for (long id = beginInclusiveId; id < endExclusiveId; id++) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Ready to delete manifests in snapshot #" + id);
                }

                Snapshot snapshot;
                try {
                    snapshot = snapshotManager.tryGetSnapshot(id);
                } catch (FileNotFoundException e) {
                    // Snapshot already gone: start the remaining passes after it.
                    beginInclusiveId = id + 1;
                    continue;
                }
                snapshotDeletion.cleanUnusedManifests(snapshot, skippingSet);
            }
        }

        // delete snapshot file finally
        for (long id = beginInclusiveId; id < endExclusiveId; id++) {
            Snapshot snapshot;
            try {
                snapshot = snapshotManager.tryGetSnapshot(id);
            } catch (FileNotFoundException e) {
                // Snapshot already gone: exclude it (and everything before it) from the count.
                beginInclusiveId = id + 1;
                continue;
            }
            // With a decoupled changelog, the snapshot is committed as a Changelog before its
            // snapshot file is deleted.
            if (expireConfig.isChangelogDecoupled()) {
                commitChangelog(new Changelog(snapshot));
            }
            snapshotManager.deleteSnapshot(id);
        }

        // endExclusiveId is now the earliest remaining snapshot.
        writeEarliestHint(endExclusiveId);
        return (int) (endExclusiveId - beginInclusiveId);
    }