spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java [278:673]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  private Dataset<FileURI> validFileIdentDS() {
    // transform before union to avoid extra serialization/deserialization
    FileInfoToFileURI toFileURI = new FileInfoToFileURI(equalSchemes, equalAuthorities);

    Dataset<FileURI> contentFileIdentDS = toFileURI.apply(contentFileDS(table));
    Dataset<FileURI> manifestFileIdentDS = toFileURI.apply(manifestDS(table));
    Dataset<FileURI> manifestListIdentDS = toFileURI.apply(manifestListDS(table));
    Dataset<FileURI> otherMetadataFileIdentDS = toFileURI.apply(otherMetadataFileDS(table));

    return contentFileIdentDS
        .union(manifestFileIdentDS)
        .union(manifestListIdentDS)
        .union(otherMetadataFileIdentDS);
  }

  private Dataset<FileURI> actualFileIdentDS() {
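    // files that actually exist on storage: either listed under the table location or taken from the user-provided file list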
    StringToFileURI toFileURI = new StringToFileURI(equalSchemes, equalAuthorities);
    if (compareToFileList == null) {
      return toFileURI.apply(listedFileDS());
    } else {
      return toFileURI.apply(filteredCompareToFileList());
    }
  }

  private Dataset<String> listedFileDS() {
    List<String> subDirs = Lists.newArrayList();
    List<String> matchingFiles = Lists.newArrayList();

    Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;
    PathFilter pathFilter = PartitionAwareHiddenPathFilter.forSpecs(table.specs());

    // list at most MAX_DRIVER_LISTING_DEPTH levels and only dirs that have
    // at most MAX_DRIVER_LISTING_DIRECT_SUB_DIRS direct sub dirs on the driver
    listDirRecursively(
        location,
        predicate,
        hadoopConf.value(),
        MAX_DRIVER_LISTING_DEPTH,
        MAX_DRIVER_LISTING_DIRECT_SUB_DIRS,
        subDirs,
        pathFilter,
        matchingFiles);

    JavaRDD<String> matchingFileRDD = sparkContext().parallelize(matchingFiles, 1);

    if (subDirs.isEmpty()) {
      return spark().createDataset(matchingFileRDD.rdd(), Encoders.STRING());
    }

    int parallelism = Math.min(subDirs.size(), listingParallelism);
    JavaRDD<String> subDirRDD = sparkContext().parallelize(subDirs, parallelism);

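    // distribute the remaining sub dirs and list them in parallel on executors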
    Broadcast<SerializableConfiguration> conf = sparkContext().broadcast(hadoopConf);
    ListDirsRecursively listDirs = new ListDirsRecursively(conf, olderThanTimestamp, pathFilter);
    JavaRDD<String> matchingLeafFileRDD = subDirRDD.mapPartitions(listDirs);

    JavaRDD<String> completeMatchingFileRDD = matchingFileRDD.union(matchingLeafFileRDD);
    return spark().createDataset(completeMatchingFileRDD.rdd(), Encoders.STRING());
  }

  private static void listDirRecursively(
      String dir,
      Predicate<FileStatus> predicate,
      Configuration conf,
      int maxDepth,
      int maxDirectSubDirs,
      List<String> remainingSubDirs,
      PathFilter pathFilter,
      List<String> matchingFiles) {

    // stop listing whenever we reach the max depth
    if (maxDepth <= 0) {
      remainingSubDirs.add(dir);
      return;
    }

    try {
      Path path = new Path(dir);
      FileSystem fs = path.getFileSystem(conf);

      List<String> subDirs = Lists.newArrayList();

      for (FileStatus file : fs.listStatus(path, pathFilter)) {
        if (file.isDirectory()) {
          subDirs.add(file.getPath().toString());
        } else if (file.isFile() && predicate.test(file)) {
          matchingFiles.add(file.getPath().toString());
        }
      }

      // stop listing if the number of direct sub dirs is bigger than maxDirectSubDirs
      if (subDirs.size() > maxDirectSubDirs) {
        remainingSubDirs.addAll(subDirs);
        return;
      }

      for (String subDir : subDirs) {
        listDirRecursively(
            subDir,
            predicate,
            conf,
            maxDepth - 1,
            maxDirectSubDirs,
            remainingSubDirs,
            pathFilter,
            matchingFiles);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  @VisibleForTesting
  static List<String> findOrphanFiles(
      SparkSession spark,
      Dataset<FileURI> actualFileIdentDS,
      Dataset<FileURI> validFileIdentDS,
      PrefixMismatchMode prefixMismatchMode) {

    SetAccumulator<Pair<String, String>> conflicts = new SetAccumulator<>();
    spark.sparkContext().register(conflicts);

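    // left outer join on normalized path: actual files with no matching valid file are orphan candidates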
    Column joinCond = actualFileIdentDS.col("path").equalTo(validFileIdentDS.col("path"));

    List<String> orphanFiles =
        actualFileIdentDS
            .joinWith(validFileIdentDS, joinCond, "leftouter")
            .mapPartitions(new FindOrphanFiles(prefixMismatchMode, conflicts), Encoders.STRING())
            .collectAsList();

    if (prefixMismatchMode == PrefixMismatchMode.ERROR && !conflicts.value().isEmpty()) {
      throw new ValidationException(
          "Unable to determine whether certain files are orphaned. "
              + "Metadata references files that match listed/provided files except for authority/scheme. "
              + "Please inspect the conflicting authorities/schemes and specify which of them are equal "
              + "by further configuring the action via equalSchemes() and equalAuthorities() methods. "
              + "Set the prefix mismatch mode to 'NONE' to ignore remaining locations with conflicting "
              + "authorities/schemes or to 'DELETE' iff you are ABSOLUTELY confident that remaining conflicting "
              + "authorities/schemes are different. It will be impossible to recover deleted files. "
              + "Conflicting authorities/schemes: %s.",
          conflicts.value());
    }

    return orphanFiles;
  }

  private static Map<String, String> flattenMap(Map<String, String> map) {
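    // expand comma-separated keys (e.g. "s3a,s3n" -> "s3") so each individual key maps to its canonical value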
    Map<String, String> flattenedMap = Maps.newHashMap();
    if (map != null) {
      for (String key : map.keySet()) {
        String value = map.get(key);
        for (String splitKey : COMMA_SPLITTER.split(key)) {
          flattenedMap.put(splitKey.trim(), value.trim());
        }
      }
    }
    return flattenedMap;
  }

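  // executor-side listing for sub dirs the driver did not descend into (too deep or too many direct sub dirs)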
  private static class ListDirsRecursively implements FlatMapFunction<Iterator<String>, String> {

    private final Broadcast<SerializableConfiguration> hadoopConf;
    private final long olderThanTimestamp;
    private final PathFilter pathFilter;

    ListDirsRecursively(
        Broadcast<SerializableConfiguration> hadoopConf,
        long olderThanTimestamp,
        PathFilter pathFilter) {

      this.hadoopConf = hadoopConf;
      this.olderThanTimestamp = olderThanTimestamp;
      this.pathFilter = pathFilter;
    }

    @Override
    public Iterator<String> call(Iterator<String> dirs) throws Exception {
      List<String> subDirs = Lists.newArrayList();
      List<String> files = Lists.newArrayList();

      Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;

      while (dirs.hasNext()) {
        listDirRecursively(
            dirs.next(),
            predicate,
            hadoopConf.value().value(),
            MAX_EXECUTOR_LISTING_DEPTH,
            MAX_EXECUTOR_LISTING_DIRECT_SUB_DIRS,
            subDirs,
            pathFilter,
            files);
      }

      if (!subDirs.isEmpty()) {
        throw new RuntimeException(
            "Could not list sub directories, reached maximum depth: " + MAX_EXECUTOR_LISTING_DEPTH);
      }

      return files.iterator();
    }
  }

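  // classifies joined (actual, valid) rows: returns the actual file location for orphans, null otherwise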
  private static class FindOrphanFiles
      implements MapPartitionsFunction<Tuple2<FileURI, FileURI>, String> {

    private final PrefixMismatchMode mode;
    private final SetAccumulator<Pair<String, String>> conflicts;

    FindOrphanFiles(PrefixMismatchMode mode, SetAccumulator<Pair<String, String>> conflicts) {
      this.mode = mode;
      this.conflicts = conflicts;
    }

    @Override
    public Iterator<String> call(Iterator<Tuple2<FileURI, FileURI>> rows) throws Exception {
      Iterator<String> orphanFiles = Iterators.transform(rows, this::toOrphanFile);
      return Iterators.filter(orphanFiles, Objects::nonNull);
    }

    private String toOrphanFile(Tuple2<FileURI, FileURI> row) {
      FileURI actual = row._1;
      FileURI valid = row._2;

      if (valid == null) {
        return actual.uriAsString;
      }

      boolean schemeMatch = uriComponentMatch(valid.scheme, actual.scheme);
      boolean authorityMatch = uriComponentMatch(valid.authority, actual.authority);

      if ((!schemeMatch || !authorityMatch) && mode == PrefixMismatchMode.DELETE) {
        return actual.uriAsString;
      } else {
        if (!schemeMatch) {
          conflicts.add(Pair.of(valid.scheme, actual.scheme));
        }

        if (!authorityMatch) {
          conflicts.add(Pair.of(valid.authority, actual.authority));
        }

        return null;
      }
    }

    private boolean uriComponentMatch(String valid, String actual) {
      return Strings.isNullOrEmpty(valid) || valid.equalsIgnoreCase(actual);
    }
  }

  @VisibleForTesting
  static class StringToFileURI extends ToFileURI<String> {
    StringToFileURI(Map<String, String> equalSchemes, Map<String, String> equalAuthorities) {
      super(equalSchemes, equalAuthorities);
    }

    @Override
    protected String uriAsString(String input) {
      return input;
    }
  }

  @VisibleForTesting
  static class FileInfoToFileURI extends ToFileURI<FileInfo> {
    FileInfoToFileURI(Map<String, String> equalSchemes, Map<String, String> equalAuthorities) {
      super(equalSchemes, equalAuthorities);
    }

    @Override
    protected String uriAsString(FileInfo fileInfo) {
      return fileInfo.getPath();
    }
  }

  private abstract static class ToFileURI<I> implements MapPartitionsFunction<I, FileURI> {

    private final Map<String, String> equalSchemes;
    private final Map<String, String> equalAuthorities;

    ToFileURI(Map<String, String> equalSchemes, Map<String, String> equalAuthorities) {
      this.equalSchemes = equalSchemes;
      this.equalAuthorities = equalAuthorities;
    }

    protected abstract String uriAsString(I input);

    Dataset<FileURI> apply(Dataset<I> ds) {
      return ds.mapPartitions(this, FileURI.ENCODER);
    }

    @Override
    public Iterator<FileURI> call(Iterator<I> rows) throws Exception {
      return Iterators.transform(rows, this::toFileURI);
    }

    private FileURI toFileURI(I input) {
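      // parse the location and normalize its scheme/authority using the configured equivalence maps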
      String uriAsString = uriAsString(input);
      URI uri = new Path(uriAsString).toUri();
      String scheme = equalSchemes.getOrDefault(uri.getScheme(), uri.getScheme());
      String authority = equalAuthorities.getOrDefault(uri.getAuthority(), uri.getAuthority());
      return new FileURI(scheme, authority, uri.getPath(), uriAsString);
    }
  }

  /**
   * A {@link PathFilter} that filters out hidden paths, but does not filter out paths that would be
   * marked as hidden by {@link HiddenPathFilter} due to a partition field that starts with one of
   * the characters that indicate a hidden path.
   */
  @VisibleForTesting
  static class PartitionAwareHiddenPathFilter implements PathFilter, Serializable {

    private final Set<String> hiddenPathPartitionNames;

    PartitionAwareHiddenPathFilter(Set<String> hiddenPathPartitionNames) {
      this.hiddenPathPartitionNames = hiddenPathPartitionNames;
    }

    @Override
    public boolean accept(Path path) {
      return isHiddenPartitionPath(path) || HiddenPathFilter.get().accept(path);
    }

    private boolean isHiddenPartitionPath(Path path) {
      return hiddenPathPartitionNames.stream().anyMatch(path.getName()::startsWith);
    }

    static PathFilter forSpecs(Map<Integer, PartitionSpec> specs) {
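      // collect partition field names that start with '_' or '.' so their dirs are not treated as hidden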
      if (specs == null) {
        return HiddenPathFilter.get();
      }

      Set<String> partitionNames =
          specs.values().stream()
              .map(PartitionSpec::fields)
              .flatMap(List::stream)
              .filter(field -> field.name().startsWith("_") || field.name().startsWith("."))
              .map(field -> field.name() + "=")
              .collect(Collectors.toSet());

      if (partitionNames.isEmpty()) {
        return HiddenPathFilter.get();
      } else {
        return new PartitionAwareHiddenPathFilter(partitionNames);
      }
    }
  }

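  // plain Java bean (no-arg constructor plus getters/setters) so Spark can encode it with Encoders.bean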
  public static class FileURI {
    public static final Encoder<FileURI> ENCODER = Encoders.bean(FileURI.class);

    private String scheme;
    private String authority;
    private String path;
    private String uriAsString;

    public FileURI(String scheme, String authority, String path, String uriAsString) {
      this.scheme = scheme;
      this.authority = authority;
      this.path = path;
      this.uriAsString = uriAsString;
    }

    public FileURI() {}

    public void setScheme(String scheme) {
      this.scheme = scheme;
    }

    public void setAuthority(String authority) {
      this.authority = authority;
    }

    public void setPath(String path) {
      this.path = path;
    }

    public void setUriAsString(String uriAsString) {
      this.uriAsString = uriAsString;
    }

    public String getScheme() {
      return scheme;
    }

    public String getAuthority() {
      return authority;
    }

    public String getPath() {
      return path;
    }

    public String getUriAsString() {
      return uriAsString;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java [278:673]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  private Dataset<FileURI> validFileIdentDS() {
    // transform before union to avoid extra serialization/deserialization
    FileInfoToFileURI toFileURI = new FileInfoToFileURI(equalSchemes, equalAuthorities);

    Dataset<FileURI> contentFileIdentDS = toFileURI.apply(contentFileDS(table));
    Dataset<FileURI> manifestFileIdentDS = toFileURI.apply(manifestDS(table));
    Dataset<FileURI> manifestListIdentDS = toFileURI.apply(manifestListDS(table));
    Dataset<FileURI> otherMetadataFileIdentDS = toFileURI.apply(otherMetadataFileDS(table));

    return contentFileIdentDS
        .union(manifestFileIdentDS)
        .union(manifestListIdentDS)
        .union(otherMetadataFileIdentDS);
  }

  private Dataset<FileURI> actualFileIdentDS() {
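    // files that actually exist on storage: either listed under the table location or taken from the user-provided file list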
    StringToFileURI toFileURI = new StringToFileURI(equalSchemes, equalAuthorities);
    if (compareToFileList == null) {
      return toFileURI.apply(listedFileDS());
    } else {
      return toFileURI.apply(filteredCompareToFileList());
    }
  }

  private Dataset<String> listedFileDS() {
    List<String> subDirs = Lists.newArrayList();
    List<String> matchingFiles = Lists.newArrayList();

    Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;
    PathFilter pathFilter = PartitionAwareHiddenPathFilter.forSpecs(table.specs());

    // list at most MAX_DRIVER_LISTING_DEPTH levels and only dirs that have
    // at most MAX_DRIVER_LISTING_DIRECT_SUB_DIRS direct sub dirs on the driver
    listDirRecursively(
        location,
        predicate,
        hadoopConf.value(),
        MAX_DRIVER_LISTING_DEPTH,
        MAX_DRIVER_LISTING_DIRECT_SUB_DIRS,
        subDirs,
        pathFilter,
        matchingFiles);

    JavaRDD<String> matchingFileRDD = sparkContext().parallelize(matchingFiles, 1);

    if (subDirs.isEmpty()) {
      return spark().createDataset(matchingFileRDD.rdd(), Encoders.STRING());
    }

    int parallelism = Math.min(subDirs.size(), listingParallelism);
    JavaRDD<String> subDirRDD = sparkContext().parallelize(subDirs, parallelism);

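    // distribute the remaining sub dirs and list them in parallel on executors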
    Broadcast<SerializableConfiguration> conf = sparkContext().broadcast(hadoopConf);
    ListDirsRecursively listDirs = new ListDirsRecursively(conf, olderThanTimestamp, pathFilter);
    JavaRDD<String> matchingLeafFileRDD = subDirRDD.mapPartitions(listDirs);

    JavaRDD<String> completeMatchingFileRDD = matchingFileRDD.union(matchingLeafFileRDD);
    return spark().createDataset(completeMatchingFileRDD.rdd(), Encoders.STRING());
  }

  private static void listDirRecursively(
      String dir,
      Predicate<FileStatus> predicate,
      Configuration conf,
      int maxDepth,
      int maxDirectSubDirs,
      List<String> remainingSubDirs,
      PathFilter pathFilter,
      List<String> matchingFiles) {

    // stop listing whenever we reach the max depth
    if (maxDepth <= 0) {
      remainingSubDirs.add(dir);
      return;
    }

    try {
      Path path = new Path(dir);
      FileSystem fs = path.getFileSystem(conf);

      List<String> subDirs = Lists.newArrayList();

      for (FileStatus file : fs.listStatus(path, pathFilter)) {
        if (file.isDirectory()) {
          subDirs.add(file.getPath().toString());
        } else if (file.isFile() && predicate.test(file)) {
          matchingFiles.add(file.getPath().toString());
        }
      }

      // stop listing if the number of direct sub dirs is bigger than maxDirectSubDirs
      if (subDirs.size() > maxDirectSubDirs) {
        remainingSubDirs.addAll(subDirs);
        return;
      }

      for (String subDir : subDirs) {
        listDirRecursively(
            subDir,
            predicate,
            conf,
            maxDepth - 1,
            maxDirectSubDirs,
            remainingSubDirs,
            pathFilter,
            matchingFiles);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  @VisibleForTesting
  static List<String> findOrphanFiles(
      SparkSession spark,
      Dataset<FileURI> actualFileIdentDS,
      Dataset<FileURI> validFileIdentDS,
      PrefixMismatchMode prefixMismatchMode) {

    SetAccumulator<Pair<String, String>> conflicts = new SetAccumulator<>();
    spark.sparkContext().register(conflicts);

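    // left outer join on normalized path: actual files with no matching valid file are orphan candidates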
    Column joinCond = actualFileIdentDS.col("path").equalTo(validFileIdentDS.col("path"));

    List<String> orphanFiles =
        actualFileIdentDS
            .joinWith(validFileIdentDS, joinCond, "leftouter")
            .mapPartitions(new FindOrphanFiles(prefixMismatchMode, conflicts), Encoders.STRING())
            .collectAsList();

    if (prefixMismatchMode == PrefixMismatchMode.ERROR && !conflicts.value().isEmpty()) {
      throw new ValidationException(
          "Unable to determine whether certain files are orphaned. "
              + "Metadata references files that match listed/provided files except for authority/scheme. "
              + "Please inspect the conflicting authorities/schemes and specify which of them are equal "
              + "by further configuring the action via equalSchemes() and equalAuthorities() methods. "
              + "Set the prefix mismatch mode to 'NONE' to ignore remaining locations with conflicting "
              + "authorities/schemes or to 'DELETE' iff you are ABSOLUTELY confident that remaining conflicting "
              + "authorities/schemes are different. It will be impossible to recover deleted files. "
              + "Conflicting authorities/schemes: %s.",
          conflicts.value());
    }

    return orphanFiles;
  }

  private static Map<String, String> flattenMap(Map<String, String> map) {
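    // expand comma-separated keys (e.g. "s3a,s3n" -> "s3") so each individual key maps to its canonical value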
    Map<String, String> flattenedMap = Maps.newHashMap();
    if (map != null) {
      for (String key : map.keySet()) {
        String value = map.get(key);
        for (String splitKey : COMMA_SPLITTER.split(key)) {
          flattenedMap.put(splitKey.trim(), value.trim());
        }
      }
    }
    return flattenedMap;
  }

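  // executor-side listing for sub dirs the driver did not descend into (too deep or too many direct sub dirs)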
  private static class ListDirsRecursively implements FlatMapFunction<Iterator<String>, String> {

    private final Broadcast<SerializableConfiguration> hadoopConf;
    private final long olderThanTimestamp;
    private final PathFilter pathFilter;

    ListDirsRecursively(
        Broadcast<SerializableConfiguration> hadoopConf,
        long olderThanTimestamp,
        PathFilter pathFilter) {

      this.hadoopConf = hadoopConf;
      this.olderThanTimestamp = olderThanTimestamp;
      this.pathFilter = pathFilter;
    }

    @Override
    public Iterator<String> call(Iterator<String> dirs) throws Exception {
      List<String> subDirs = Lists.newArrayList();
      List<String> files = Lists.newArrayList();

      Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;

      while (dirs.hasNext()) {
        listDirRecursively(
            dirs.next(),
            predicate,
            hadoopConf.value().value(),
            MAX_EXECUTOR_LISTING_DEPTH,
            MAX_EXECUTOR_LISTING_DIRECT_SUB_DIRS,
            subDirs,
            pathFilter,
            files);
      }

      if (!subDirs.isEmpty()) {
        throw new RuntimeException(
            "Could not list sub directories, reached maximum depth: " + MAX_EXECUTOR_LISTING_DEPTH);
      }

      return files.iterator();
    }
  }

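  // classifies joined (actual, valid) rows: returns the actual file location for orphans, null otherwise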
  private static class FindOrphanFiles
      implements MapPartitionsFunction<Tuple2<FileURI, FileURI>, String> {

    private final PrefixMismatchMode mode;
    private final SetAccumulator<Pair<String, String>> conflicts;

    FindOrphanFiles(PrefixMismatchMode mode, SetAccumulator<Pair<String, String>> conflicts) {
      this.mode = mode;
      this.conflicts = conflicts;
    }

    @Override
    public Iterator<String> call(Iterator<Tuple2<FileURI, FileURI>> rows) throws Exception {
      Iterator<String> orphanFiles = Iterators.transform(rows, this::toOrphanFile);
      return Iterators.filter(orphanFiles, Objects::nonNull);
    }

    private String toOrphanFile(Tuple2<FileURI, FileURI> row) {
      FileURI actual = row._1;
      FileURI valid = row._2;

      if (valid == null) {
        return actual.uriAsString;
      }

      boolean schemeMatch = uriComponentMatch(valid.scheme, actual.scheme);
      boolean authorityMatch = uriComponentMatch(valid.authority, actual.authority);

      if ((!schemeMatch || !authorityMatch) && mode == PrefixMismatchMode.DELETE) {
        return actual.uriAsString;
      } else {
        if (!schemeMatch) {
          conflicts.add(Pair.of(valid.scheme, actual.scheme));
        }

        if (!authorityMatch) {
          conflicts.add(Pair.of(valid.authority, actual.authority));
        }

        return null;
      }
    }

    private boolean uriComponentMatch(String valid, String actual) {
      return Strings.isNullOrEmpty(valid) || valid.equalsIgnoreCase(actual);
    }
  }

  @VisibleForTesting
  static class StringToFileURI extends ToFileURI<String> {
    StringToFileURI(Map<String, String> equalSchemes, Map<String, String> equalAuthorities) {
      super(equalSchemes, equalAuthorities);
    }

    @Override
    protected String uriAsString(String input) {
      return input;
    }
  }

  @VisibleForTesting
  static class FileInfoToFileURI extends ToFileURI<FileInfo> {
    FileInfoToFileURI(Map<String, String> equalSchemes, Map<String, String> equalAuthorities) {
      super(equalSchemes, equalAuthorities);
    }

    @Override
    protected String uriAsString(FileInfo fileInfo) {
      return fileInfo.getPath();
    }
  }

  private abstract static class ToFileURI<I> implements MapPartitionsFunction<I, FileURI> {

    private final Map<String, String> equalSchemes;
    private final Map<String, String> equalAuthorities;

    ToFileURI(Map<String, String> equalSchemes, Map<String, String> equalAuthorities) {
      this.equalSchemes = equalSchemes;
      this.equalAuthorities = equalAuthorities;
    }

    protected abstract String uriAsString(I input);

    Dataset<FileURI> apply(Dataset<I> ds) {
      return ds.mapPartitions(this, FileURI.ENCODER);
    }

    @Override
    public Iterator<FileURI> call(Iterator<I> rows) throws Exception {
      return Iterators.transform(rows, this::toFileURI);
    }

    private FileURI toFileURI(I input) {
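      // parse the location and normalize its scheme/authority using the configured equivalence maps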
      String uriAsString = uriAsString(input);
      URI uri = new Path(uriAsString).toUri();
      String scheme = equalSchemes.getOrDefault(uri.getScheme(), uri.getScheme());
      String authority = equalAuthorities.getOrDefault(uri.getAuthority(), uri.getAuthority());
      return new FileURI(scheme, authority, uri.getPath(), uriAsString);
    }
  }

  /**
   * A {@link PathFilter} that filters out hidden paths, but does not filter out paths that would be
   * marked as hidden by {@link HiddenPathFilter} due to a partition field that starts with one of
   * the characters that indicate a hidden path.
   */
  @VisibleForTesting
  static class PartitionAwareHiddenPathFilter implements PathFilter, Serializable {

    private final Set<String> hiddenPathPartitionNames;

    PartitionAwareHiddenPathFilter(Set<String> hiddenPathPartitionNames) {
      this.hiddenPathPartitionNames = hiddenPathPartitionNames;
    }

    @Override
    public boolean accept(Path path) {
      return isHiddenPartitionPath(path) || HiddenPathFilter.get().accept(path);
    }

    private boolean isHiddenPartitionPath(Path path) {
      return hiddenPathPartitionNames.stream().anyMatch(path.getName()::startsWith);
    }

    static PathFilter forSpecs(Map<Integer, PartitionSpec> specs) {
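      // collect partition field names that start with '_' or '.' so their dirs are not treated as hidden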
      if (specs == null) {
        return HiddenPathFilter.get();
      }

      Set<String> partitionNames =
          specs.values().stream()
              .map(PartitionSpec::fields)
              .flatMap(List::stream)
              .filter(field -> field.name().startsWith("_") || field.name().startsWith("."))
              .map(field -> field.name() + "=")
              .collect(Collectors.toSet());

      if (partitionNames.isEmpty()) {
        return HiddenPathFilter.get();
      } else {
        return new PartitionAwareHiddenPathFilter(partitionNames);
      }
    }
  }

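  // plain Java bean (no-arg constructor plus getters/setters) so Spark can encode it with Encoders.bean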
  public static class FileURI {
    public static final Encoder<FileURI> ENCODER = Encoders.bean(FileURI.class);

    private String scheme;
    private String authority;
    private String path;
    private String uriAsString;

    public FileURI(String scheme, String authority, String path, String uriAsString) {
      this.scheme = scheme;
      this.authority = authority;
      this.path = path;
      this.uriAsString = uriAsString;
    }

    public FileURI() {}

    public void setScheme(String scheme) {
      this.scheme = scheme;
    }

    public void setAuthority(String authority) {
      this.authority = authority;
    }

    public void setPath(String path) {
      this.path = path;
    }

    public void setUriAsString(String uriAsString) {
      this.uriAsString = uriAsString;
    }

    public String getScheme() {
      return scheme;
    }

    public String getAuthority() {
      return authority;
    }

    public String getPath() {
      return path;
    }

    public String getUriAsString() {
      return uriAsString;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -




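For context, a minimal usage sketch of the action excerpted above, assuming an existing SparkSession named spark and a loaded Iceberg Table named table; the three-day threshold and the scheme mapping are illustrative values, not defaults:

  import java.util.Collections;
  import java.util.concurrent.TimeUnit;
  import org.apache.iceberg.actions.DeleteOrphanFiles;
  import org.apache.iceberg.spark.actions.SparkActions;

  // Delete files under the table location that are not referenced by table metadata.
  // olderThan() keeps recently written files (e.g. from in-flight commits) out of scope.
  DeleteOrphanFiles.Result result =
      SparkActions.get(spark)
          .deleteOrphanFiles(table)
          .olderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3))
          // optional: treat s3n/s3a as equal to s3; comma-separated keys are expanded by flattenMap() above
          .equalSchemes(Collections.singletonMap("s3n,s3a", "s3"))
          .execute();

  result.orphanFileLocations().forEach(System.out::println);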