private boolean getNext()

in cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java [179:297]


    private boolean getNext() throws IOException
    {
        while (scanner.hasNext())
        {
            // If hasNext returns true, it indicates the partition keys has been loaded into the rid.
            // Therefore, let's try to rebuild partition.
            // Deserialize partition keys - if we have moved to a new partition - and update 'values' Object[] array.
            maybeRebuildPartition();

            if (rid.shouldConsumeRangeTombstoneMarkers())
            {
                List<RangeTombstoneMarker> markers = rid.getRangeTombstoneMarkers();
                long maxTimestamp = markers.stream()
                                           .map(marker -> {
                                               if (marker.isBoundary())
                                               {
                                                   return Math.max(marker.openDeletionTime(false), marker.closeDeletionTime(false));
                                               }
                                               else
                                               {
                                                   return marker.isOpen(false) ? marker.openDeletionTime(false) : marker.closeDeletionTime(false);
                                               }
                                           })
                                           .max(Long::compareTo)
                                           .get();  // Safe to call get as markers is non-empty
                // Range tombstones requires only to have the partition key in the spark row,
                // the range tombstones are encoded in the extra column
                int partitionkeyLength = cqlTable.numPartitionKeys();
                next = new RangeTombstone(ArrayUtils.retain(values, 0, partitionkeyLength), maxTimestamp, markers);
                rid.resetRangeTombstoneMarkers();
                return true;
            }

            if (rid.isPartitionDeletion())
            {
                // Special case that row deletion will only have the partition key parts present in the values array
                int partitionkeyLength = cqlTable.numPartitionKeys();
                // Strip out other values if any rather than the partition keys
                next = new Tombstone(ArrayUtils.retain(values, 0, partitionkeyLength), rid.getTimestamp());
                rid.setPartitionDeletion(false);  // Reset
                return true;
            }

            scanner.advanceToNextColumn();

            // Skip partition e.g. if token is outside of Spark worker token range
            if (skipPartition)
            {
                continue;
            }

            // Deserialize clustering keys - if moved to new CQL row - and update 'values' Object[] array
            ByteBuffer columnNameBuf = Objects.requireNonNull(rid.getColumnName(), "ColumnName buffer in Rid is null, this is unexpected");
            maybeRebuildClusteringKeys(columnNameBuf);

            // Deserialize CQL field column name
            ByteBuffer component = ColumnTypes.extractComponent(columnNameBuf, cqlTable.numClusteringKeys());
            String columnName = component != null ? ByteBufferUtils.stringThrowRuntime(component) : null;
            if (columnName == null || columnName.length() == 0)
            {
                if (noValueColumns)
                {
                    // Special case where schema consists only of partition keys, clustering keys or static columns, no value columns
                    next = new Cell(values, 0, newRow, rid.isUpdate(), rid.getTimestamp());
                    return true;
                }

                // SBR job (not CDC) should not expect encountering a row tombstone.
                // It would throw IllegalStateException at the beginning of this method (at scanner.hasNext()).
                // For a row deletion, the resulting row tombstone does not carry other fields than the primary keys.
                if (rid.isRowDeletion())
                {
                    // Special case that row deletion will only have the primary key parts present in the values array
                    int primaryKeyLength = cqlTable.numPrimaryKeyColumns();
                    // Strip out other values if any rather than the primary keys
                    next = new Tombstone(ArrayUtils.retain(values, 0, primaryKeyLength), rid.getTimestamp());
                    // Reset row deletion flag
                    rid.setRowDeletion(false);
                    return true;
                }

                continue;
            }

            CqlField field = cqlTable.getField(columnName);
            if (field == null)
            {
                LOGGER.warn("Ignoring unknown column columnName='{}'", columnName);
                continue;
            }

            // Deserialize value field or static column and update 'values' Object[] array
            deserializeField(field);

            // Static column, so continue reading entire CQL row before returning
            if (field.isStaticColumn())
            {
                continue;
            }

            if (rid.hasCellTombstoneInComplex())
            {
                next = new TombstonesInComplex(values, field.position(), newRow, rid.getTimestamp(), columnName, rid.getCellTombstonesInComplex());
                rid.resetCellTombstonesInComplex();
            }
            else
            {
                // Update next Cell
                next = new Cell(values, field.position(), newRow, rid.isUpdate(), rid.getTimestamp());
            }

            return true;
        }

        // Finished so close
        next = null;
        close();
        return false;
    }