public void cleanFiles()

in core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java [52:272]


  /**
   * Deletes files that became unreferenced when snapshots were expired, by comparing the table
   * metadata before and after expiration.
   *
   * <p>Cleanup proceeds as follows:
   *
   * <ol>
   *   <li>find the snapshots present in {@code beforeExpiration} but not in {@code
   *       afterExpiration};
   *   <li>delete data files that were deleted by those snapshots and are no longer in the table;
   *   <li>delete manifests that are no longer referenced by any retained snapshot;
   *   <li>delete the manifest lists of the expired snapshots.
   * </ol>
   *
   * <p>When {@code beforeExpiration} has any statistics files, the locations returned by {@code
   * expiredStatisticsFilesLocations} are deleted as well.
   *
   * <p>If the manifest list of any retained snapshot cannot be read, nothing is deleted. The set
   * of live manifests would otherwise be incomplete, and still-referenced manifests could be
   * removed.
   *
   * @param beforeExpiration table metadata before snapshots were expired
   * @param afterExpiration table metadata after snapshots were expired
   * @throws UnsupportedOperationException if {@code afterExpiration} has more than one ref
   */
  public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpiration) {
    if (afterExpiration.refs().size() > 1) {
      throw new UnsupportedOperationException(
          "Cannot incrementally clean files for tables with more than 1 ref");
    }

    // clean up the expired snapshots:
    // 1. Get a list of the snapshots that were removed
    // 2. Delete any data files that were deleted by those snapshots and are not in the table
    // 3. Delete any manifests that are no longer used by current snapshots
    // 4. Delete the manifest lists

    Set<Long> validIds = Sets.newHashSet();
    for (Snapshot snapshot : afterExpiration.snapshots()) {
      validIds.add(snapshot.snapshotId());
    }

    Set<Long> expiredIds = Sets.newHashSet();
    for (Snapshot snapshot : beforeExpiration.snapshots()) {
      long snapshotId = snapshot.snapshotId();
      if (!validIds.contains(snapshotId)) {
        // the snapshot was expired
        LOG.info("Expired snapshot: {}", snapshot);
        expiredIds.add(snapshotId);
      }
    }

    if (expiredIds.isEmpty()) {
      // if no snapshots were expired, skip cleanup
      return;
    }

    SnapshotRef branchToCleanup = selectBranchToCleanup(beforeExpiration, afterExpiration);
    if (branchToCleanup == null) {
      return;
    }

    Snapshot latest = beforeExpiration.snapshot(branchToCleanup.snapshotId());
    List<Snapshot> snapshots = afterExpiration.snapshots();

    // This is the set of ancestors of the current table state. When removing snapshots, only files
    // that were deleted in an ancestor of the current table state may be removed. This avoids
    // physically deleting files that were logically deleted in a commit that was rolled back.
    Set<Long> ancestorIds =
        Sets.newHashSet(SnapshotUtil.ancestorIds(latest, beforeExpiration::snapshot));

    Set<Long> pickedAncestorSnapshotIds = Sets.newHashSet();
    for (long snapshotId : ancestorIds) {
      String sourceSnapshotId =
          beforeExpiration
              .snapshot(snapshotId)
              .summary()
              .get(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP);
      if (sourceSnapshotId != null) {
        // protect any snapshot that was cherry-picked into the current table state
        pickedAncestorSnapshotIds.add(Long.parseLong(sourceSnapshotId));
      }
    }

    // find manifests to clean up that are still referenced by a valid snapshot, but written by an
    // expired snapshot
    Set<String> validManifests = Sets.newHashSet();
    Set<ManifestFile> manifestsToScan = Sets.newHashSet();
    // IDs of retained snapshots whose manifest list could not be read after all retries
    Set<Long> unreadableSnapshotIds = Sets.newHashSet();

    // Reads and deletes are done using Tasks.foreach(...).suppressFailureWhenFinished to complete
    // as much of the delete work as possible and avoid orphaned data or manifest files.
    Tasks.foreach(snapshots)
        .retry(3)
        .suppressFailureWhenFinished()
        .onFailure(
            (snapshot, exc) -> {
              unreadableSnapshotIds.add(snapshot.snapshotId());
              LOG.warn(
                  "Failed on snapshot {} while reading manifest list: {}",
                  snapshot.snapshotId(),
                  snapshot.manifestListLocation(),
                  exc);
            })
        .run(
            snapshot -> {
              try (CloseableIterable<ManifestFile> manifests = readManifests(snapshot)) {
                for (ManifestFile manifest : manifests) {
                  validManifests.add(manifest.path());

                  long snapshotId = manifest.snapshotId();
                  // whether the manifest was created by a valid snapshot (true) or an expired
                  // snapshot (false)
                  boolean fromValidSnapshots = validIds.contains(snapshotId);
                  // whether the snapshot that created the manifest was an ancestor of the table
                  // state
                  boolean isFromAncestor = ancestorIds.contains(snapshotId);
                  // whether the changes in this snapshot have been picked into the current table
                  // state
                  boolean isPicked = pickedAncestorSnapshotIds.contains(snapshotId);
                  // If the snapshot that wrote this manifest is no longer valid (has expired),
                  // then delete its deleted files. Note that this applies only to expired
                  // snapshots that are in the current table state.
                  if (!fromValidSnapshots
                      && (isFromAncestor || isPicked)
                      && manifest.hasDeletedFiles()) {
                    manifestsToScan.add(manifest.copy());
                  }
                }

              } catch (IOException e) {
                throw new RuntimeIOException(
                    e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
              }
            });

    if (!unreadableSnapshotIds.isEmpty()) {
      // validManifests is incomplete. A manifest still referenced by an unreadable retained
      // snapshot would look unused below and be deleted, corrupting the table. Orphaned files are
      // recoverable; deleted live metadata is not, so skip cleanup entirely.
      LOG.warn(
          "Skipping cleanup of expired files: failed to read manifest lists of retained snapshots {}",
          unreadableSnapshotIds);
      return;
    }

    // find manifests to clean up that were only referenced by snapshots that have expired
    Set<String> manifestListsToDelete = Sets.newHashSet();
    Set<String> manifestsToDelete = Sets.newHashSet();
    Set<ManifestFile> manifestsToRevert = Sets.newHashSet();
    Tasks.foreach(beforeExpiration.snapshots())
        .retry(3)
        .suppressFailureWhenFinished()
        .onFailure(
            (snapshot, exc) ->
                LOG.warn(
                    "Failed on snapshot {} while reading manifest list: {}",
                    snapshot.snapshotId(),
                    snapshot.manifestListLocation(),
                    exc))
        .run(
            snapshot -> {
              long snapshotId = snapshot.snapshotId();
              if (!validIds.contains(snapshotId)) {
                // determine whether the changes in this snapshot are in the current table state
                if (pickedAncestorSnapshotIds.contains(snapshotId)) {
                  // This snapshot was cherry-picked into the current table state, so skip cleaning
                  // it up. Its changes will expire when the picked snapshot expires.
                  // A -- C -- D (source=B)
                  //  `- B <-- this commit
                  return;
                }

                long sourceSnapshotId =
                    PropertyUtil.propertyAsLong(
                        snapshot.summary(), SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP, -1);
                if (ancestorIds.contains(sourceSnapshotId)) {
                  // This commit was cherry-picked from a commit that is in the current table state.
                  // Do not clean up its changes because that would revert data file additions that
                  // are in the current table.
                  // A -- B -- C
                  //  `- D (source=B) <-- this commit
                  return;
                }

                if (pickedAncestorSnapshotIds.contains(sourceSnapshotId)) {
                  // This commit was cherry-picked from a commit that is in the current table state.
                  // Do not clean up its changes because that would revert data file additions that
                  // are in the current table.
                  // A -- C -- E (source=B)
                  //  `- B `- D (source=B) <-- this commit
                  return;
                }

                // find any manifests that are no longer needed
                try (CloseableIterable<ManifestFile> manifests = readManifests(snapshot)) {
                  for (ManifestFile manifest : manifests) {
                    if (!validManifests.contains(manifest.path())) {
                      manifestsToDelete.add(manifest.path());

                      boolean isFromAncestor = ancestorIds.contains(manifest.snapshotId());
                      boolean isFromExpiringSnapshot = expiredIds.contains(manifest.snapshotId());

                      if (isFromAncestor && manifest.hasDeletedFiles()) {
                        // Only delete data files that were deleted by an expired snapshot if that
                        // snapshot is an ancestor of the current table state. Otherwise, a
                        // snapshot that deleted files and was rolled back would delete files that
                        // could still be in the current table state.
                        manifestsToScan.add(manifest.copy());
                      }

                      if (!isFromAncestor && isFromExpiringSnapshot && manifest.hasAddedFiles()) {
                        // The manifest was written by a snapshot that is not an ancestor of the
                        // current table state, so the files added in this manifest can be removed.
                        // The extra check that the manifest was written by a known snapshot that
                        // was expired in this commit ensures that the full ancestor list between
                        // when the snapshot was written and this expiration is known, with no
                        // missing history. If history were missing, the snapshot could be an
                        // ancestor of the table state without appearing in the ancestor ID set,
                        // and reverting it would be unsafe.
                        manifestsToRevert.add(manifest.copy());
                      }
                    }
                  }
                } catch (IOException e) {
                  throw new RuntimeIOException(
                      e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
                }

                // add the manifest list to the delete set, if present
                if (snapshot.manifestListLocation() != null) {
                  manifestListsToDelete.add(snapshot.manifestListLocation());
                }
              }
            });

    Set<String> filesToDelete =
        findFilesToDelete(
            manifestsToScan, manifestsToRevert, validIds, beforeExpiration.specsById());

    deleteFiles(filesToDelete, "data");
    deleteFiles(manifestsToDelete, "manifest");
    deleteFiles(manifestListsToDelete, "manifest list");

    if (hasAnyStatisticsFiles(beforeExpiration)) {
      Set<String> expiredStatisticsFilesLocations =
          expiredStatisticsFilesLocations(beforeExpiration, afterExpiration);
      deleteFiles(expiredStatisticsFilesLocations, "statistics files");
    }
  }

  /**
   * Chooses the ref whose ancestry defines the current table state for incremental cleanup.
   *
   * <p>Prefers a ref that still exists in {@code afterExpiration}, resolved by name in {@code
   * beforeExpiration} so that its snapshot ID can be looked up there. When {@code
   * beforeExpiration} holds more refs than {@code afterExpiration} (some refs were dropped),
   * picking an arbitrary ref could compute ancestry from a dropped ref. Files that the surviving
   * branch still uses could then be reverted or deleted. Falls back to the first ref of {@code
   * beforeExpiration} when no surviving ref matches.
   *
   * @param beforeExpiration table metadata before snapshots were expired
   * @param afterExpiration table metadata after snapshots were expired
   * @return the ref to clean up, or {@code null} if {@code beforeExpiration} has no refs
   */
  private static SnapshotRef selectBranchToCleanup(
      TableMetadata beforeExpiration, TableMetadata afterExpiration) {
    for (String refName : afterExpiration.refs().keySet()) {
      SnapshotRef ref = beforeExpiration.ref(refName);
      if (ref != null) {
        return ref;
      }
    }

    return Iterables.getFirst(beforeExpiration.refs().values(), null);
  }