in cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java [418:476]
/**
 * Iterates the rows readable from {@code ssTables} and hands each one, as a map, to {@code rowConsumer}.
 *
 * <p>The table schema is built from {@code createStmt} for {@code keyspace} using
 * {@code ReplicationFactor.simpleStrategy(1)}. When {@code partitionKeys} is supplied, one
 * {@link PartitionKeyFilter} is created per key (paired with the token at the same index from
 * {@code toTokens}), the filters are sorted in their natural order, and they are passed both to the
 * {@link CellIterator} and to each SSTable reader.
 *
 * <p>NOTE(review): {@code tokenRange} is not referenced anywhere in this method body, and the
 * {@code isInPartition} override below accepts every partition - confirm whether range filtering is
 * expected to happen here.
 *
 * @param partitioner     partitioner used for schema building, token computation and the scanner
 * @param keyspace        keyspace name passed to the schema builder
 * @param createStmt      CREATE TABLE statement passed to the schema builder
 * @param ssTables        supplier whose {@code openAll} opens a reader per SSTable
 * @param tokenRange      currently unused by this method (see note above)
 * @param partitionKeys   keys to restrict the read to; {@code null} means no partition key filters
 * @param requiredColumns columns handed to {@code PruneColumnFilter.of} and to the row iterator; may be {@code null}
 * @param rowConsumer     callback invoked once per row produced by the row iterator
 * @throws IOException declared by this method; may be raised while reading or when the iterator is closed
 */
public void readPartitionKeys(Partitioner partitioner,
                              String keyspace,
                              String createStmt,
                              SSTablesSupplier ssTables,
                              @Nullable TokenRange tokenRange,
                              @Nullable List<ByteBuffer> partitionKeys,
                              @Nullable String[] requiredColumns,
                              Consumer<Map<String, Object>> rowConsumer) throws IOException
{
    // The returned partitioner is not read in this method; the call itself is kept so that
    // whatever getPartitioner does (NOTE(review): possibly validation - confirm) still happens first.
    getPartitioner(partitioner);

    SchemaBuilder builder = new SchemaBuilder(createStmt, keyspace, ReplicationFactor.simpleStrategy(1), partitioner);
    TableMetadata tableMetadata = builder.tableMetaData();
    CqlTable cqlTable = builder.build();

    // Build one filter per requested key, or no filters at all when no keys were given
    final List<PartitionKeyFilter> keyFilters;
    if (partitionKeys == null)
    {
        keyFilters = Collections.emptyList();
    }
    else
    {
        // keyTokens.get(i) is paired with partitionKeys.get(i)
        List<BigInteger> keyTokens = toTokens(partitioner, partitionKeys);
        keyFilters = IntStream.range(0, partitionKeys.size())
                              .mapToObj(index -> PartitionKeyFilter.create(partitionKeys.get(index),
                                                                           keyTokens.get(index)))
                              .sorted()
                              .collect(Collectors.toList());
    }

    // First constructor argument is the literal 0 (presumably the partition id - confirm against CellIterator)
    try (CellIterator cells = new CellIterator(0,
                                               cqlTable,
                                               Stats.DoNothingStats.INSTANCE,
                                               TypeConverter.IDENTITY,
                                               keyFilters,
                                               unusedTable -> PruneColumnFilter.of(requiredColumns),
                                               (id, filters, columnFilter) ->
                                               new CompactionStreamScanner(
                                                   tableMetadata,
                                                   partitioner,
                                                   TimeProvider.DEFAULT,
                                                   ssTables.openAll((sstable, repairPrimary) ->
                                                       org.apache.cassandra.spark.reader.SSTableReader.builder(tableMetadata, sstable)
                                                                                                      .withPartitionKeyFilters(filters)
                                                                                                      .build())))
    {
        @Override
        public boolean isInPartition(int partitionId, BigInteger token, ByteBuffer partitionKey)
        {
            // Every partition is accepted; no token-based filtering is applied at this level
            return true;
        }

        @Override
        public boolean equals(CqlField field, Object obj1, Object obj2)
        {
            // Null-safe comparison that ignores the field definition
            return Objects.equals(obj1, obj2);
        }
    })
    {
        RowIterator<Map<String, Object>> rows =
            RowIterator.rowMapIterator(cells, Stats.DoNothingStats.INSTANCE, requiredColumns);
        while (rows.next())
        {
            Map<String, Object> row = rows.get();
            rowConsumer.accept(row);
        }
    }
}