in src/java/org/apache/cassandra/db/rows/ThrottledUnfilteredIterator.java [71:216]
/**
 * Produces the next throttled batch over {@code origin}.
 * <p>
 * Each batch hands out at most {@code throttle} unfiltereds, optionally followed by one extra close marker
 * when a range tombstone is still open at the moment the limit is reached. Whatever has to be replayed at
 * the start of the following batch is parked in {@code overflowed}.
 * <p>
 * {@code origin}, {@code throttle}, {@code overflowed} and {@code throttledItr} are declared outside this
 * method.
 *
 * @return {@code origin} itself when it has no unfiltereds on the very first call, {@code endOfData()} when
 *         it has none left after an iterator was already handed out, otherwise a fresh batch iterator
 */
protected UnfilteredRowIterator computeNext()
{
    // Skip over whatever the caller left unread in the previously returned iterator.
    if (throttledItr != null)
    {
        while (throttledItr.hasNext())
            throttledItr.next();
    }

    if (!origin.hasNext())
    {
        // Nothing was ever handed out: origin may still carry a partition deletion or a static row even
        // though it has no unfiltereds, so it is returned as-is.
        if (throttledItr == null)
        {
            throttledItr = origin;
            return throttledItr;
        }
        return endOfData();
    }

    throttledItr = new WrappingUnfilteredRowIterator()
    {
        // number of unfiltereds handed out by this batch (the synthetic close marker is not counted)
        private int returned = 0;

        // true only for the first batch; evaluated before the enclosing throttledItr field is reassigned
        private final boolean firstBatch = throttledItr == null;

        // Marker that is currently open within this batch. When it was produced by the previous batch
        // it arrives through overflowed and is therefore the first element consumed here.
        private RangeTombstoneMarker openMarker;

        // Close marker owed to this batch; it is emitted as the batch's last element.
        private RangeTombstoneMarker closeMarker = null;

        @Override
        public UnfilteredRowIterator wrapped()
        {
            return origin;
        }

        @Override
        public boolean hasNext()
        {
            boolean moreWithinBatch = belowThrottle() && origin.hasNext();
            return moreWithinBatch || closeMarker != null;
        }

        @Override
        public Unfiltered next()
        {
            RangeTombstoneMarker pendingClose = closeMarker;
            if (pendingClose != null)
            {
                // the close marker is only ever set once the batch is full
                assert returned == throttle;
                closeMarker = null;
                return pendingClose;
            }

            assert belowThrottle();
            // leftovers parked by the previous batch take precedence over fresh elements from origin
            Unfiltered item = overflowed.hasNext() ? overflowed.next() : origin.next();
            track(item);
            return item;
        }

        /** Counts the element, tracks marker state, and closes a still-open marker once the batch is full. */
        private void track(Unfiltered unfiltered)
        {
            returned++;
            if (unfiltered.isRangeTombstoneMarker())
                rememberIfOpen((RangeTombstoneMarker) unfiltered);

            if (returned != throttle || openMarker == null)
                return;

            // the batch is full while a range tombstone is still open: peel one more element off origin
            // to work out how to close it
            assert origin.hasNext();
            sealBatch(origin.next());
        }

        private boolean belowThrottle()
        {
            return returned < throttle;
        }

        /** Keeps {@code marker} as the open marker if it opens a range, otherwise clears the open marker. */
        private void rememberIfOpen(RangeTombstoneMarker marker)
        {
            if (marker.isOpen(isReverseOrder()))
                openMarker = marker;
            else
                openMarker = null;
        }

        /**
         * Closes the currently open marker for this batch, based on the element that follows it:
         * <ol>
         *   <li>a row: a close marker is created for this batch, and an open marker followed by the row
         *       itself is queued for the next batch;</li>
         *   <li>a bound marker: it must be a close marker and is used directly;</li>
         *   <li>a boundary marker: it is split into a close marker for this batch and an open marker
         *       for the next batch.</li>
         * </ol>
         */
        private void sealBatch(Unfiltered following)
        {
            assert openMarker != null;

            if (!following.isRangeTombstoneMarker())
            {
                // row: close before it in this batch, re-open at it in the next batch
                DeletionTime deletion = openMarker.openDeletionTime(isReverseOrder());
                closeMarker = RangeTombstoneBoundMarker.exclusiveClose(isReverseOrder(), following.clustering(), deletion);

                Unfiltered reopen = RangeTombstoneBoundMarker.inclusiveOpen(isReverseOrder(),
                                                                            following.clustering(),
                                                                            deletion);
                overflowed = Arrays.asList(reopen, following).iterator();
                return;
            }

            RangeTombstoneMarker marker = (RangeTombstoneMarker) following;
            if (!marker.isBoundary())
            {
                // plain bound: it can only be the matching close marker
                assert marker.isClose(isReverseOrder());
                rememberIfOpen(marker);
                closeMarker = marker;
                return;
            }

            // boundary: its close side ends this batch, its open side starts the next one
            RangeTombstoneBoundaryMarker boundary = (RangeTombstoneBoundaryMarker) marker;
            closeMarker = boundary.createCorrespondingCloseMarker(isReverseOrder());
            Unfiltered reopen = boundary.createCorrespondingOpenMarker(isReverseOrder());
            overflowed = Collections.singleton(reopen).iterator();
        }

        @Override
        public DeletionTime partitionLevelDeletion()
        {
            // only the first batch exposes origin's partition-level deletion
            if (firstBatch)
                return origin.partitionLevelDeletion();
            return DeletionTime.LIVE;
        }

        @Override
        public Row staticRow()
        {
            // only the first batch exposes origin's static row
            if (firstBatch)
                return origin.staticRow();
            return Rows.EMPTY_STATIC_ROW;
        }

        @Override
        public void close()
        {
            // no op
        }
    };
    return throttledItr;
}