public boolean hasNext()

in cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java [133:299]


    public boolean hasNext() throws IOException
    {
        if (allPartitions == null)
        {
            allPartitions = initializePartitions();
        }

        while (true)
        {
            if (partition == null)
            {
                try
                {
                    // No partition is currently open: either we're just starting, or the
                    // previous partition was exhausted, so check for the next one
                    if (allPartitions.hasNext())
                    {
                        // Advance to next partition
                        partition = allPartitions.next();

                        if (partition.partitionLevelDeletion().isLive())
                        {
                            // Reset rid with new partition key
                            rid.setPartitionKeyCopy(partition.partitionKey().getKey(),
                                                    ReaderUtils.tokenToBigInteger(partition.partitionKey().getToken()));
                        }
                        else
                        {
                            // There's a partition-level delete
                            handlePartitionTombstone(partition);
                            return true;
                        }
                    }
                    else
                    {
                        return false;
                    }
                }
                catch (SSTableStreamException exception)
                {
                    throw exception.getIOException();
                }

                // If the partition has a non-empty static row, grab its columns
                // so we process those before moving on to its atoms (the Unfiltered instances)
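                // For example (hypothetical schema with an "s int static" column): the partition's
                // static cells live on this single static row and are surfaced before any clustering rows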
                staticRow = partition.staticRow();
                if (!staticRow.isEmpty())
                {
                    columns = staticRow.iterator();
                    prepareColumnData();
                    return true;
                }
            }

            // We may be in the midst of processing some multi-cell column data;
            // if so, we'll resume that where we left off
            if (columnData != null && columnData.hasData())
            {
                return true;
            }

            // Continue to process columns of the last read row, which may be static
            if (columns != null && columns.hasNext())
            {
                prepareColumnData();
                return true;
            }

            // The current row's columns were exhausted (or none were present), so move to the next atom
            columns = null;
            try
            {
                // Advance to next unfiltered
                rid.setIsUpdate(false);  // Reset isUpdate flag
                if (partition.hasNext())
                {
                    unfiltered = partition.next();
                }
                else
                {
                    // Current partition is exhausted
                    partition = null;
                    unfiltered = null;

                    // Produce a Spark row if there are range tombstone markers
                    if (rid.hasRangeTombstoneMarkers())
                    {
                        // The current partition is exhausted, so we are ready to produce a Spark row for the
                        // range tombstones
                        rid.setShouldConsumeRangeTombstoneMarkers(true);
                        return true;
                    }
                }
            }
            catch (SSTableStreamException exception)
            {
                throw exception.getIOException();
            }

            if (unfiltered != null)
            {
                if (unfiltered.isRow())
                {
                    Row row = (Row) unfiltered;

                    // There is a CQL row level delete
                    if (!row.deletion().isLive())
                    {
                        handleRowTombstone(row);
                        return true;
                    }

                    // For non-compact tables, set up a ClusteringColumnDataState to emit a Rid that emulates a
                    // pre-3.0 CQL row marker. This is necessary for backwards compatibility with 2.1 & 2.0 output,
                    // and also for tables with only primary key columns defined.
                    // An empty PKLI is the 3.0 equivalent of having no row marker (e.g. row modifications via
                    // UPDATE rather than INSERT), so we don't emit a fake row marker in that case.
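                    // For example (hypothetical table "t (pk int, ck int, v int, PRIMARY KEY (pk, ck))"):
                    //   INSERT INTO t (pk, ck, v) VALUES (1, 1, 1);   -- writes a live PKLI (row marker)
                    //   UPDATE t SET v = 1 WHERE pk = 1 AND ck = 1;   -- leaves the PKLI empty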
                    boolean emptyLiveness = row.primaryKeyLivenessInfo().isEmpty();
                    rid.setIsUpdate(emptyLiveness);
                    if (!emptyLiveness)
                    {
                        if (TableMetadata.Flag.isCQLTable(metadata.flags))
                        {
                            columnData = new ClusteringColumnDataState(row.clustering());
                        }
                        columns = row.iterator();
                        return true;
                    }

                    // The row's actual columns may be empty, in which case we'll simply skip over them during the
                    // next iteration and move to the next unfiltered. In that case only the row marker and/or row
                    // deletion (if either is present) will get emitted
                    columns = row.iterator();
                }
                else if (unfiltered.isRangeTombstoneMarker())
                {
                    // Range tombstones can get complicated:
                    // - In the simplest case, a DELETE statement covering a single clustering key range, we
                    //   expect the UnfilteredRowIterator to contain 2 markers, i.e. open and close range
                    //   tombstone markers
                    // - In a slightly more complicated case, the DELETE contains an IN operator (on preceding
                    //   clustering keys), and we expect 2 * N markers, where N is the number of values specified
                    //   for IN
                    // - In the most complicated case, a client could compose a complex partition update with a
                    //   BATCH statement; that opens up further scenarios (considering only the statements that
                    //   apply to the same partition key):
                    //   - Multiple disjoint ranges => we should expect 2 * N markers, where N is the number of ranges
                    //   - Overlapping ranges with the same timestamp => we should expect 2 markers, since the
                    //     overlapping ranges are merged into a single one (the boundary is omitted)
                    //   - Overlapping ranges with different timestamps => we should expect 3 markers, i.e. open
                    //     bound, boundary and close bound
                    //   - Ranges mixed with INSERTs => the order of the unfiltereds (i.e. Row/RangeTombstoneMarker)
                    //     is determined by comparing the row clustering with the bounds of the ranges.
                    //     See o.a.c.d.r.RowAndDeletionMergeIterator
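                    // For example (hypothetical table with "PRIMARY KEY (pk, ck1, ck2)"):
                    //   DELETE FROM t WHERE pk = 1 AND ck1 = 1;        -- 2 markers (open + close)
                    //   DELETE FROM t WHERE pk = 1 AND ck1 IN (1, 5);  -- 2 * 2 markers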
                    RangeTombstoneMarker rangeTombstoneMarker = (RangeTombstoneMarker) unfiltered;
                    // We encode the ranges within the same Spark row. Therefore, the scanner keeps the markers
                    // while iterating through the partition, and _only_ generates a Spark row with the range
                    // tombstone info once the partition / UnfilteredRowIterator is exhausted.
                    handleRangeTombstone(rangeTombstoneMarker);
                    // Continue to consume the next unfiltered row/marker
                }
                else
                {
                    // As of Cassandra 4, the unfiltered kind can be either a row or a range tombstone marker,
                    // see o.a.c.db.rows.Unfiltered.Kind; the else branch exists only for completeness
                    throw new IllegalStateException("Encountered unknown Unfiltered kind");
                }
            }
        }
    }
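
At a glance, the control flow above is a hand-rolled flattening of nested iterators
(partitions -> rows -> columns) behind a single hasNext() that pauses after staging each
unit of work (a tombstone, a row marker, or column data) on the Rid. A minimal,
self-contained sketch of that pattern (illustrative names only, not the real
StreamScanner API) might look like:

    import java.util.Iterator;
    import java.util.List;

    // Simplified analogy of the scanner's state machine: hasNext() advances through
    // partitions, then rows, then columns, returning true once per column so the
    // caller can consume it before the next advance.
    final class FlatteningScanner
    {
        private final Iterator<List<List<String>>> partitions;  // all partitions
        private Iterator<List<String>> rows;                    // rows of the current partition
        private Iterator<String> columns;                       // columns of the current row
        private String current;                                 // last column staged for the caller

        FlatteningScanner(Iterator<List<List<String>>> partitions)
        {
            this.partitions = partitions;
        }

        public boolean hasNext()
        {
            while (true)
            {
                // Resume the current row's columns first, mirroring the
                // "continue to process columns of the last read row" step above
                if (columns != null && columns.hasNext())
                {
                    current = columns.next();
                    return true;
                }
                columns = null;

                // Row exhausted: advance within the current partition
                if (rows != null && rows.hasNext())
                {
                    columns = rows.next().iterator();
                    continue;
                }
                rows = null;

                // Partition exhausted: advance to the next partition, or finish
                if (!partitions.hasNext())
                {
                    return false;
                }
                rows = partitions.next().iterator();
            }
        }

        public String current()
        {
            return current;
        }
    }

A caller would drive this as: while (scanner.hasNext()) { consume(scanner.current()); }.
Analogously, each true return from the method above leaves exactly one unit of work staged
on rid, with the real method additionally staging partition/row tombstones and range
tombstone markers before returning true.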