in xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java [440:562]
/**
 * Records a clean on the table timeline and in the metadata table for the files that the clean
 * planner reports as removable.
 *
 * <p>The method returns without writing anything when the planner yields no earliest instant to
 * retain, no partitions to clean, or no delete paths. Otherwise it writes a clean instant through
 * the requested, inflight and completed states and passes the resulting clean metadata to the
 * metadata table writer. This method does not delete any data files itself.
 *
 * @param table the Hudi table used to build the clean planner, the file system view and the
 *     metadata table writer
 * @param writeConfig the write config handed to the clean planner; its cleaner policy name is
 *     recorded in the clean plan
 * @param engineContext the engine context handed to the clean planner
 * @throws ReadException if listing the partitions to clean fails with an {@link IOException}
 * @throws IllegalStateException if a path selected for cleaning has no matching base file among
 *     the replaced file groups of its partition
 * @throws UpdateException if creating the metadata writer, writing the clean instant or updating
 *     the metadata table fails
 */
private void markInstantsAsCleaned(
    HoodieJavaTable<?> table,
    HoodieWriteConfig writeConfig,
    HoodieEngineContext engineContext) {
  CleanPlanner<?, ?, ?, ?> planner = new CleanPlanner<>(engineContext, table, writeConfig);
  Option<HoodieInstant> earliestInstant = planner.getEarliestCommitToRetain();
  // since we're retaining based on time, we should exit early if earliestInstant is empty
  if (!earliestInstant.isPresent()) {
    return;
  }
  // presence was checked above, so unwrap once and reuse the instant below
  HoodieInstant earliestInstantToRetain = earliestInstant.get();
  List<String> partitionsToClean;
  try {
    partitionsToClean = planner.getPartitionPathsToClean(earliestInstant);
  } catch (IOException ex) {
    throw new ReadException("Unable to get partitions to clean", ex);
  }
  if (partitionsToClean.isEmpty()) {
    return;
  }
  HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
  TableFileSystemView fsView = table.getHoodieView();
  // partition path -> files to record as cleaned; partitions with no delete paths are dropped
  Map<String, List<HoodieCleanFileInfo>> cleanInfoPerPartition =
      partitionsToClean.stream()
          .map(
              partition ->
                  Pair.of(partition, planner.getDeletePaths(partition, earliestInstant)))
          .filter(deletePaths -> !deletePaths.getValue().getValue().isEmpty())
          .collect(
              Collectors.toMap(
                  Pair::getKey,
                  deletePathsForPartition -> {
                    String partition = deletePathsForPartition.getKey();
                    // we need to manipulate the path to properly clean from the metadata table,
                    // so we map the file path to the base file
                    Map<String, HoodieBaseFile> baseFilesByPath =
                        fsView
                            .getAllReplacedFileGroups(partition)
                            .flatMap(HoodieFileGroup::getAllBaseFiles)
                            .collect(
                                Collectors.toMap(HoodieBaseFile::getPath, Function.identity()));
                    return deletePathsForPartition.getValue().getValue().stream()
                        .map(
                            cleanFileInfo -> {
                              String filePath = cleanFileInfo.getFilePath();
                              HoodieBaseFile baseFile = baseFilesByPath.get(filePath);
                              // fail with context instead of a bare NullPointerException when
                              // the planner selects a file outside the replaced file groups
                              if (baseFile == null) {
                                throw new IllegalStateException(
                                    "No replaced base file found for path marked for cleaning: "
                                        + filePath
                                        + " in partition: "
                                        + partition);
                              }
                              return new HoodieCleanFileInfo(
                                  ExternalFilePathUtil.appendCommitTimeAndExternalFileMarker(
                                      baseFile.getFileName(), baseFile.getCommitTime()),
                                  false);
                            })
                        .collect(Collectors.toList());
                  }));
  // there is nothing to clean, so exit early
  if (cleanInfoPerPartition.isEmpty()) {
    return;
  }
  // the clean instant is timestamped one second after instantTime
  String cleanTime =
      HudiInstantUtils.convertInstantToCommit(
          HudiInstantUtils.parseFromInstantTime(instantTime).plus(1, ChronoUnit.SECONDS));
  // create a metadata table writer in order to mark files as deleted in the table
  // the deleted entries are cleaned up in the metadata table during compaction to control the
  // growth of the table
  try (HoodieTableMetadataWriter hoodieTableMetadataWriter =
      table.getMetadataWriter(cleanTime).get()) {
    HoodieCleanerPlan cleanerPlan =
        new HoodieCleanerPlan(
            new HoodieActionInstant(
                earliestInstantToRetain.getTimestamp(),
                earliestInstantToRetain.getAction(),
                earliestInstantToRetain.getState().name()),
            instantTime,
            writeConfig.getCleanerPolicy().name(),
            Collections.emptyMap(),
            CleanPlanner.LATEST_CLEAN_PLAN_VERSION,
            cleanInfoPerPartition,
            Collections.emptyList());
    // create a clean instant and mark it as requested with the clean plan
    HoodieInstant requestedCleanInstant =
        new HoodieInstant(
            HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, cleanTime);
    activeTimeline.saveToCleanRequested(
        requestedCleanInstant, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
    HoodieInstant inflightClean =
        activeTimeline.transitionCleanRequestedToInflight(
            requestedCleanInstant, Option.empty());
    List<HoodieCleanStat> cleanStats =
        cleanInfoPerPartition.entrySet().stream()
            .map(
                entry -> {
                  String partitionPath = entry.getKey();
                  List<String> deletePaths =
                      entry.getValue().stream()
                          .map(HoodieCleanFileInfo::getFilePath)
                          .collect(Collectors.toList());
                  // NOTE(review): the plan above records writeConfig.getCleanerPolicy() while
                  // the stats hard-code KEEP_LATEST_COMMITS; confirm this is intentional.
                  // The same list is passed for two of the path arguments and an empty list
                  // for the third; presumably these are delete patterns, successful deletes
                  // and failed deletes. Confirm against HoodieCleanStat.
                  return new HoodieCleanStat(
                      HoodieCleaningPolicy.KEEP_LATEST_COMMITS,
                      partitionPath,
                      deletePaths,
                      deletePaths,
                      Collections.emptyList(),
                      earliestInstantToRetain.getTimestamp(),
                      instantTime);
                })
            .collect(Collectors.toList());
    HoodieCleanMetadata cleanMetadata =
        CleanerUtils.convertCleanMetadata(cleanTime, Option.empty(), cleanStats);
    // update the metadata table with the clean metadata so the files' metadata are marked for
    // deletion
    hoodieTableMetadataWriter.performTableServices(Option.empty());
    hoodieTableMetadataWriter.update(cleanMetadata, cleanTime);
    // mark the commit as complete on the table timeline
    activeTimeline.transitionCleanInflightToComplete(
        inflightClean, TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata));
  } catch (Exception ex) {
    throw new UpdateException("Unable to clean Hudi timeline", ex);
  }
}