in parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java [162:230]
/**
 * Reads the footers of the requested part files, taking them from the summary metadata of their
 * parent directories where possible and reading the remaining files individually.
 *
 * <p>One task per distinct parent directory is submitted to {@code runAllInParallel}, using the
 * parallelism configured under {@code PARQUET_READ_PARALLELISM} (5 when unset). A directory for
 * which {@code readSummaryMetadata} returns {@code null} contributes no footers. Every requested
 * file that no summary resolved is then handed to {@code readAllFootersInParallel}.
 *
 * @param configuration configuration used to read the summaries and to look up the parallelism
 * @param partFiles the files whose footers are requested
 * @param skipRowGroups forwarded to the summary and footer readers; when {@code true}, each
 *     requested file of a directory is paired directly with that directory's merged summary
 *     metadata instead of the per-file footers extracted from the summary
 * @return the footers resolved from summaries, in the iteration order of {@code partFiles},
 *     followed by the footers that had to be read from the files themselves
 * @throws IOException if a summary task fails (the {@link ExecutionException} is kept as the
 *     cause) or if reading the remaining footers fails
 */
public static List<Footer> readAllFootersInParallelUsingSummaryFiles(
    final Configuration configuration,
    final Collection<FileStatus> partFiles,
    final boolean skipRowGroups) throws IOException {
  // figure out list of all parents to part files
  Set<Path> parents = new HashSet<>();
  for (FileStatus part : partFiles) {
    parents.add(part.getPath().getParent());
  }

  // read corresponding summary files if they exist
  List<Callable<Map<Path, Footer>>> summaries = new ArrayList<>(parents.size());
  for (final Path path : parents) {
    summaries.add(() -> {
      ParquetMetadata mergedMetadata = readSummaryMetadata(configuration, path, skipRowGroups);
      if (mergedMetadata == null) {
        // nothing usable for this directory; its files are read individually later
        return Collections.emptyMap();
      }
      final List<Footer> footers;
      if (skipRowGroups) {
        footers = new ArrayList<>();
        for (FileStatus f : partFiles) {
          // Only the files that live in this directory can be served by its summary.
          // Building a footer for every requested file in every directory task costs
          // (directories x files) allocations and only yields keys that are either
          // never looked up or already produced by the file's own directory.
          if (path.equals(f.getPath().getParent())) {
            footers.add(new Footer(f.getPath(), mergedMetadata));
          }
        }
      } else {
        footers = footersFromSummaryFile(path, mergedMetadata);
      }
      Map<Path, Footer> map = new HashMap<>();
      for (Footer footer : footers) {
        // the folder may have been moved: re-root every footer under the directory
        // the summary was actually read from, keeping only the file name
        Footer relocated =
            new Footer(new Path(path, footer.getFile().getName()), footer.getParquetMetadata());
        map.put(relocated.getFile(), relocated);
      }
      return map;
    });
  }

  Map<Path, Footer> cache = new HashMap<>();
  try {
    List<Map<Path, Footer>> footersFromSummaries =
        runAllInParallel(configuration.getInt(PARQUET_READ_PARALLELISM, 5), summaries);
    for (Map<Path, Footer> footers : footersFromSummaries) {
      cache.putAll(footers);
    }
  } catch (ExecutionException e) {
    throw new IOException("Error reading summaries", e);
  }

  // keep only footers for files actually requested and read file footer if not found in summaries
  List<Footer> result = new ArrayList<>(partFiles.size());
  List<FileStatus> toRead = new ArrayList<>();
  for (FileStatus part : partFiles) {
    Footer f = cache.get(part.getPath());
    if (f != null) {
      result.add(f);
    } else {
      toRead.add(part);
    }
  }

  if (!toRead.isEmpty()) {
    // read the footers of the files that did not have a summary file
    LOG.info("reading another {} footers", toRead.size());
    result.addAll(readAllFootersInParallel(configuration, toRead, skipRowGroups));
  }

  return result;
}