in cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java [133:299]
public boolean hasNext() throws IOException
{
if (allPartitions == null)
{
allPartitions = initializePartitions();
}
while (true)
{
if (partition == null)
{
try
{
// We've exhausted the current partition (or haven't started), so check for the next one
if (allPartitions.hasNext())
{
// Advance to next partition
partition = allPartitions.next();
if (partition.partitionLevelDeletion().isLive())
{
// No partition-level deletion, so reset rid with the new partition key
rid.setPartitionKeyCopy(partition.partitionKey().getKey(),
ReaderUtils.tokenToBigInteger(partition.partitionKey().getToken()));
}
else
{
// There's a partition-level delete
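// (For illustration: with a hypothetical table ks.tbl whose partition key is pk,
// "DELETE FROM ks.tbl WHERE pk = ?" writes a single partition-level tombstone
// rather than tombstoning each row individually)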
handlePartitionTombstone(partition);
return true;
}
}
else
{
return false;
}
}
catch (SSTableStreamException exception)
{
throw exception.getIOException();
}
// If the partition has a non-empty static row, grab its columns,
// so we process those before moving on to its atoms (the Unfiltered instances)
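// (Static columns are shared by all rows of a partition; e.g. in a hypothetical
// "CREATE TABLE ks.tbl (pk int, ck int, s int STATIC, v int, PRIMARY KEY (pk, ck))"
// the single value of s is stored once per partition in this static row)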
staticRow = partition.staticRow();
if (!staticRow.isEmpty())
{
columns = staticRow.iterator();
prepareColumnData();
return true;
}
}
// We may be in the midst of processing some multi-cell column data;
// if so, we'll resume where we left off
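// (Multi-cell data arises from non-frozen collections and UDTs; e.g. a column declared as
// "m map<text, int>" stores one cell per map entry, so a single CQL column can span
// several cells that are consumed across successive calls)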
if (columnData != null && columnData.hasData())
{
return true;
}
// Continue to process columns of the last read row, which may be static
if (columns != null && columns.hasNext())
{
prepareColumnData();
return true;
}
// The current row is exhausted (or none was present), so move to the next atom
columns = null;
try
{
// Advance to next unfiltered
rid.setIsUpdate(false); // Reset isUpdate flag
if (partition.hasNext())
{
unfiltered = partition.next();
}
else
{
// Current partition is exhausted
partition = null;
unfiltered = null;
// Produce a spark row if there are range tombstone markers
if (rid.hasRangeTombstoneMarkers())
{
// The current partition is exhausted and ready to produce a spark row for the range tombstones
rid.setShouldConsumeRangeTombstoneMarkers(true);
return true;
}
}
}
catch (SSTableStreamException exception)
{
throw exception.getIOException();
}
if (unfiltered != null)
{
if (unfiltered.isRow())
{
Row row = (Row) unfiltered;
// There is a CQL row level delete
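// (For illustration: a row tombstone results from deleting by full primary key, e.g.
// "DELETE FROM ks.tbl WHERE pk = ? AND ck = ?" on a hypothetical table with clustering key ck)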
if (!row.deletion().isLive())
{
handleRowTombstone(row);
return true;
}
// For non-compact tables, set up a ClusteringColumnDataState to emit a Rid that emulates a
// pre-3.0 CQL row marker. This is necessary for backwards compatibility with 2.1 & 2.0 output,
// and also for tables with only primary key columns defined.
// An empty PKLI is the 3.0 equivalent of having no row marker (e.g. row modifications via
// UPDATE rather than INSERT), so we don't emit a fake row marker in that case.
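// (For illustration: "INSERT INTO ks.tbl (pk, ck, v) VALUES (?, ?, ?)" writes primary key
// liveness info, i.e. a row marker, whereas "UPDATE ks.tbl SET v = ? WHERE pk = ? AND ck = ?"
// leaves it empty; ks.tbl is a hypothetical table)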
boolean emptyLiveness = row.primaryKeyLivenessInfo().isEmpty();
rid.setIsUpdate(emptyLiveness);
if (!emptyLiveness)
{
if (TableMetadata.Flag.isCQLTable(metadata.flags))
{
columnData = new ClusteringColumnDataState(row.clustering());
}
columns = row.iterator();
return true;
}
// The row's actual columns may be empty, in which case we'll simply skip over them during the
// next iteration and move to the next unfiltered. Then only the row marker and/or row deletion
// (if either is present) will get emitted
columns = row.iterator();
}
else if (unfiltered.isRangeTombstoneMarker())
{
// Range tombstones can get complicated (illustrative CQL examples follow the list):
// - In the simplest case, a DELETE statement covering a single clustering key range, we expect
//   the UnfilteredRowIterator to contain 2 markers, i.e. the open and close range tombstone markers
// - In a slightly more complicated case, a DELETE using the IN operator (on prior clustering
//   keys), we expect the UnfilteredRowIterator to contain 2 * N markers, where N is the number
//   of values specified for IN
// - In the most complicated case, a client could compose a complex partition update with a BATCH
//   statement, which leads to further scenarios (only discussing the statements applying to the
//   same partition key):
//   - Multiple disjoint ranges => we should expect 2 * N markers, where N is the number of ranges
//   - Overlapping ranges with the same timestamp => we should expect 2 markers, since the
//     overlapping ranges are merged into a single one (and the boundary is omitted)
//   - Overlapping ranges with different timestamps => we should expect 3 markers, i.e. the open
//     bound, the boundary, and the end bound
//   - Ranges mixed with INSERTs => the order of the unfiltereds (i.e. Row/RangeTombstoneMarker)
//     is determined by comparing the row clustering with the bounds of the ranges;
//     see o.a.c.d.r.RowAndDeletionMergeIterator
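// For illustration only, assuming a hypothetical table
// "CREATE TABLE ks.tbl (pk int, ck1 int, ck2 int, v int, PRIMARY KEY (pk, ck1, ck2))":
// - "DELETE FROM ks.tbl WHERE pk = 0 AND ck1 = 1"
//   => 2 markers (one open/close pair around ck1 = 1)
// - "DELETE FROM ks.tbl WHERE pk = 0 AND ck1 IN (1, 2, 3)"
//   => 2 * 3 = 6 markers (one open/close pair per IN value)
// - A BATCH applying "DELETE FROM ks.tbl WHERE pk = 0 AND ck1 > 0 AND ck1 < 3" and
//   "DELETE FROM ks.tbl WHERE pk = 0 AND ck1 > 1 AND ck1 < 5" with per-statement
//   USING TIMESTAMP => 3 markers (open bound, boundary where the ranges overlap, end bound)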
RangeTombstoneMarker rangeTombstoneMarker = (RangeTombstoneMarker) unfiltered;
// We encode all the ranges within the same spark row. Therefore, we need to keep the markers
// while iterating through the partition, and _only_ generate a spark row with the range
// tombstone info once the partition / UnfilteredRowIterator is exhausted.
handleRangeTombstone(rangeTombstoneMarker);
// Continue to consume the next unfiltered row/marker
}
else
{
// As of Cassandra 4, an unfiltered can only be a row or a range tombstone marker,
// see o.a.c.db.rows.Unfiltered.Kind; this else branch exists only for completeness
throw new IllegalStateException("Encountered unknown Unfiltered kind");
}
}
}
}
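// For illustration only (not part of this file): a minimal sketch of a consumption loop driving
// this scanner. Only hasNext() and the rid field come from the code above; advanceToNextColumn()
// and consume() are hypothetical stand-ins for the real Spark reader driver.
//
//     while (scanner.hasNext())
//     {
//         scanner.advanceToNextColumn(); // hypothetical: materialize the current cell into rid
//         consume(scanner.rid);          // hypothetical: hand the populated Rid to the consumer
//     }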