private void markInstantsAsCleaned()

in xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java [440:562]


    /**
     * Records a clean action on the Hudi timeline for files that are no longer referenced, so the
     * metadata table stops tracking them and they become eligible for physical removal.
     *
     * <p>Eligible partitions and files are computed with Hudi's {@link CleanPlanner}. Each file
     * path is rewritten into its externally-marked file name (carrying the commit time), and the
     * clean is then driven through the standard requested -> inflight -> completed instant
     * transitions while the clean metadata is also applied to the metadata table.
     *
     * @param table the Hudi table whose timeline and metadata table are updated
     * @param writeConfig write configuration consulted for the cleaner policy and retention
     * @param engineContext engine context used to construct the clean planner
     */
    private void markInstantsAsCleaned(
        HoodieJavaTable<?> table,
        HoodieWriteConfig writeConfig,
        HoodieEngineContext engineContext) {
      CleanPlanner<?, ?, ?, ?> cleanPlanner = new CleanPlanner<>(engineContext, table, writeConfig);
      Option<HoodieInstant> earliestInstantToRetain = cleanPlanner.getEarliestCommitToRetain();
      // retention is time based; when no commit qualifies for retention there is nothing to clean
      if (!earliestInstantToRetain.isPresent()) {
        return;
      }
      List<String> candidatePartitions;
      try {
        candidatePartitions = cleanPlanner.getPartitionPathsToClean(earliestInstantToRetain);
      } catch (IOException ex) {
        throw new ReadException("Unable to get partitions to clean", ex);
      }
      if (candidatePartitions.isEmpty()) {
        return;
      }

      HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
      TableFileSystemView fileSystemView = table.getHoodieView();
      Map<String, List<HoodieCleanFileInfo>> cleanInfoPerPartition =
          candidatePartitions.stream()
              .map(
                  partition ->
                      Pair.of(
                          partition,
                          cleanPlanner
                              .getDeletePaths(partition, earliestInstantToRetain)
                              .getValue()))
              .filter(partitionAndPaths -> !partitionAndPaths.getValue().isEmpty())
              .collect(
                  Collectors.toMap(
                      Pair::getKey,
                      partitionAndPaths -> {
                        String partition = partitionAndPaths.getKey();
                        // index the replaced base files by path so each delete path can be
                        // translated into the externally-marked file name the metadata table
                        // expects
                        Map<String, HoodieBaseFile> replacedBaseFilesByPath =
                            fileSystemView
                                .getAllReplacedFileGroups(partition)
                                .flatMap(HoodieFileGroup::getAllBaseFiles)
                                .collect(
                                    Collectors.toMap(
                                        HoodieBaseFile::getPath, Function.identity()));
                        return partitionAndPaths.getValue().stream()
                            .map(
                                fileToClean -> {
                                  // NOTE(review): assumes every delete path belongs to a replaced
                                  // file group; a path absent from the index would NPE here --
                                  // confirm against the planner's guarantees
                                  HoodieBaseFile replacedFile =
                                      replacedBaseFilesByPath.get(fileToClean.getFilePath());
                                  return new HoodieCleanFileInfo(
                                      ExternalFilePathUtil.appendCommitTimeAndExternalFileMarker(
                                          replacedFile.getFileName(),
                                          replacedFile.getCommitTime()),
                                      false);
                                })
                            .collect(Collectors.toList());
                      }));
      // every candidate partition had an empty delete list, so exit without writing a clean
      if (cleanInfoPerPartition.isEmpty()) {
        return;
      }
      // place the clean instant one second after the current commit so it sorts after it
      String cleanTime =
          HudiInstantUtils.convertInstantToCommit(
              HudiInstantUtils.parseFromInstantTime(instantTime).plus(1, ChronoUnit.SECONDS));
      // the metadata writer marks the files as deleted in the metadata table; those deleted
      // entries are purged later during metadata-table compaction to bound its growth
      try (HoodieTableMetadataWriter metadataWriter = table.getMetadataWriter(cleanTime).get()) {
        HoodieActionInstant earliestRetainedAction =
            earliestInstantToRetain
                .map(
                    retained ->
                        new HoodieActionInstant(
                            retained.getTimestamp(),
                            retained.getAction(),
                            retained.getState().name()))
                .orElse(null);
        HoodieCleanerPlan cleanerPlan =
            new HoodieCleanerPlan(
                earliestRetainedAction,
                instantTime,
                writeConfig.getCleanerPolicy().name(),
                Collections.emptyMap(),
                CleanPlanner.LATEST_CLEAN_PLAN_VERSION,
                cleanInfoPerPartition,
                Collections.emptyList());
        // drive the clean through the timeline states: requested -> inflight -> completed
        HoodieInstant requestedClean =
            new HoodieInstant(
                HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, cleanTime);
        activeTimeline.saveToCleanRequested(
            requestedClean, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
        HoodieInstant inflightClean =
            activeTimeline.transitionCleanRequestedToInflight(requestedClean, Option.empty());
        List<HoodieCleanStat> cleanStats =
            cleanInfoPerPartition.entrySet().stream()
                .map(
                    partitionEntry -> {
                      List<String> deletedPaths =
                          partitionEntry.getValue().stream()
                              .map(HoodieCleanFileInfo::getFilePath)
                              .collect(Collectors.toList());
                      // every path is reported as both planned and successfully deleted
                      return new HoodieCleanStat(
                          HoodieCleaningPolicy.KEEP_LATEST_COMMITS,
                          partitionEntry.getKey(),
                          deletedPaths,
                          deletedPaths,
                          Collections.emptyList(),
                          earliestInstantToRetain.get().getTimestamp(),
                          instantTime);
                    })
                .collect(Collectors.toList());
        HoodieCleanMetadata cleanMetadata =
            CleanerUtils.convertCleanMetadata(cleanTime, Option.empty(), cleanStats);
        // run any pending metadata-table services, then record the cleaned files so their
        // metadata entries are marked for deletion
        metadataWriter.performTableServices(Option.empty());
        metadataWriter.update(cleanMetadata, cleanTime);
        // finally mark the clean as complete on the table timeline
        activeTimeline.transitionCleanInflightToComplete(
            inflightClean, TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata));
      } catch (Exception ex) {
        throw new UpdateException("Unable to clean Hudi timeline", ex);
      }
    }