public boolean next()

in cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/cdc/scanner/CdcSortedStreamScanner.java [109:202]


    public boolean next()
    {
        while (true)
        {
            if (allExhausted())
            {
                return false;
            }
            // sampling CDC events for tracking
            String trackingId = null;
            if (random.nextDouble() < samplingRate())
            {
                trackingId = UUID.randomUUID().toString();
            }

            if (currentPartition == null)
            {
                currentPartition = partitionIterator.next();

                // it is a Cassandra partition deletion
                if (!currentPartition.partitionLevelDeletion().isLive())
                {
                    event = makePartitionTombstone(currentPartition, trackingId);
                    currentPartition = null;
                    return true;
                }

                // the partition contains no other rows but only a static row
                Row staticRow = currentPartition.staticRow();
                if (!currentPartition.hasNext() && staticRow != null && staticRow != Rows.EMPTY_STATIC_ROW)
                {
                    event = makeStaticRow(staticRow, currentPartition, trackingId);
                    currentPartition = null; // reset
                    return true;
                }
            }

            if (!currentPartition.hasNext())
            {
                // The current partition is exhausted. Clean up and advance to the next partition by `continue`.
                currentPartition = null; // reset
                // Publish any range deletion for the partition
                if (rangeDeletionBuilder != null)
                {
                    event = rangeDeletionBuilder.build();
                    rangeDeletionBuilder = null; // reset
                    return true;
                }
                else
                {
                    continue;
                }
            }

            // An unfiltered can either be a Row or RangeTombstoneMarker
            Unfiltered unfiltered = currentPartition.next();

            if (unfiltered.isRow())
            {
                Row row = (Row) unfiltered;
                event = makeRow(row, currentPartition, trackingId);
                return true;
            }
            else if (unfiltered.isRangeTombstoneMarker())
            {
                // Range tombstone can get complicated.
                // - In the most simple case, that is a DELETE statement with a single clustering key range, we expect
                //   the UnfilteredRowIterator with 2 markers, i.e. open and close range tombstone markers
                // - In a slightly more complicated case, it contains IN operator (on prior clustering keys), we expect
                //   the UnfilteredRowIterator with 2 * N markers, where N is the number of values specified for IN.
                // - In the most complicated case, client could comopse a complex partition update with a BATCH statement.
                //   It could have those further scenarios: (only discussing the statements applying to the same partition key)
                //   - Multiple disjoint ranges => we should expect 2 * N markers, where N is the number of ranges.
                //   - Overlapping ranges with the same timestamp => we should expect 2 markers, considering the
                //     overlapping ranges are merged into a single one. (as the boundary is omitted)
                //   - Overlapping ranges with different timestamp ==> we should expect 3 markers, i.e. open bound,
                //     boundary and end bound
                //   - Ranges mixed with INSERT! => The order of the unfiltered (i.e. Row/RangeTombstoneMarker) is determined
                //     by comparing the row clustering with the bounds of the ranges. See o.a.c.d.r.RowAndDeletionMergeIterator
                RangeTombstoneMarker rangeTombstoneMarker = (RangeTombstoneMarker) unfiltered;
                // We encode the ranges within the same spark row. Therefore, it needs to keep the markers when
                // iterating through the partition, and _only_ generate a spark row with range tombstone info when
                // exhausting the partition / UnfilteredRowIterator.
                handleRangeTombstone(rangeTombstoneMarker, currentPartition, trackingId);
                // continue to consume the next unfiltered row/marker
            }
            else
            {
                // As of Cassandra 4, the unfiltered kind can either be row or range tombstone marker, see o.a.c.db.rows.Unfiltered.Kind
                // Having the else branch only for completeness.
                throw new IllegalStateException("Encountered unknown Unfiltered kind.");
            }
        }
    }