public void readBatchRepeatedInternal()

in paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedRleValuesReader.java [427:581]


    public void readBatchRepeatedInternal(
            ParquetReadState state,
            WritableIntVector repLevels,
            VectorizedRleValuesReader defLevelsReader,
            WritableIntVector defLevels,
            WritableColumnVector values,
            WritableColumnVector nulls,
            boolean valuesReused,
            VectorizedValuesReader valueReader,
            ParquetVectorUpdater updater) {

        int leftInBatch = state.rowsToReadInBatch;
        int leftInPage = state.valuesToReadInPage;
        long rowId = state.rowId;

        DefLevelProcessor defLevelProcessor =
                new DefLevelProcessor(
                        defLevelsReader,
                        state,
                        defLevels,
                        values,
                        nulls,
                        valuesReused,
                        valueReader,
                        updater);

        while ((leftInBatch > 0 || !state.lastListCompleted) && leftInPage > 0) {
            if (currentCount == 0 && !readNextGroup()) {
                break;
            }

            // Values to read in the current RLE/PACKED block, must be <= what's left in the page
            int valuesLeftInBlock = Math.min(leftInPage, currentCount);

            // The current row range start and end
            long rangeStart = state.currentRangeStart();
            long rangeEnd = state.currentRangeEnd();

            switch (mode) {
                case RLE:
                    {
                        // This RLE block is consist of top-level rows, so we'll need to check
                        // if the rows should be skipped according to row indexes.
                        if (currentValue == 0) {
                            if (leftInBatch == 0) {
                                state.lastListCompleted = true;
                            } else {
                                // # of rows to read in the block, must be <= what's left in the
                                // current batch
                                int n = Math.min(leftInBatch, valuesLeftInBlock);

                                if (rowId + n < rangeStart) {
                                    // Need to skip all rows in [rowId, rowId + n)
                                    defLevelProcessor.skipValues(n);
                                    rowId += n;
                                    currentCount -= n;
                                    leftInPage -= n;
                                } else if (rowId > rangeEnd) {
                                    // The current row index already beyond the current range: move
                                    // to the next range
                                    // and repeat
                                    state.nextRange();
                                } else {
                                    // The range [rowId, rowId + n) overlaps with the current row
                                    // range
                                    long start = Math.max(rangeStart, rowId);
                                    long end = Math.min(rangeEnd, rowId + n - 1);

                                    // Skip the rows in [rowId, start)
                                    int toSkip = (int) (start - rowId);
                                    if (toSkip > 0) {
                                        defLevelProcessor.skipValues(toSkip);
                                        rowId += toSkip;
                                        currentCount -= toSkip;
                                        leftInPage -= toSkip;
                                    }

                                    // Read the rows in [start, end]
                                    n = (int) (end - start + 1);

                                    if (n > 0) {
                                        repLevels.appendInts(n, 0);
                                        defLevelProcessor.readValues(n);
                                    }

                                    rowId += n;
                                    currentCount -= n;
                                    leftInBatch -= n;
                                    leftInPage -= n;
                                }
                            }
                        } else {
                            // Not a top-level row: just read all the repetition levels in the block
                            // if the row
                            // should be included according to row indexes, else skip the rows.
                            if (!state.shouldSkip) {
                                repLevels.appendInts(valuesLeftInBlock, currentValue);
                            }
                            state.numBatchedDefLevels += valuesLeftInBlock;
                            leftInPage -= valuesLeftInBlock;
                            currentCount -= valuesLeftInBlock;
                        }
                        break;
                    }
                case PACKED:
                    {
                        int i = 0;

                        for (; i < valuesLeftInBlock; i++) {
                            int currentValue = currentBuffer[currentBufferIdx + i];
                            if (currentValue == 0) {
                                if (leftInBatch == 0) {
                                    state.lastListCompleted = true;
                                    break;
                                } else if (rowId < rangeStart) {
                                    // This is a top-level row, therefore check if we should skip it
                                    // with row indexes
                                    // the row is before the current range, skip it
                                    defLevelProcessor.skipValues(1);
                                } else if (rowId > rangeEnd) {
                                    // The row is after the current range, move to the next range
                                    // and compare again
                                    state.nextRange();
                                    break;
                                } else {
                                    // The row is in the current range, decrement the row counter
                                    // and read it
                                    leftInBatch--;
                                    repLevels.appendInt(0);
                                    defLevelProcessor.readValues(1);
                                }
                                rowId++;
                            } else {
                                if (!state.shouldSkip) {
                                    repLevels.appendInt(currentValue);
                                }
                                state.numBatchedDefLevels += 1;
                            }
                        }

                        leftInPage -= i;
                        currentCount -= i;
                        currentBufferIdx += i;
                        break;
                    }
            }
        }

        // Process all the batched def levels
        defLevelProcessor.finish();

        state.rowsToReadInBatch = leftInBatch;
        state.valuesToReadInPage = leftInPage;
        state.rowId = rowId;
    }