protected UnfilteredRowIterator computeNext()

in src/java/org/apache/cassandra/db/rows/ThrottledUnfilteredIterator.java [71:216]


    protected UnfilteredRowIterator computeNext()
    {
        // exhaust previous throttled iterator
        while (throttledItr != null && throttledItr.hasNext())
            throttledItr.next();

        // The original UnfilteredRowIterator may have only partition deletion or static column but without unfiltereds.
        // Return the original UnfilteredRowIterator
        if (!origin.hasNext())
        {
            if (throttledItr != null)
                return endOfData();
            return throttledItr = origin;
        }

        throttledItr = new WrappingUnfilteredRowIterator()
        {
            private int count = 0;
            private boolean isFirst = throttledItr == null;

            // current batch's openMarker. if it's generated in previous batch,
            // it must be consumed as first element of current batch
            private RangeTombstoneMarker openMarker;

            // current batch's closeMarker.
            // it must be consumed as last element of current batch
            private RangeTombstoneMarker closeMarker = null;

            @Override
            public UnfilteredRowIterator wrapped()
            {
                return origin;
            }

            @Override
            public boolean hasNext()
            {
                return (withinLimit() && origin.hasNext()) || closeMarker != null;
            }

            @Override
            public Unfiltered next()
            {
                if (closeMarker != null)
                {
                    assert count == throttle;
                    Unfiltered toReturn = closeMarker;
                    closeMarker = null;
                    return toReturn;
                }

                Unfiltered next;
                assert withinLimit();
                // in the beginning of the batch, there might be remaining unfiltereds from previous iteration
                if (overflowed.hasNext())
                    next = overflowed.next();
                else
                    next = origin.next();
                recordNext(next);
                return next;
            }

            private void recordNext(Unfiltered unfiltered)
            {
                count++;
                if (unfiltered.isRangeTombstoneMarker())
                    updateMarker((RangeTombstoneMarker) unfiltered);
                // when reach throttle with a remaining openMarker, we need to create corresponding closeMarker.
                if (count == throttle && openMarker != null)
                {
                    assert origin.hasNext();
                    closeOpenMarker(origin.next());
                }
            }

            private boolean withinLimit()
            {
                return count < throttle;
            }

            private void updateMarker(RangeTombstoneMarker marker)
            {
                openMarker = marker.isOpen(isReverseOrder()) ? marker : null;
            }

            /**
             * There 3 cases for next, 1. if it's boundaryMarker, we split it as closeMarker for current batch, next
             * openMarker for next batch 2. if it's boundMakrer, it must be closeMarker. 3. if it's Row, create
             * corresponding closeMarker for current batch, and create next openMarker for next batch including current
             * Row.
             */
            private void closeOpenMarker(Unfiltered next)
            {
                assert openMarker != null;

                if (next.isRangeTombstoneMarker())
                {
                    RangeTombstoneMarker marker = (RangeTombstoneMarker) next;
                    // if it's boundary, create closeMarker for current batch and openMarker for next batch
                    if (marker.isBoundary())
                    {
                        RangeTombstoneBoundaryMarker boundary = (RangeTombstoneBoundaryMarker) marker;
                        closeMarker = boundary.createCorrespondingCloseMarker(isReverseOrder());
                        overflowed = Collections.singleton((Unfiltered)boundary.createCorrespondingOpenMarker(isReverseOrder())).iterator();
                    }
                    else
                    {
                        // if it's bound, it must be closeMarker.
                        assert marker.isClose(isReverseOrder());
                        updateMarker(marker);
                        closeMarker = marker;
                    }
                }
                else
                {
                    // it's Row, need to create closeMarker for current batch and openMarker for next batch
                    DeletionTime openDeletion = openMarker.openDeletionTime(isReverseOrder());
                    closeMarker = RangeTombstoneBoundMarker.exclusiveClose(isReverseOrder(), next.clustering(), openDeletion);

                    // for next batch
                    overflowed = Arrays.asList(RangeTombstoneBoundMarker.inclusiveOpen(isReverseOrder(),
                                                                                       next.clustering(),
                                                                                       openDeletion), next).iterator();
                }
            }

            @Override
            public DeletionTime partitionLevelDeletion()
            {
                return isFirst ? origin.partitionLevelDeletion() : DeletionTime.LIVE;
            }

            @Override
            public Row staticRow()
            {
                return isFirst ? origin.staticRow() : Rows.EMPTY_STATIC_ROW;
            }

            @Override
            public void close()
            {
                // no op
            }
        };
        return throttledItr;
    }