in cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java [446:495]
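/**
 * Consumes the cells of the current complex column (a collection or UDT), publishing the
 * reconstructed value, its timestamp, and any tombstone information into {@code rid}.
 */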
public void consume()
{
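// Record the encoded cell name (clustering values + column name, with no cell path) for this complex column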
rid.setColumnNameCopy(ReaderUtils.encodeCellName(metadata,
clustering,
column.name.bytes,
ByteBufferUtil.EMPTY_BYTE_BUFFER));
// The complex column itself is live, but individual elements may still have been deleted; check for that cell by cell below
if (deletionTime.isLive())
{
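// The column is live: collect its live cells into a single buffer and track the most recent cell timestamp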
ComplexTypeBuffer buffer = ComplexTypeBuffer.newBuffer(column.type, cellCount);
long maxTimestamp = Long.MIN_VALUE;
while (cells.hasNext())
{
Cell<?> cell = cells.next();
// Re: isLive vs. isTombstone - isLive also takes TTL into account, so a cell that is expiring soon
// is handled as a tombstone
if (cell.isLive(timeProvider.nowInTruncatedSeconds()))
{
buffer.addCell(cell);
}
else
{
// handleCellTombstoneInComplex only records the tombstoned cell when running as a CDC job
handleCellTombstoneInComplex(cell);
}
// If the cell is deleted, its deletion time doubles as the cell's timestamp
maxTimestamp = Math.max(maxTimestamp, cell.timestamp());
}
// For CDC, consuming a mutation that contains cell tombstones results in an empty buffer being built,
// so publish a null value instead
if (rid.hasCellTombstoneInComplex())
{
rid.setValueCopy(null);
}
else
{
rid.setValueCopy(buffer.build());
}
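// The newest cell timestamp represents the complex column as a whole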
rid.setTimestamp(maxTimestamp);
}
else
{
// The entire collection/UDT is deleted
handleCellTombstone();
rid.setTimestamp(deletionTime.markedForDeleteAt());
}
// Null out clustering to indicate no data
clustering = null;
}