ReplaceMetadata extractSnapshotChanges()

in xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java [77:163]


  /**
   * Computes the changes required to make the Hudi table match the provided snapshot of data
   * files.
   *
   * <p>For every partition in the snapshot, the latest base files known to the file system view
   * are compared with the snapshot's files by physical path: base files whose path is absent from
   * the snapshot have their file IDs marked for removal, and snapshot files whose path is not
   * already a base file are turned into write statuses. Every partition that exists in the table
   * but does not appear in the snapshot is treated as dropped, i.e. all of its latest base files
   * are marked for removal.
   *
   * <p>The file system view opened by this method is always closed before returning, including
   * when an exception is thrown while computing the changes.
   *
   * @param partitionedDataFiles the snapshot's data files grouped by partition
   * @param metaClient meta client of the target table; supplies the table config, the active
   *     timeline and the base path
   * @param commit value forwarded to {@code toWriteStatus} for every added file (presumably the
   *     instant time of the commit being written - confirm against the caller)
   * @return the combined replace metadata: file IDs to remove per partition plus the write statuses
   *     of the files to add; {@code ReplaceMetadata.EMPTY} combined with any dropped partitions
   *     when the snapshot has no partitions
   */
  ReplaceMetadata extractSnapshotChanges(
      List<PartitionFileGroup> partitionedDataFiles,
      HoodieTableMetaClient metaClient,
      String commit) {
    HoodieMetadataConfig metadataConfig =
        HoodieMetadataConfig.newBuilder()
            .enable(metaClient.getTableConfig().isMetadataTableAvailable())
            .build();
    boolean isTableInitialized = metaClient.isTimelineNonEmpty();
    HoodieTableFileSystemView fsView =
        new HoodieMetadataFileSystemView(
            engineContext, metaClient, metaClient.getActiveTimeline(), metadataConfig);
    // Everything that touches the view runs inside try/finally so the view is released even when
    // listing partitions, reading base files or building write statuses throws.
    try {
      // Track the partitions that are not present in the snapshot, so the files for those
      // partitions can be dropped
      Set<String> partitionPathsToDrop =
          new HashSet<>(
              FSUtils.getAllPartitionPaths(
                  engineContext, metadataConfig, metaClient.getBasePathV2().toString()));
      ReplaceMetadata replaceMetadata =
          partitionedDataFiles.stream()
              .map(
                  partitionFileGroup -> {
                    List<InternalDataFile> dataFiles = partitionFileGroup.getDataFiles();
                    String partitionPath = getPartitionPath(tableBasePath, dataFiles);
                    // remove the partition from the set of partitions to drop since it is present
                    // in the snapshot
                    partitionPathsToDrop.remove(partitionPath);
                    // create a map of file path to the data file, any entries not in the hudi
                    // table will be added
                    Map<String, InternalDataFile> physicalPathToFile =
                        dataFiles.stream()
                            .collect(
                                Collectors.toMap(
                                    InternalDataFile::getPhysicalPath, Function.identity()));
                    // an uninitialized table (empty timeline) has no base files to compare with
                    List<HoodieBaseFile> baseFiles =
                        isTableInitialized
                            ? fsView.getLatestBaseFiles(partitionPath).collect(Collectors.toList())
                            : Collections.emptyList();
                    Set<String> existingPaths =
                        baseFiles.stream()
                            .map(HoodieBaseFile::getPath)
                            .collect(Collectors.toSet());
                    // Mark fileIds for removal if the file paths are no longer present in the
                    // snapshot
                    List<String> fileIdsToRemove =
                        baseFiles.stream()
                            .filter(
                                baseFile -> !physicalPathToFile.containsKey(baseFile.getPath()))
                            .map(HoodieBaseFile::getFileId)
                            .collect(Collectors.toList());
                    // for any entries in the map that are not in the set of existing paths,
                    // create a write status to add them to the Hudi table
                    List<WriteStatus> writeStatuses =
                        physicalPathToFile.entrySet().stream()
                            .filter(entry -> !existingPaths.contains(entry.getKey()))
                            .map(Map.Entry::getValue)
                            .map(
                                snapshotFile ->
                                    toWriteStatus(
                                        tableBasePath,
                                        commit,
                                        snapshotFile,
                                        Optional.of(partitionPath)))
                            .collect(Collectors.toList());
                    // only register the partition in the removal map when there is something to
                    // remove
                    return ReplaceMetadata.of(
                        fileIdsToRemove.isEmpty()
                            ? Collections.emptyMap()
                            : Collections.singletonMap(partitionPath, fileIdsToRemove),
                        writeStatuses);
                  })
              .reduce(ReplaceMetadata::combine)
              .orElse(ReplaceMetadata.EMPTY);
      // treat any partitions not present in the snapshot as dropped
      Optional<ReplaceMetadata> droppedPartitions =
          partitionPathsToDrop.stream()
              .map(
                  partition -> {
                    List<String> fileIdsToRemove =
                        fsView
                            .getLatestBaseFiles(partition)
                            .map(HoodieBaseFile::getFileId)
                            .collect(Collectors.toList());
                    return ReplaceMetadata.of(
                        Collections.singletonMap(partition, fileIdsToRemove),
                        Collections.emptyList());
                  })
              .reduce(ReplaceMetadata::combine);
      return droppedPartitions.map(replaceMetadata::combine).orElse(replaceMetadata);
    } finally {
      fsView.close();
    }
  }