in paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedRleValuesReader.java [427:581]
/**
 * Reads repetition levels for a batch of repeated values and drives a {@code DefLevelProcessor}
 * to skip or read the corresponding definition levels and values.
 *
 * <p>A repetition level of 0 starts a new top-level row. Such rows are checked against the row
 * range currently exposed by {@code state}:
 *
 * <ul>
 *   <li>rows before the range are skipped;
 *   <li>a row after the range advances {@code state} to its next range;
 *   <li>rows inside the range are read, and each one counts against the batch row budget.
 * </ul>
 *
 * <p>A non-zero repetition level continues the current row. It is appended to {@code repLevels}
 * unless {@code state.shouldSkip} is set, and it is always counted in {@code
 * state.numBatchedDefLevels}.
 *
 * <p>The batch row budget may be exhausted while {@code state.lastListCompleted} is still false.
 * In that case decoding continues through trailing non-zero levels until the next level-0 value
 * is met; that value is not consumed, and {@code state.lastListCompleted} is set. Decoding also
 * stops when the page has no values left or {@code readNextGroup()} reports no further group.
 *
 * @param state read cursor; {@code rowsToReadInBatch}, {@code valuesToReadInPage} and {@code
 *     rowId} are read on entry and written back on exit
 * @param repLevels output vector receiving the repetition levels that are kept
 * @param defLevelsReader forwarded to the {@code DefLevelProcessor}
 * @param defLevels forwarded to the {@code DefLevelProcessor}
 * @param values forwarded to the {@code DefLevelProcessor}
 * @param nulls forwarded to the {@code DefLevelProcessor}
 * @param valuesReused forwarded to the {@code DefLevelProcessor}
 * @param valueReader forwarded to the {@code DefLevelProcessor}
 * @param updater forwarded to the {@code DefLevelProcessor}
 */
public void readBatchRepeatedInternal(
        ParquetReadState state,
        WritableIntVector repLevels,
        VectorizedRleValuesReader defLevelsReader,
        WritableIntVector defLevels,
        WritableColumnVector values,
        WritableColumnVector nulls,
        boolean valuesReused,
        VectorizedValuesReader valueReader,
        ParquetVectorUpdater updater) {
    // Work on local copies of the cursor; they are written back to `state` once decoding stops.
    int rowsLeft = state.rowsToReadInBatch;
    int pageValuesLeft = state.valuesToReadInPage;
    long currentRow = state.rowId;

    DefLevelProcessor processor =
            new DefLevelProcessor(
                    defLevelsReader,
                    state,
                    defLevels,
                    values,
                    nulls,
                    valuesReused,
                    valueReader,
                    updater);

    // Continue while rows are still wanted or the last list has not been terminated yet,
    // provided the page still has values.
    while (pageValuesLeft > 0 && (rowsLeft > 0 || !state.lastListCompleted)) {
        if (currentCount == 0 && !readNextGroup()) {
            break;
        }
        // Never look past the end of the page, even if the current group extends further.
        int blockValues = Math.min(pageValuesLeft, currentCount);
        long firstRowInRange = state.currentRangeStart();
        long lastRowInRange = state.currentRangeEnd();

        switch (mode) {
            case RLE:
                {
                    if (currentValue != 0) {
                        // The whole run continues the current row. Its levels are appended
                        // unless state.shouldSkip is set (presumably: the enclosing row is being
                        // skipped; verify against DefLevelProcessor).
                        if (!state.shouldSkip) {
                            repLevels.appendInts(blockValues, currentValue);
                        }
                        state.numBatchedDefLevels += blockValues;
                        pageValuesLeft -= blockValues;
                        currentCount -= blockValues;
                    } else if (rowsLeft == 0) {
                        // A new top-level row begins but the batch is full, so the last list is
                        // complete. Nothing from this run is consumed.
                        state.lastListCompleted = true;
                    } else {
                        // Every value of this run starts a new top-level row.
                        int runRows = Math.min(rowsLeft, blockValues);
                        if (currentRow + runRows < firstRowInRange) {
                            // The run lies entirely before the current range.
                            processor.skipValues(runRows);
                            currentRow += runRows;
                            currentCount -= runRows;
                            pageValuesLeft -= runRows;
                        } else if (currentRow > lastRowInRange) {
                            // Already past the current range: advance it and re-evaluate on the
                            // next pass of the outer loop.
                            state.nextRange();
                        } else {
                            // The run overlaps the range. Skip the rows before it, then read the
                            // rows inside it, bounded by whichever ends first.
                            long readFrom = Math.max(firstRowInRange, currentRow);
                            long readTo = Math.min(lastRowInRange, currentRow + runRows - 1);
                            int leadingSkip = (int) (readFrom - currentRow);
                            if (leadingSkip > 0) {
                                processor.skipValues(leadingSkip);
                                currentRow += leadingSkip;
                                currentCount -= leadingSkip;
                                pageValuesLeft -= leadingSkip;
                            }
                            int rowsRead = (int) (readTo - readFrom + 1);
                            if (rowsRead > 0) {
                                repLevels.appendInts(rowsRead, 0);
                                processor.readValues(rowsRead);
                            }
                            currentRow += rowsRead;
                            currentCount -= rowsRead;
                            rowsLeft -= rowsRead;
                            pageValuesLeft -= rowsRead;
                        }
                    }
                    break;
                }
            case PACKED:
                {
                    int consumed = 0;
                    while (consumed < blockValues) {
                        int repLevel = currentBuffer[currentBufferIdx + consumed];
                        if (repLevel != 0) {
                            // Continuation of the current row.
                            if (!state.shouldSkip) {
                                repLevels.appendInt(repLevel);
                            }
                            state.numBatchedDefLevels += 1;
                        } else if (rowsLeft == 0) {
                            // Batch full and a new row starts: stop without consuming it.
                            state.lastListCompleted = true;
                            break;
                        } else if (currentRow < firstRowInRange) {
                            // New row before the current range: skip it.
                            processor.skipValues(1);
                            currentRow++;
                        } else if (currentRow > lastRowInRange) {
                            // New row after the current range: advance the range. This value is
                            // not consumed, so it is re-examined on the next outer pass.
                            state.nextRange();
                            break;
                        } else {
                            // New row inside the current range: read it.
                            rowsLeft--;
                            repLevels.appendInt(0);
                            processor.readValues(1);
                            currentRow++;
                        }
                        consumed++;
                    }
                    pageValuesLeft -= consumed;
                    currentCount -= consumed;
                    currentBufferIdx += consumed;
                    break;
                }
        }
    }

    // Process all definition levels that were batched while scanning.
    processor.finish();

    state.rowsToReadInBatch = rowsLeft;
    state.valuesToReadInPage = pageValuesLeft;
    state.rowId = currentRow;
}