in xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java [106:176]
// Builds the TableChange describing the data files added and removed by one Delta commit.
//
// The table, the commit's actions and the snapshot are all read as of versionNumber. AddFile and
// RemoveFile actions are converted to InternalDataFiles keyed by their absolute physical path.
// When an AddFile carries a deletion vector and the same commit also has a RemoveFile for that
// data file, the pair only re-registers an existing file, so the path is dropped from both the
// added and the removed side. An AddFile with a deletion vector but no matching RemoveFile is kept
// as an added file and a warning is logged.
//
// versionNumber: the Delta commit version to describe.
// Returns: the table as of the commit, the resulting files diff, and the commit identifier.
public TableChange getTableChangeForCommit(Long versionNumber) {
  InternalTable tableAtVersion = tableExtractor.table(deltaLog, tableName, versionNumber);
  List<Action> actionsForVersion = getChangesState().getActionsForVersion(versionNumber);
  Snapshot snapshotAtVersion = deltaLog.getSnapshotAt(versionNumber, Option.empty());
  FileFormat fileFormat =
      actionsConverter.convertToFileFormat(snapshotAtVersion.metadata().format().provider());
  // Fetched once and reused for every action in the commit.
  DeltaPartitionExtractor partitionExtractor = DeltaPartitionExtractor.getInstance();
  DeltaStatsExtractor statsExtractor = DeltaStatsExtractor.getInstance();

  // Both maps, and the set below, hold the data file's absolute physical path as the key/element.
  Map<String, InternalDataFile> addedFiles = new HashMap<>();
  Map<String, InternalDataFile> removedFiles = new HashMap<>();
  // Paths of data files whose AddFile action in this commit carries a deletion vector.
  Set<String> dataFilesWithDeletionVectors = new HashSet<>();

  for (Action action : actionsForVersion) {
    if (action instanceof AddFile) {
      AddFile addFile = (AddFile) action;
      InternalDataFile dataFile =
          actionsConverter.convertAddActionToInternalDataFile(
              addFile,
              snapshotAtVersion,
              fileFormat,
              tableAtVersion.getPartitioningFields(),
              tableAtVersion.getReadSchema().getFields(),
              true,
              partitionExtractor,
              statsExtractor);
      addedFiles.put(dataFile.getPhysicalPath(), dataFile);
      // The returned value is matched against removedFiles (keyed by data file path) below, so it
      // is treated as the path of the data file the deletion vector applies to; null means this
      // AddFile has no deletion vector.
      String dataFileWithDeletionVector =
          actionsConverter.extractDeletionVectorFile(snapshotAtVersion, addFile);
      if (dataFileWithDeletionVector != null) {
        dataFilesWithDeletionVectors.add(dataFileWithDeletionVector);
      }
    } else if (action instanceof RemoveFile) {
      InternalDataFile dataFile =
          actionsConverter.convertRemoveActionToInternalDataFile(
              (RemoveFile) action,
              snapshotAtVersion,
              fileFormat,
              tableAtVersion.getPartitioningFields(),
              partitionExtractor);
      removedFiles.put(dataFile.getPhysicalPath(), dataFile);
    }
  }

  // In Delta Lake, adding deletion vector information to an existing data file (as a result of a
  // delete operation) produces a RemoveFile for the old entry plus an AddFile for the same path
  // carrying the deletion vector. That pair neither adds nor removes a data file, so the path is
  // dropped from both maps, which track the data files actually added and removed.
  for (String dataFilePath : dataFilesWithDeletionVectors) {
    // Map values are never null (each was dereferenced before being stored), so a non-null
    // result means a RemoveFile for this data file was present in the commit.
    if (removedFiles.remove(dataFilePath) != null) {
      addedFiles.remove(dataFilePath);
    } else {
      log.warn(
          "No Remove action found for the data file for which deletion vector is added {}. This is unexpected.",
          dataFilePath);
    }
  }

  InternalFilesDiff internalFilesDiff =
      InternalFilesDiff.builder()
          .filesAdded(addedFiles.values())
          .filesRemoved(removedFiles.values())
          .build();
  return TableChange.builder()
      .tableAsOfChange(tableAtVersion)
      .filesDiff(internalFilesDiff)
      .sourceIdentifier(getCommitIdentifier(versionNumber))
      .build();
}