in common/src/main/java/org/apache/comet/parquet/FileReader.java [248:292]
/**
 * Reads the next row group from the file and returns a {@link PageReadStore} over its pages.
 *
 * <p>Builds the list of byte ranges ("consecutive part lists") covering the projected column
 * chunks of the current row group, merging nearby ranges when IO range merging is enabled, then
 * delegates the actual reading to {@code readChunks}.
 *
 * @return the pages of the next row group, or {@code null} when all row groups have been read
 * @throws IOException if reading the underlying file fails
 * @throws RuntimeException if the row group metadata reports zero rows
 */
public PageReadStore readNextRowGroup() throws IOException {
  if (currentBlock == blocks.size()) {
    // All row groups consumed.
    return null;
  }
  BlockMetaData block = blocks.get(currentBlock);
  if (block.getRowCount() == 0) {
    throw new RuntimeException("Illegal row group of 0 rows");
  }
  this.currentRowGroup = new RowGroupReader(block.getRowCount());

  // These options are constant for the duration of this call; read them once instead of
  // re-querying cometOptions on every column iteration.
  final boolean mergeRanges = cometOptions.isIOMergeRangesEnabled();
  final int mergeRangeDelta = cometOptions.getIOMergeRangesDelta();

  // Prepare the list of consecutive parts so they can be read in one scan.
  List<ConsecutivePartList> allParts = new ArrayList<>();
  ConsecutivePartList currentParts = null;
  for (ColumnChunkMetaData mc : block.getColumns()) {
    ColumnDescriptor columnDescriptor = paths.get(mc.getPath());
    if (columnDescriptor == null) {
      // Column is not part of the projection; skip its chunk entirely.
      continue;
    }
    BenchmarkCounter.incrementTotalBytes(mc.getTotalSize());
    long startingPos = mc.getStartingPos();
    // Start a new part list when:
    //  - this is the first part, or
    //  - merging is off and the part is not exactly consecutive, or
    //  - merging is on but the gap to the previous part exceeds the merge delta.
    if (currentParts == null
        || (!mergeRanges && currentParts.endPos() != startingPos)
        || (mergeRanges && startingPos - currentParts.endPos() > mergeRangeDelta)) {
      currentParts = new ConsecutivePartList(startingPos);
      allParts.add(currentParts);
    }
    // If we stayed in the current part list across a gap, represent the gap as a skippable
    // chunk (no column descriptor) so the scan remains one contiguous read.
    long delta = startingPos - currentParts.endPos();
    if (mergeRanges && delta > 0 && delta <= mergeRangeDelta) {
      currentParts.addChunk(new ChunkDescriptor(null, null, startingPos, delta));
    }
    currentParts.addChunk(
        new ChunkDescriptor(columnDescriptor, mc, startingPos, mc.getTotalSize()));
  }
  // Actually read all the chunks.
  return readChunks(block, allParts, new ChunkListBuilder());
}