in xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java [145:254]
/**
 * Computes the data files added and removed by a single instant on the timeline.
 *
 * <p>Commit/delta-commit and replace-commit instants are resolved against the file system view to
 * translate touched file ids into concrete files; rollback and restore instants report removals
 * straight from their avro metadata. Actions that never touch base files (clean, savepoint, log
 * compaction, indexing, schema commit) produce empty results.
 *
 * @param timeline timeline used to load the serialized details of the instant
 * @param instant the instant whose file changes are being computed
 * @param fsView file system view used to resolve file groups for write actions
 * @param instantToConsider upper-bound instant when querying the file system view
 * @param partitioningFields partition fields used when mapping partition paths to internal files
 * @return the files added and removed by this instant
 * @throws NotSupportedException if the instant's action type is not recognized
 * @throws ReadException if the instant's metadata cannot be read or deserialized
 */
private AddedAndRemovedFiles getAddedAndRemovedPartitionInfo(
    HoodieTimeline timeline,
    HoodieInstant instant,
    TableFileSystemView fsView,
    HoodieInstant instantToConsider,
    List<InternalPartitionField> partitioningFields) {
  List<InternalDataFile> added = new ArrayList<>();
  List<InternalDataFile> removed = new ArrayList<>();
  try {
    String action = instant.getAction();
    switch (action) {
      case HoodieTimeline.COMMIT_ACTION:
      case HoodieTimeline.DELTA_COMMIT_ACTION:
        HoodieCommitMetadata commitMetadata =
            HoodieCommitMetadata.fromBytes(
                timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
        commitMetadata
            .getPartitionToWriteStats()
            .forEach(
                (path, stats) -> {
                  // Only the file groups touched by this commit need to be re-resolved.
                  Set<String> touchedFileIds =
                      stats.stream().map(HoodieWriteStat::getFileId).collect(Collectors.toSet());
                  AddedAndRemovedFiles delta =
                      getUpdatesToPartition(
                          fsView, instantToConsider, path, touchedFileIds, partitioningFields);
                  added.addAll(delta.getAdded());
                  removed.addAll(delta.getRemoved());
                });
        break;
      case HoodieTimeline.REPLACE_COMMIT_ACTION:
        HoodieReplaceCommitMetadata replaceMetadata =
            HoodieReplaceCommitMetadata.fromBytes(
                timeline.getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
        replaceMetadata
            .getPartitionToReplaceFileIds()
            .forEach(
                (path, replacedIds) -> {
                  Set<String> replaced = new HashSet<>(replacedIds);
                  // File ids written by the replace commit itself (e.g. clustering output);
                  // a partition may have replacements without any new writes.
                  Set<String> created =
                      replaceMetadata
                          .getPartitionToWriteStats()
                          .getOrDefault(path, Collections.emptyList())
                          .stream()
                          .map(HoodieWriteStat::getFileId)
                          .collect(Collectors.toSet());
                  AddedAndRemovedFiles delta =
                      getUpdatesToPartitionForReplaceCommit(
                          fsView, instantToConsider, path, replaced, created, partitioningFields);
                  added.addAll(delta.getAdded());
                  removed.addAll(delta.getRemoved());
                });
        break;
      case HoodieTimeline.ROLLBACK_ACTION:
        HoodieRollbackMetadata rollbackMetadata =
            TimelineMetadataUtils.deserializeAvroMetadata(
                timeline.getInstantDetails(instant).get(), HoodieRollbackMetadata.class);
        rollbackMetadata
            .getPartitionMetadata()
            .forEach(
                (path, partitionMetadata) ->
                    removed.addAll(
                        getRemovedFiles(
                            path, partitionMetadata.getSuccessDeleteFiles(), partitioningFields)));
        break;
      case HoodieTimeline.RESTORE_ACTION:
        HoodieRestoreMetadata restoreMetadata =
            TimelineMetadataUtils.deserializeAvroMetadata(
                timeline.getInstantDetails(instant).get(), HoodieRestoreMetadata.class);
        // A restore is a sequence of rollbacks; flatten all of them and collect every
        // successfully deleted file.
        restoreMetadata.getHoodieRestoreMetadata().values().stream()
            .flatMap(List::stream)
            .forEach(
                rollbackMeta ->
                    rollbackMeta
                        .getPartitionMetadata()
                        .forEach(
                            (path, partitionMetadata) ->
                                removed.addAll(
                                    getRemovedFiles(
                                        path,
                                        partitionMetadata.getSuccessDeleteFiles(),
                                        partitioningFields))));
        break;
      case HoodieTimeline.CLEAN_ACTION:
      case HoodieTimeline.SAVEPOINT_ACTION:
      case HoodieTimeline.LOG_COMPACTION_ACTION:
      case HoodieTimeline.INDEXING_ACTION:
      case HoodieTimeline.SCHEMA_COMMIT_ACTION:
        // These actions never affect base files, so there is nothing to report.
        break;
      default:
        throw new NotSupportedException("Unexpected commit type " + action);
    }
    return AddedAndRemovedFiles.builder().added(added).removed(removed).build();
  } catch (IOException ex) {
    throw new ReadException("Unable to read commit metadata for commit " + instant, ex);
  }
}