in cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java [179:297]
private boolean getNext() throws IOException
{
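        // Iterate over the underlying scanner, consuming cells until the next Spark value
        // (a Cell, Tombstone, RangeTombstone or TombstonesInComplex) can be emitted; returns false when exhausted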
while (scanner.hasNext())
{
            // If hasNext returns true, the partition keys have been loaded into the rid,
            // so try to rebuild the partition.
            // Deserialize partition keys - if we have moved to a new partition - and update the 'values' Object[] array
maybeRebuildPartition();
if (rid.shouldConsumeRangeTombstoneMarkers())
{
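                // Compute the most recent deletion time across all markers: a boundary marker carries both
                // an open and a close deletion time, while a plain bound marker carries only one of the two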
List<RangeTombstoneMarker> markers = rid.getRangeTombstoneMarkers();
long maxTimestamp = markers.stream()
.map(marker -> {
if (marker.isBoundary())
{
return Math.max(marker.openDeletionTime(false), marker.closeDeletionTime(false));
}
else
{
return marker.isOpen(false) ? marker.openDeletionTime(false) : marker.closeDeletionTime(false);
}
})
.max(Long::compareTo)
.get(); // Safe to call get as markers is non-empty
                // A range tombstone only needs the partition key in the Spark row;
                // the range tombstone markers themselves are encoded in the extra column
                int partitionKeyLength = cqlTable.numPartitionKeys();
                next = new RangeTombstone(ArrayUtils.retain(values, 0, partitionKeyLength), maxTimestamp, markers);
rid.resetRangeTombstoneMarkers();
return true;
}
if (rid.isPartitionDeletion())
{
                // Special case: a partition deletion only has the partition key parts present in the values array
                int partitionKeyLength = cqlTable.numPartitionKeys();
                // Strip out any values other than the partition keys
                next = new Tombstone(ArrayUtils.retain(values, 0, partitionKeyLength), rid.getTimestamp());
rid.setPartitionDeletion(false); // Reset
return true;
}
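            // Advance the scanner to the next cell; this populates the rid with the cell's column name, value and timestamp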
scanner.advanceToNextColumn();
            // Skip the partition, e.g. if its token falls outside this Spark worker's token range
if (skipPartition)
{
continue;
}
            // Deserialize clustering keys - if we have moved to a new CQL row - and update the 'values' Object[] array
ByteBuffer columnNameBuf = Objects.requireNonNull(rid.getColumnName(), "ColumnName buffer in Rid is null, this is unexpected");
maybeRebuildClusteringKeys(columnNameBuf);
// Deserialize CQL field column name
ByteBuffer component = ColumnTypes.extractComponent(columnNameBuf, cqlTable.numClusteringKeys());
String columnName = component != null ? ByteBufferUtils.stringThrowRuntime(component) : null;
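            // An empty field name means the cell carries no regular-column component:
            // either the schema has no value columns, or this cell marks a row deletion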
            if (columnName == null || columnName.isEmpty())
{
if (noValueColumns)
{
                    // Special case where the schema consists only of partition keys, clustering keys and/or static columns - no value columns
next = new Cell(values, 0, newRow, rid.isUpdate(), rid.getTimestamp());
return true;
}
                // An SBR job (not CDC) should never encounter a row tombstone;
                // it would have thrown an IllegalStateException at the start of this method (at scanner.hasNext()).
                // For a row deletion, the resulting row tombstone carries no fields other than the primary keys.
if (rid.isRowDeletion())
{
                    // Special case: a row deletion only has the primary key parts present in the values array
                    int primaryKeyLength = cqlTable.numPrimaryKeyColumns();
                    // Strip out any values other than the primary keys
                    next = new Tombstone(ArrayUtils.retain(values, 0, primaryKeyLength), rid.getTimestamp());
// Reset row deletion flag
rid.setRowDeletion(false);
return true;
}
continue;
}
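            // Look up the CQL field definition for this column in the table schema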
CqlField field = cqlTable.getField(columnName);
if (field == null)
{
LOGGER.warn("Ignoring unknown column columnName='{}'", columnName);
continue;
}
// Deserialize value field or static column and update 'values' Object[] array
deserializeField(field);
            // Static column, so continue reading the entire CQL row before returning
if (field.isStaticColumn())
{
continue;
}
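            // Cell tombstones inside a complex column (e.g. deleted elements of a collection)
            // are surfaced as a TombstonesInComplex value rather than a plain Cell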
if (rid.hasCellTombstoneInComplex())
{
next = new TombstonesInComplex(values, field.position(), newRow, rid.getTimestamp(), columnName, rid.getCellTombstonesInComplex());
rid.resetCellTombstonesInComplex();
}
else
{
// Update next Cell
next = new Cell(values, field.position(), newRow, rid.isUpdate(), rid.getTimestamp());
}
return true;
}
// Finished so close
next = null;
close();
return false;
}