public PageReadStore readNextRowGroup()

in common/src/main/java/org/apache/comet/parquet/FileReader.java [248:292]


  public PageReadStore readNextRowGroup() throws IOException {
    if (currentBlock == blocks.size()) {
      return null;
    }
    BlockMetaData block = blocks.get(currentBlock);
    if (block.getRowCount() == 0) {
      throw new RuntimeException("Illegal row group of 0 rows");
    }
    this.currentRowGroup = new RowGroupReader(block.getRowCount());
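    // holder for the pages of this row group; populated as the chunks are read below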
    // prepare the list of consecutive parts to read them in one scan
    List<ConsecutivePartList> allParts = new ArrayList<>();
    ConsecutivePartList currentParts = null;
    for (ColumnChunkMetaData mc : block.getColumns()) {
      ColumnPath pathKey = mc.getPath();
      ColumnDescriptor columnDescriptor = paths.get(pathKey);
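      // a missing descriptor means the column is not part of the requested projection, so skip it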
      if (columnDescriptor != null) {
        BenchmarkCounter.incrementTotalBytes(mc.getTotalSize());
        long startingPos = mc.getStartingPos();
        boolean mergeRanges = cometOptions.isIOMergeRangesEnabled();
        int mergeRangeDelta = cometOptions.getIOMergeRangesDelta();
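        // when merging is enabled, nearby but non-contiguous ranges are coalesced into a
        // single read as long as the gap between them is at most mergeRangeDelta bytes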

        // start a new list if:
        //   - this is the first part, or
        //   - merging is disabled and the part is not contiguous with the previous one, or
        //   - merging is enabled and the gap to the previous part exceeds the merge delta
        if (currentParts == null
            || (!mergeRanges && currentParts.endPos() != startingPos)
            || (mergeRanges && startingPos - currentParts.endPos() > mergeRangeDelta)) {
          currentParts = new ConsecutivePartList(startingPos);
          allParts.add(currentParts);
        }
        // if we are in a consecutive part list and there is a gap in between the parts,
        // we treat the gap as a skippable chunk
        long delta = startingPos - currentParts.endPos();
        if (mergeRanges && delta > 0 && delta <= mergeRangeDelta) {
          // add a chunk that will be skipped because it has no column descriptor
          currentParts.addChunk(new ChunkDescriptor(null, null, startingPos, delta));
        }
        currentParts.addChunk(
            new ChunkDescriptor(columnDescriptor, mc, startingPos, mc.getTotalSize()));
      }
    }
    // actually read all the chunks
    return readChunks(block, allParts, new ChunkListBuilder());
  }
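
The grouping rule above can be hard to see inline. Below is a minimal standalone sketch of the same decision, using hypothetical Range and group helpers rather than the actual ConsecutivePartList and ChunkDescriptor classes: start a new read group for the first chunk, for a non-contiguous chunk when merging is disabled, or when the gap to the previous chunk exceeds the merge delta when merging is enabled.

  import java.util.ArrayList;
  import java.util.List;

  // Standalone illustration of the grouping rule in readNextRowGroup
  // (hypothetical types, not part of the Comet sources).
  class MergeRangeDemo {
    record Range(long start, long length) {
      long end() { return start + length; }
    }

    static List<List<Range>> group(List<Range> ranges, boolean mergeRanges, long mergeRangeDelta) {
      List<List<Range>> groups = new ArrayList<>();
      List<Range> current = null;
      long currentEnd = -1;
      for (Range r : ranges) {
        // mirrors the condition in readNextRowGroup: first part, non-contiguous part
        // with merging off, or gap larger than the delta with merging on
        boolean startNew =
            current == null
                || (!mergeRanges && r.start() != currentEnd)
                || (mergeRanges && r.start() - currentEnd > mergeRangeDelta);
        if (startNew) {
          current = new ArrayList<>();
          groups.add(current);
        }
        current.add(r);
        currentEnd = r.end();
      }
      return groups;
    }

    public static void main(String[] args) {
      List<Range> chunks =
          List.of(new Range(0, 100), new Range(100, 50), new Range(200, 50), new Range(1000, 50));
      System.out.println(group(chunks, false, 0).size()); // 3 reads: only contiguous chunks coalesce
      System.out.println(group(chunks, true, 64).size()); // 2 reads: the 50-byte gap is bridged
    }
  }

With merging disabled the example performs three separate reads, since only the two contiguous chunks coalesce; with a 64-byte merge delta the 50-byte gap at offset 150 is also bridged (the real reader inserts a skippable gap chunk there, as in the method above) and only two reads remain.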