in cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java [149:283]
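/**
 * Prepares the next unit of data for consumption: a partition tombstone, a static row
 * column, a row tombstone, an emulated CQL row marker, or a regular column.
 *
 * @return true if another unit of data is ready to be consumed, false once all partitions are exhausted
 * @throws IOException if reading the underlying SSTable stream fails
 */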
public boolean next() throws IOException
{
    if (allPartitions == null)
    {
        allPartitions = initializePartitions();
    }
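
    // Loop until we either prepare the next unit of data or exhaust all partitions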
    while (true)
    {
        if (partition == null)
        {
            try
            {
                // The current partition is exhausted (or none has been read yet)
                if (allPartitions.hasNext())
                {
                    // Advance to the next partition
                    partition = allPartitions.next();
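                    // Convert the partitioner token into its BigInteger representation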
                    BigInteger token = ReaderUtils.tokenToBigInteger(partition.partitionKey().getToken());
                    if (partition.partitionLevelDeletion().isLive())
                    {
                        // Reset rowData with the new partition key
                        rowData.setPartitionKeyCopy(partition.partitionKey().getKey(), token);
                    }
                    else
                    {
                        // There's a partition-level delete
                        handlePartitionTombstone(token, partition);
                        return true;
                    }
                }
                else
                {
                    return false;
                }
            }
            catch (SSTableStreamException exception)
            {
                throw exception.getIOException();
            }

            // If the partition has a non-empty static row, grab its columns
            // so we process those before moving on to its atoms (the Unfiltered instances)
            staticRow = partition.staticRow();
            if (!staticRow.isEmpty())
            {
                columns = staticRow.iterator();
                prepareColumnData();
                return true;
            }
        }

        // We may be in the midst of processing some multi-cell column data;
        // if so, resume where we left off
        if (columnData != null && columnData.hasData())
        {
            return true;
        }

        // Continue to process columns of the last-read row, which may be static
        if (columns != null && columns.hasNext())
        {
            prepareColumnData();
            return true;
        }

        // The current row was exhausted (or none was present), so move on to the next atom
        columns = null;
        try
        {
            // Advance to the next unfiltered
            if (partition.hasNext())
            {
                unfiltered = partition.next();
            }
            else
            {
                // The current partition is exhausted
                partition = null;
                unfiltered = null;
            }
        }
        catch (SSTableStreamException exception)
        {
            throw exception.getIOException();
        }
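
        // Process the atom just read, if the partition wasn't already exhausted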
        if (unfiltered != null)
        {
            if (unfiltered.isRow())
            {
                Row row = (Row) unfiltered;

                // There is a CQL row-level delete
                if (!row.deletion().isLive())
                {
                    handleRowTombstone(rowData.getToken(), row);
                    return true;
                }

                // For non-compact tables, set up a ClusteringColumnDataState to emit a RowData that emulates a
                // pre-3.0 CQL row marker. This is necessary for backwards compatibility with 2.1 & 2.0 output,
                // and also for tables with only primary key columns defined.
                // An empty PKLI is the 3.0 equivalent of having no row marker (e.g. row modifications via
                // UPDATE rather than INSERT), so we don't emit a fake row marker in that case.
                if (!row.primaryKeyLivenessInfo().isEmpty())
                {
                    if (TableMetadata.Flag.isCQLTable(metadata.flags))
                    {
                        columnData = new ClusteringColumnDataState(row.clustering());
                    }
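                    // Queue the row's regular columns; any emulated row marker (columnData) is consumed first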
                    columns = row.iterator();
                    return true;
                }

                // The row's actual columns may be empty, in which case we'll simply skip over them during
                // the next iteration and move to the next unfiltered; then only the row marker and/or row
                // deletion (if either is present) will get emitted
                columns = row.iterator();
            }
            else if (unfiltered.isRangeTombstoneMarker())
            {
                throw new IllegalStateException("Encountered RangeTombstoneMarker. " +
                                                "It should have been purged in CompactionIterator");
            }
            else
            {
                // As of Cassandra 4, the unfiltered kind can only be a row or a range tombstone marker,
                // see o.a.c.db.rows.Unfiltered.Kind; this else branch exists only for completeness
                throw new IllegalStateException("Encountered unknown Unfiltered kind");
            }
        }
    }
}