public void processBlocks()

in parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java [386:462]


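  // Rewrites every block (row group) of every input file into the output writer,
  // one output column at a time. When join files are configured, their blocks are
  // consumed in lockstep with the main blocks so that new or overwriting columns
  // can be stitched into the output.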
  public void processBlocks() throws IOException {
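    // Cursor state for the optional join inputs: the currently open join-file
    // reader, its index cache, and the index of the join block being consumed.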
    TransParquetFileReader readerToJoin = null;
    IndexCache indexCacheToJoin = null;
    int blockIdxToJoin = 0;
    List<ColumnDescriptor> outColumns = outSchema.getColumns();

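    // Drain the queue of main input files, rewriting one file at a time.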
    while (!inputFiles.isEmpty()) {
      TransParquetFileReader reader = inputFiles.poll();
      LOG.info("Rewriting input file: {}, remaining files: {}", reader.getFile(), inputFiles.size());
      ParquetMetadata meta = reader.getFooter();
      Set<ColumnPath> columnPaths = meta.getFileMetaData().getSchema().getColumns().stream()
          .map(x -> ColumnPath.get(x.getPath()))
          .collect(Collectors.toSet());
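      // Cache this file's column indexes, offset indexes and bloom filters so that
      // processBlock can copy them into the output along with each column chunk.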
      IndexCache indexCache = IndexCache.create(reader, columnPaths, indexCacheStrategy, true);

      for (int blockIdx = 0; blockIdx < meta.getBlocks().size(); blockIdx++) {
        BlockMetaData blockMetaData = meta.getBlocks().get(blockIdx);
        writer.startBlock(blockMetaData.getRowCount());
        indexCache.setBlockMetadata(blockMetaData);
        Map<ColumnPath, ColumnChunkMetaData> pathToChunk =
            blockMetaData.getColumns().stream().collect(Collectors.toMap(x -> x.getPath(), x -> x));

        // Advance the join-side cursor in lockstep with the main blocks. The guard
        // also checks readerToJoin so that the current join file keeps being
        // consumed after the queue of join files has been drained.
        if (readerToJoin != null || !inputFilesToJoin.isEmpty()) {
          if (readerToJoin == null
              || ++blockIdxToJoin == readerToJoin.getFooter().getBlocks().size()) {
            // No join file is open yet, or the current one is exhausted: open the next.
            if (readerToJoin != null) readerToJoin.close();
            blockIdxToJoin = 0;
            readerToJoin = inputFilesToJoin.poll();
            Set<ColumnPath> columnPathsToJoin =
                readerToJoin.getFileMetaData().getSchema().getColumns().stream()
                    .map(x -> ColumnPath.get(x.getPath()))
                    .collect(Collectors.toSet());
            if (indexCacheToJoin != null) {
              indexCacheToJoin.clean();
            }
            indexCacheToJoin = IndexCache.create(readerToJoin, columnPathsToJoin, indexCacheStrategy, true);
          }
          // The pre-increment in the condition above has already advanced the cursor,
          // so blockIdxToJoin points at the join block that pairs with this main block.
          indexCacheToJoin.setBlockMetadata(
              readerToJoin.getFooter().getBlocks().get(blockIdxToJoin));
        }

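        // For each output column, pick the source chunk: the join file supplies it
        // when it has the column and either overwriting is enabled or the main file
        // lacks that column; otherwise the chunk comes from the main file.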
        for (int outColumnIdx = 0; outColumnIdx < outColumns.size(); outColumnIdx++) {
          ColumnPath colPath =
              ColumnPath.get(outColumns.get(outColumnIdx).getPath());
          if (readerToJoin != null) {
            Optional<ColumnChunkMetaData> chunkToJoin =
                readerToJoin.getFooter().getBlocks().get(blockIdxToJoin).getColumns().stream()
                    .filter(x -> x.getPath().equals(colPath))
                    .findFirst();
            if (chunkToJoin.isPresent()
                && (overwriteInputWithJoinColumns || !columnPaths.contains(colPath))) {
              processBlock(
                  readerToJoin, blockIdxToJoin, outColumnIdx, indexCacheToJoin, chunkToJoin.get());
            } else {
              processBlock(reader, blockIdx, outColumnIdx, indexCache, pathToChunk.get(colPath));
            }
          } else {
            processBlock(reader, blockIdx, outColumnIdx, indexCache, pathToChunk.get(colPath));
          }
        }

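        // Finalize this row group and release its cached index entries.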
        writer.endBlock();
        indexCache.clean();
        numBlocksRewritten++;
      }

      indexCache.clean();
      LOG.info("Finished rewriting input file: {}", reader.getFile());
      reader.close();
    }
    if (readerToJoin != null) readerToJoin.close();
  }
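
For reference, a minimal sketch of how this method is typically driven through the
public rewrite API, assuming the RewriteOptions.Builder(Configuration, Path, Path)
constructor and the ParquetRewriter(RewriteOptions) constructor from this package;
the paths below are placeholders, and the builder option that populates
inputFilesToJoin is left out:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
import org.apache.parquet.hadoop.rewrite.RewriteOptions;

public class RewriteExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Placeholder input/output locations; the Builder also has a List<Path> variant
    // for rewriting several input files into one output.
    RewriteOptions options = new RewriteOptions.Builder(
            conf, new Path("/tmp/in.parquet"), new Path("/tmp/out.parquet"))
        .build();
    ParquetRewriter rewriter = new ParquetRewriter(options);
    rewriter.processBlocks(); // the method shown above
    rewriter.close();         // finalizes the output file footer
  }
}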