in xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java [77:163]
/**
 * Compares the files of a full snapshot against the latest base files currently known to the
 * Hudi table and computes the changes required to make the table match the snapshot.
 *
 * <p>For every partition present in the snapshot:
 *
 * <ul>
 *   <li>existing base files whose path is not part of the snapshot have their file IDs marked
 *       for removal;
 *   <li>snapshot files whose physical path is not among the existing base file paths get a
 *       write status so that they are added.
 * </ul>
 *
 * <p>Every partition reported for the table that does not appear in the snapshot is treated as
 * dropped: the file IDs of all of its latest base files are marked for removal.
 *
 * <p>The file system view opened by this method is always closed before returning, including
 * when an exception is thrown while computing the changes.
 *
 * @param partitionedDataFiles the snapshot's data files, grouped by partition
 * @param metaClient meta client of the Hudi table being compared against
 * @param commit the commit value passed through to the generated write statuses
 * @return the combined file IDs to remove (keyed by partition path) and the write statuses for
 *     the files to add; {@code ReplaceMetadata.EMPTY} based when the snapshot has no partitions
 */
ReplaceMetadata extractSnapshotChanges(
    List<PartitionFileGroup> partitionedDataFiles,
    HoodieTableMetaClient metaClient,
    String commit) {
  HoodieMetadataConfig metadataConfig =
      HoodieMetadataConfig.newBuilder()
          .enable(metaClient.getTableConfig().isMetadataTableAvailable())
          .build();
  HoodieTableFileSystemView fsView =
      new HoodieMetadataFileSystemView(
          engineContext, metaClient, metaClient.getActiveTimeline(), metadataConfig);
  // Everything after the view is opened runs inside try/finally so that the view is released
  // even if partition listing, file lookup or write-status creation throws.
  try {
    boolean isTableInitialized = metaClient.isTimelineNonEmpty();
    // Track the partitions that are not present in the snapshot, so the files for those
    // partitions can be dropped
    Set<String> partitionPathsToDrop =
        new HashSet<>(
            FSUtils.getAllPartitionPaths(
                engineContext, metadataConfig, metaClient.getBasePathV2().toString()));
    ReplaceMetadata replaceMetadata =
        partitionedDataFiles.stream()
            .map(
                partitionFileGroup -> {
                  List<InternalDataFile> dataFiles = partitionFileGroup.getDataFiles();
                  String partitionPath = getPartitionPath(tableBasePath, dataFiles);
                  // remove the partition from the set of partitions to drop since it is
                  // present in the snapshot
                  partitionPathsToDrop.remove(partitionPath);
                  // create a map of file path to the data file, any entries not in the hudi
                  // table will be added
                  Map<String, InternalDataFile> physicalPathToFile =
                      dataFiles.stream()
                          .collect(
                              Collectors.toMap(
                                  InternalDataFile::getPhysicalPath, Function.identity()));
                  // an empty timeline means there is nothing to look up in the view yet
                  List<HoodieBaseFile> baseFiles =
                      isTableInitialized
                          ? fsView.getLatestBaseFiles(partitionPath).collect(Collectors.toList())
                          : Collections.emptyList();
                  Set<String> existingPaths =
                      baseFiles.stream().map(HoodieBaseFile::getPath).collect(Collectors.toSet());
                  // Mark fileIds for removal if the file paths are no longer present in the
                  // snapshot
                  List<String> fileIdsToRemove =
                      baseFiles.stream()
                          .filter(baseFile -> !physicalPathToFile.containsKey(baseFile.getPath()))
                          .map(HoodieBaseFile::getFileId)
                          .collect(Collectors.toList());
                  // for any entries in the map that are not in the set of existing paths,
                  // create a write status to add them to the Hudi table
                  List<WriteStatus> writeStatuses =
                      physicalPathToFile.entrySet().stream()
                          .filter(entry -> !existingPaths.contains(entry.getKey()))
                          .map(Map.Entry::getValue)
                          .map(
                              snapshotFile ->
                                  toWriteStatus(
                                      tableBasePath,
                                      commit,
                                      snapshotFile,
                                      Optional.of(partitionPath)))
                          .collect(Collectors.toList());
                  return ReplaceMetadata.of(
                      fileIdsToRemove.isEmpty()
                          ? Collections.emptyMap()
                          : Collections.singletonMap(partitionPath, fileIdsToRemove),
                      writeStatuses);
                })
            .reduce(ReplaceMetadata::combine)
            .orElse(ReplaceMetadata.EMPTY);
    // treat any partitions not present in the snapshot as dropped; this must run after the
    // pipeline above has finished pruning partitionPathsToDrop
    Optional<ReplaceMetadata> droppedPartitions =
        partitionPathsToDrop.stream()
            .map(
                partition -> {
                  List<String> fileIdsToRemove =
                      fsView
                          .getLatestBaseFiles(partition)
                          .map(HoodieBaseFile::getFileId)
                          .collect(Collectors.toList());
                  return ReplaceMetadata.of(
                      Collections.singletonMap(partition, fileIdsToRemove),
                      Collections.emptyList());
                })
            .reduce(ReplaceMetadata::combine);
    return droppedPartitions.map(replaceMetadata::combine).orElse(replaceMetadata);
  } finally {
    fsView.close();
  }
}