in cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java [317:359]
private void maybeRebuildPartition()
{
if (!rid.isNewPartition())
{
return;
}
// Skip partitions not in the token range for this Spark partition
newRow = true;
for (CqlField field : cqlTable.staticColumns())
{
// We need to reset static columns between partitions, if a static column is null/not-populated
// in the next partition, then the previous value might be carried across
values[field.position()] = null;
}
skipPartition = !dataLayer.isInPartition(partitionId, rid.getToken(), rid.getPartitionKey());
if (skipPartition)
{
stats.skippedPartitionInIterator(rid.getPartitionKey(), rid.getToken());
return;
}
// Or new partition, so deserialize partition keys and update 'values' array
ByteBuffer partitionKey = rid.getPartitionKey();
if (numPartitionKeys == 1)
{
// Not a composite partition key
CqlField field = cqlTable.partitionKeys().get(0);
values[field.position()] = deserialize(field, partitionKey);
}
else
{
// Split composite partition keys
ByteBuffer[] partitionKeyBufs = ColumnTypes.split(partitionKey, numPartitionKeys);
int index = 0;
for (CqlField field : cqlTable.partitionKeys())
{
values[field.position()] = deserialize(field, partitionKeyBufs[index++]);
}
}
}