in cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/cdc/scanner/CdcSortedStreamScanner.java [109:202]
public boolean next()
{
while (true)
{
if (allExhausted())
{
return false;
}
// Sample CDC events for tracking: with probability samplingRate(), assign a tracking id that is attached to the emitted event
String trackingId = null;
if (random.nextDouble() < samplingRate())
{
trackingId = UUID.randomUUID().toString();
}
if (currentPartition == null)
{
currentPartition = partitionIterator.next();
// A non-live partition-level deletion means the whole Cassandra partition is deleted
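// For illustration (hypothetical table ks.tbl): DELETE FROM ks.tbl WHERE pk = ? restricts only the partition key
// and therefore produces a partition-level deletion.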
if (!currentPartition.partitionLevelDeletion().isLive())
{
event = makePartitionTombstone(currentPartition, trackingId);
currentPartition = null;
return true;
}
// The partition contains no regular rows, only a static row
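// e.g. an INSERT/UPDATE that writes only static columns (hypothetical example: UPDATE ks.tbl SET s = ? WHERE pk = ?)
// produces a partition with just a static row.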
Row staticRow = currentPartition.staticRow();
if (!currentPartition.hasNext() && staticRow != null && staticRow != Rows.EMPTY_STATIC_ROW)
{
event = makeStaticRow(staticRow, currentPartition, trackingId);
currentPartition = null; // reset
return true;
}
}
if (!currentPartition.hasNext())
{
// The current partition is exhausted. Clean up and advance to the next partition by `continue`.
currentPartition = null; // reset
// Publish any range deletion for the partition
if (rangeDeletionBuilder != null)
{
event = rangeDeletionBuilder.build();
rangeDeletionBuilder = null; // reset
return true;
}
else
{
continue;
}
}
// An Unfiltered is either a Row or a RangeTombstoneMarker
Unfiltered unfiltered = currentPartition.next();
if (unfiltered.isRow())
{
Row row = (Row) unfiltered;
event = makeRow(row, currentPartition, trackingId);
return true;
}
else if (unfiltered.isRangeTombstoneMarker())
{
// Range tombstones can get complicated.
// - In the simplest case, i.e. a DELETE statement with a single clustering key range, we expect
// the UnfilteredRowIterator to contain 2 markers, i.e. the open and close range tombstone markers.
// - In a slightly more complicated case, where the statement contains an IN operator (on the prior clustering keys),
// we expect the UnfilteredRowIterator to contain 2 * N markers, where N is the number of values specified for IN.
// - In the most complicated case, a client could compose a complex partition update with a BATCH statement.
// That leads to these further scenarios (only discussing statements applying to the same partition key):
// - Multiple disjoint ranges => we expect 2 * N markers, where N is the number of ranges.
// - Overlapping ranges with the same timestamp => we expect 2 markers, since the overlapping
// ranges are merged into a single one (the boundary is omitted).
// - Overlapping ranges with different timestamps => we expect 3 markers, i.e. the open bound,
// the boundary and the close bound.
// - Ranges mixed with INSERTs => the order of the unfiltereds (i.e. Row/RangeTombstoneMarker) is determined
// by comparing the row clusterings with the bounds of the ranges. See o.a.c.d.r.RowAndDeletionMergeIterator
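// For illustration with a hypothetical table ks.tbl (pk, ck1, ck2):
// DELETE FROM ks.tbl WHERE pk = ? AND ck1 > ? AND ck1 < ? -- single range: 2 markers (open + close)
// DELETE FROM ks.tbl WHERE pk = ? AND ck1 IN (?, ?) AND ck2 > ? -- IN with 2 values: 2 * 2 = 4 markers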
RangeTombstoneMarker rangeTombstoneMarker = (RangeTombstoneMarker) unfiltered;
// All ranges are encoded within the same Spark row. Therefore, the scanner keeps accumulating the markers while
// iterating through the partition, and _only_ generates a Spark row with the range tombstone info when
// the partition / UnfilteredRowIterator is exhausted.
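// e.g. a BATCH deleting two disjoint clustering ranges in the same partition (hypothetical scenario) results in
// a single CDC event carrying both ranges.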
handleRangeTombstone(rangeTombstoneMarker, currentPartition, trackingId);
// continue to consume the next unfiltered row/marker
}
else
{
// As of Cassandra 4, the kind of an unfiltered can only be a row or a range tombstone marker; see o.a.c.db.rows.Unfiltered.Kind.
// The else branch is here only for completeness.
throw new IllegalStateException("Encountered unknown Unfiltered kind.");
}
}
}