in parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java [386:462]
public void processBlocks() throws IOException {
  // Cursor state for the optional right-side ("join") files: the current reader, its cached
  // indexes, and the row-group position inside its footer.
  TransParquetFileReader readerToJoin = null;
  IndexCache indexCacheToJoin = null;
  int blockIdxToJoin = 0;
  List<ColumnDescriptor> outColumns = outSchema.getColumns();

  while (!inputFiles.isEmpty()) {
    TransParquetFileReader reader = inputFiles.poll();
    LOG.info("Rewriting input file: {}, remaining files: {}", reader.getFile(), inputFiles.size());
    ParquetMetadata meta = reader.getFooter();
    Set<ColumnPath> columnPaths = meta.getFileMetaData().getSchema().getColumns().stream()
        .map(x -> ColumnPath.get(x.getPath()))
        .collect(Collectors.toSet());
    // Cache index structures (column/offset indexes, bloom filters) for the columns of this file.
    IndexCache indexCache = IndexCache.create(reader, columnPaths, indexCacheStrategy, true);

    for (int blockIdx = 0; blockIdx < meta.getBlocks().size(); blockIdx++) {
      BlockMetaData blockMetaData = meta.getBlocks().get(blockIdx);
      writer.startBlock(blockMetaData.getRowCount());
      indexCache.setBlockMetadata(blockMetaData);
      Map<ColumnPath, ColumnChunkMetaData> pathToChunk =
          blockMetaData.getColumns().stream().collect(Collectors.toMap(x -> x.getPath(), x -> x));
      // Advance the join-side cursor so that the blockIdx-th row group of the current input file
      // is paired with the next row group from the inputFilesToJoin queue.
      if (!inputFilesToJoin.isEmpty()) {
        if (readerToJoin == null
            || ++blockIdxToJoin == readerToJoin.getFooter().getBlocks().size()) {
          // Either this is the first block overall or the current join file is exhausted:
          // roll over to the next join file and restart at its first row group.
          if (readerToJoin != null) readerToJoin.close();
          blockIdxToJoin = 0;
          readerToJoin = inputFilesToJoin.poll();
          Set<ColumnPath> columnPathsToJoin =
              readerToJoin.getFileMetaData().getSchema().getColumns().stream()
                  .map(x -> ColumnPath.get(x.getPath()))
                  .collect(Collectors.toSet());
          if (indexCacheToJoin != null) {
            indexCacheToJoin.clean();
          }
          indexCacheToJoin = IndexCache.create(readerToJoin, columnPathsToJoin, indexCacheStrategy, true);
          indexCacheToJoin.setBlockMetadata(
              readerToJoin.getFooter().getBlocks().get(blockIdxToJoin));
        } else {
          // blockIdxToJoin was already advanced by the pre-increment in the condition above;
          // incrementing it again here would skip (and eventually overflow) join row groups.
          indexCacheToJoin.setBlockMetadata(
              readerToJoin.getFooter().getBlocks().get(blockIdxToJoin));
        }
      }
      for (int outColumnIdx = 0; outColumnIdx < outColumns.size(); outColumnIdx++) {
        ColumnPath colPath = ColumnPath.get(outColumns.get(outColumnIdx).getPath());
        if (readerToJoin != null) {
          Optional<ColumnChunkMetaData> chunkToJoin =
              readerToJoin.getFooter().getBlocks().get(blockIdxToJoin).getColumns().stream()
                  .filter(x -> x.getPath().equals(colPath))
                  .findFirst();
          // Prefer the chunk from the join file when it has this column and either join columns
          // are configured to overwrite the input, or the input file lacks the column entirely.
          if (chunkToJoin.isPresent()
              && (overwriteInputWithJoinColumns || !columnPaths.contains(colPath))) {
            processBlock(readerToJoin, blockIdxToJoin, outColumnIdx, indexCacheToJoin, chunkToJoin.get());
          } else {
            processBlock(reader, blockIdx, outColumnIdx, indexCache, pathToChunk.get(colPath));
          }
        } else {
          processBlock(reader, blockIdx, outColumnIdx, indexCache, pathToChunk.get(colPath));
        }
      }
      writer.endBlock();
      indexCache.clean();
      numBlocksRewritten++;
    }
    indexCache.clean();
    LOG.info("Finish rewriting input file: {}", reader.getFile());
    reader.close();
  }
  if (readerToJoin != null) readerToJoin.close();
}
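// ---------------------------------------------------------------------------------------------
// Illustrative sketch, not part of ParquetRewriter: the least obvious part of processBlocks()
// is the join-side cursor, which pairs each row group of the current input file with the next
// row group taken in order from the queued inputFilesToJoin, rolling over to the next join file
// once a footer is exhausted. The stand-alone snippet below mirrors that rollover with plain
// ints so it can be traced or unit-tested in isolation. All names in it (JoinCursorSketch,
// blockCounts, currentFileBlocks) are hypothetical stand-ins for the real
// TransParquetFileReader/footer plumbing.
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;

class JoinCursorSketch {
  public static void main(String[] args) {
    // Hypothetical join-side files with 2, 3 and 1 row groups respectively.
    Queue<Integer> blockCounts = new ArrayDeque<>(Arrays.asList(2, 3, 1));
    int currentFileBlocks = -1; // -1 plays the role of readerToJoin == null
    int blockIdxToJoin = 0;

    // Drive the cursor for six input-side row groups; expected pairing:
    // file0[0], file0[1], file1[0], file1[1], file1[2], file2[0]
    for (int inputBlock = 0; inputBlock < 6; inputBlock++) {
      if (currentFileBlocks < 0 || ++blockIdxToJoin == currentFileBlocks) {
        // Roll over to the next join file and restart at its first row group.
        blockIdxToJoin = 0;
        currentFileBlocks = blockCounts.poll();
      }
      System.out.println("input row group " + inputBlock + " -> join row group " + blockIdxToJoin);
    }
  }
}

// Typical driver usage (hedged; based on the RewriteOptions/ParquetRewriter pattern used in the
// parquet-hadoop tests, exact builder overloads may differ by version):
//   RewriteOptions options = new RewriteOptions.Builder(conf, inputPath, outputPath).build();
//   ParquetRewriter rewriter = new ParquetRewriter(options);
//   rewriter.processBlocks();
//   rewriter.close();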