public TableChange getTableChangeForCommit(Long versionNumber)

in xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java [106:176]


  /**
   * Builds the {@link TableChange} that describes a single Delta commit.
   *
   * <p>First the table state, the commit's actions and the snapshot are loaded for {@code
   * versionNumber}. Each {@link AddFile} and {@link RemoveFile} action is then converted to an
   * {@link InternalDataFile} and recorded under its physical path.
   *
   * <p>Finally, any data file that was added with a deletion vector and also removed in the same
   * commit is dropped from both sides of the diff. If a file was added with a deletion vector but
   * has no matching removal, a warning is logged and the file stays in the added set.
   *
   * @param versionNumber the Delta commit version to convert
   * @return the table as of {@code versionNumber}, the added/removed file diff for that commit, and
   *     the commit identifier produced by {@code getCommitIdentifier}
   */
  public TableChange getTableChangeForCommit(Long versionNumber) {
    InternalTable table = tableExtractor.table(deltaLog, tableName, versionNumber);
    List<Action> commitActions = getChangesState().getActionsForVersion(versionNumber);
    Snapshot snapshot = deltaLog.getSnapshotAt(versionNumber, Option.empty());
    String formatProvider = snapshot.metadata().format().provider();
    FileFormat format = actionsConverter.convertToFileFormat(formatProvider);

    // Both maps are keyed by the data file's physical (absolute) path.
    Map<String, InternalDataFile> filesAddedByPath = new HashMap<>();
    Map<String, InternalDataFile> filesRemovedByPath = new HashMap<>();
    // Data file paths of added entries that carry a deletion vector.
    Set<String> pathsWithDeletionVectors = new HashSet<>();

    for (Action commitAction : commitActions) {
      if (commitAction instanceof AddFile) {
        AddFile addAction = (AddFile) commitAction;
        InternalDataFile added =
            actionsConverter.convertAddActionToInternalDataFile(
                addAction,
                snapshot,
                format,
                table.getPartitioningFields(),
                table.getReadSchema().getFields(),
                true,
                DeltaPartitionExtractor.getInstance(),
                DeltaStatsExtractor.getInstance());
        filesAddedByPath.put(added.getPhysicalPath(), added);

        String dvDataFilePath = actionsConverter.extractDeletionVectorFile(snapshot, addAction);
        if (dvDataFilePath != null) {
          pathsWithDeletionVectors.add(dvDataFilePath);
        }
      } else if (commitAction instanceof RemoveFile) {
        RemoveFile removeAction = (RemoveFile) commitAction;
        InternalDataFile removed =
            actionsConverter.convertRemoveActionToInternalDataFile(
                removeAction,
                snapshot,
                format,
                table.getPartitioningFields(),
                DeltaPartitionExtractor.getInstance());
        filesRemovedByPath.put(removed.getPhysicalPath(), removed);
      }
    }

    // When a delete operation attaches a deletion vector to an existing data file, Delta writes a
    // RemoveFile for the old entry and an AddFile (with the deletion vector) for the same file.
    // Such a pair neither adds nor removes a data file, so drop it from both sides of the diff.
    for (String path : pathsWithDeletionVectors) {
      // Stored values are never null, so a non-null result means a removal was recorded for path.
      boolean hadMatchingRemove = filesRemovedByPath.remove(path) != null;
      if (!hadMatchingRemove) {
        log.warn(
            "No Remove action found for the data file for which deletion vector is added {}. This is unexpected.",
            path);
        continue;
      }
      filesAddedByPath.remove(path);
    }

    InternalFilesDiff diff =
        InternalFilesDiff.builder()
            .filesAdded(filesAddedByPath.values())
            .filesRemoved(filesRemovedByPath.values())
            .build();
    return TableChange.builder()
        .tableAsOfChange(table)
        .filesDiff(diff)
        .sourceIdentifier(getCommitIdentifier(versionNumber))
        .build();
  }