private AddedAndRemovedFiles getAddedAndRemovedPartitionInfo()

in xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java [145:254]


  private AddedAndRemovedFiles getAddedAndRemovedPartitionInfo(
      HoodieTimeline timeline,
      HoodieInstant instant,
      TableFileSystemView fsView,
      HoodieInstant instantToConsider,
      List<InternalPartitionField> partitioningFields) {
    try {
      List<InternalDataFile> addedFiles = new ArrayList<>();
      List<InternalDataFile> removedFiles = new ArrayList<>();
      switch (instant.getAction()) {
        case HoodieTimeline.COMMIT_ACTION:
        case HoodieTimeline.DELTA_COMMIT_ACTION:
          HoodieCommitMetadata commitMetadata =
              HoodieCommitMetadata.fromBytes(
                  timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
          commitMetadata
              .getPartitionToWriteStats()
              .forEach(
                  (partitionPath, writeStats) -> {
                    Set<String> affectedFileIds =
                        writeStats.stream()
                            .map(HoodieWriteStat::getFileId)
                            .collect(Collectors.toSet());
                    AddedAndRemovedFiles addedAndRemovedFiles =
                        getUpdatesToPartition(
                            fsView,
                            instantToConsider,
                            partitionPath,
                            affectedFileIds,
                            partitioningFields);
                    addedFiles.addAll(addedAndRemovedFiles.getAdded());
                    removedFiles.addAll(addedAndRemovedFiles.getRemoved());
                  });
          break;
        case HoodieTimeline.REPLACE_COMMIT_ACTION:
          HoodieReplaceCommitMetadata replaceMetadata =
              HoodieReplaceCommitMetadata.fromBytes(
                  timeline.getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);

          replaceMetadata
              .getPartitionToReplaceFileIds()
              .forEach(
                  (partitionPath, fileIds) -> {
                    Set<String> replacedFileIdsByPartition = new HashSet<>(fileIds);
                    Set<String> newFileIds =
                        replaceMetadata
                            .getPartitionToWriteStats()
                            .getOrDefault(partitionPath, Collections.emptyList())
                            .stream()
                            .map(HoodieWriteStat::getFileId)
                            .collect(Collectors.toSet());
                    AddedAndRemovedFiles addedAndRemovedFiles =
                        getUpdatesToPartitionForReplaceCommit(
                            fsView,
                            instantToConsider,
                            partitionPath,
                            replacedFileIdsByPartition,
                            newFileIds,
                            partitioningFields);
                    addedFiles.addAll(addedAndRemovedFiles.getAdded());
                    removedFiles.addAll(addedAndRemovedFiles.getRemoved());
                  });
          break;
        case HoodieTimeline.ROLLBACK_ACTION:
          HoodieRollbackMetadata rollbackMetadata =
              TimelineMetadataUtils.deserializeAvroMetadata(
                  timeline.getInstantDetails(instant).get(), HoodieRollbackMetadata.class);
          rollbackMetadata
              .getPartitionMetadata()
              .forEach(
                  (partition, metadata) ->
                      removedFiles.addAll(
                          getRemovedFiles(
                              partition, metadata.getSuccessDeleteFiles(), partitioningFields)));
          break;
        case HoodieTimeline.RESTORE_ACTION:
          HoodieRestoreMetadata restoreMetadata =
              TimelineMetadataUtils.deserializeAvroMetadata(
                  timeline.getInstantDetails(instant).get(), HoodieRestoreMetadata.class);
          restoreMetadata
              .getHoodieRestoreMetadata()
              .forEach(
                  (key, rollbackMetadataList) ->
                      rollbackMetadataList.forEach(
                          rollbackMeta ->
                              rollbackMeta
                                  .getPartitionMetadata()
                                  .forEach(
                                      (partition, metadata) ->
                                          removedFiles.addAll(
                                              getRemovedFiles(
                                                  partition,
                                                  metadata.getSuccessDeleteFiles(),
                                                  partitioningFields)))));
          break;
        case HoodieTimeline.CLEAN_ACTION:
        case HoodieTimeline.SAVEPOINT_ACTION:
        case HoodieTimeline.LOG_COMPACTION_ACTION:
        case HoodieTimeline.INDEXING_ACTION:
        case HoodieTimeline.SCHEMA_COMMIT_ACTION:
          // these do not impact the base files
          break;
        default:
          throw new NotSupportedException("Unexpected commit type " + instant.getAction());
      }
      return AddedAndRemovedFiles.builder().added(addedFiles).removed(removedFiles).build();
    } catch (IOException ex) {
      throw new ReadException("Unable to read commit metadata for commit " + instant, ex);
    }
  }