public static List readAllFootersInParallelUsingSummaryFiles()

in parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java [162:230]


  public static List<Footer> readAllFootersInParallelUsingSummaryFiles(
      final Configuration configuration,
      final Collection<FileStatus> partFiles,
      final boolean skipRowGroups) throws IOException {

    // figure out list of all parents to part files
    Set<Path> parents = new HashSet<Path>();
    for (FileStatus part : partFiles) {
      parents.add(part.getPath().getParent());
    }

    // read corresponding summary files if they exist
    List<Callable<Map<Path, Footer>>> summaries = new ArrayList<Callable<Map<Path, Footer>>>();
    for (final Path path : parents) {
      summaries.add(() -> {
        ParquetMetadata mergedMetadata = readSummaryMetadata(configuration, path, skipRowGroups);
        if (mergedMetadata != null) {
          final List<Footer> footers;
          if (skipRowGroups) {
            footers = new ArrayList<Footer>();
            for (FileStatus f : partFiles) {
              footers.add(new Footer(f.getPath(), mergedMetadata));
            }
          } else {
            footers = footersFromSummaryFile(path, mergedMetadata);
          }
          Map<Path, Footer> map = new HashMap<Path, Footer>();
          for (Footer footer : footers) {
            // the folder may have been moved
            footer = new Footer(new Path(path, footer.getFile().getName()), footer.getParquetMetadata());
            map.put(footer.getFile(), footer);
          }
          return map;
        } else {
          return Collections.emptyMap();
        }
      });
    }

    Map<Path, Footer> cache = new HashMap<Path, Footer>();
    try {
      List<Map<Path, Footer>> footersFromSummaries = runAllInParallel(configuration.getInt(PARQUET_READ_PARALLELISM, 5), summaries);
      for (Map<Path, Footer> footers : footersFromSummaries) {
        cache.putAll(footers);
      }
    } catch (ExecutionException e) {
      throw new IOException("Error reading summaries", e);
    }

    // keep only footers for files actually requested and read file footer if not found in summaries
    List<Footer> result = new ArrayList<Footer>(partFiles.size());
    List<FileStatus> toRead = new ArrayList<FileStatus>();
    for (FileStatus part : partFiles) {
      Footer f = cache.get(part.getPath());
      if (f != null) {
        result.add(f);
      } else {
        toRead.add(part);
      }
    }

    if (toRead.size() > 0) {
      // read the footers of the files that did not have a summary file
      LOG.info("reading another {} footers", toRead.size());
      result.addAll(readAllFootersInParallel(configuration, toRead, skipRowGroups));
    }

    return result;
  }