in cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java [343:415]
/**
 * Reports, for each supplied partition key, whether that key is present in the given SSTable.
 *
 * <p>Each key is first tested against the SSTable's bloom filter. Keys the filter rejects are reported as
 * {@code false} without further work. The remaining candidates are verified against the primary index
 * (Index.db), which is scanned once in token order and abandoned as soon as every candidate is resolved.
 *
 * <p>Duplicate keys in {@code partitionKeys}, and distinct keys that share the same token, are each resolved
 * independently, so every occurrence receives the correct answer.
 *
 * @param partitioner   partitioner used to decorate the keys and derive their tokens
 * @param keyspace      keyspace name, used to construct the SSTable descriptor
 * @param table         table name, used to construct the SSTable descriptor
 * @param ssTable       SSTable whose bloom filter and primary index are consulted
 * @param partitionKeys partition keys to look up
 * @return a list with one entry per element of {@code partitionKeys}, in the same order, that is {@code true}
 *         when the key was found in the primary index; an empty list when {@code partitionKeys} is empty
 * @throws IOException if the primary index stream cannot be opened, or if reading it fails
 */
public List<Boolean> contains(Partitioner partitioner, String keyspace, String table, SSTable ssTable, List<ByteBuffer> partitionKeys) throws IOException
{
    if (partitionKeys.isEmpty())
    {
        return Collections.emptyList();
    }
    IPartitioner iPartitioner = getPartitioner(partitioner);
    List<DecoratedKey> decoratedKeys = partitionKeys.stream().map(iPartitioner::decorateKey).collect(Collectors.toList());
    Descriptor descriptor = ReaderUtils.constructDescriptor(keyspace, table, ssTable);
    // NOTE(review): the filter is never released in this method - confirm whether the BloomFilter returned by
    // openBloomFilter owns resources that the caller is expected to close
    BloomFilter filter = openBloomFilter(descriptor, ssTable);
    List<Boolean> result = decoratedKeys.stream().map(filter::isPresent).collect(Collectors.toList());
    if (result.stream().noneMatch(found -> found))
    {
        // no matches in the bloom filter, so we can exit early
        return result;
    }

    // only keys that passed the bloom filter need verifying; sorted by token, each paired with its index
    // into the original partitionKeys list
    List<Pair<BigInteger, Integer>> candidates = IntStream.range(0, decoratedKeys.size())
                                                          .filter(idx -> result.get(idx))
                                                          .mapToObj(idx -> {
                                                              DecoratedKey key = decoratedKeys.get(idx);
                                                              BigInteger token = TokenUtils.tokenToBigInteger(key.getToken());
                                                              return Pair.of(token, idx);
                                                          })
                                                          .sorted(Comparator.comparing(Pair::getLeft))
                                                          .collect(Collectors.toList());

    // confirmed[i] becomes true once partitionKeys.get(i) has been seen in the primary index
    final boolean[] confirmed = new boolean[partitionKeys.size()];
    try (InputStream primaryIndex = ssTable.openPrimaryIndexStream())
    {
        if (primaryIndex == null)
        {
            throw new IOException("Could not read Index.db file");
        }
        // position: first candidate whose token has not yet been passed by the index scan
        final int[] position = new int[]{0};
        // unresolved: candidates that have been neither confirmed nor passed
        final int[] unresolved = new int[]{candidates.size()};
        ReaderUtils.readPrimaryIndex(primaryIndex, (buffer) -> {
            if (unresolved[0] == 0)
            {
                return true;
            }
            DecoratedKey key = iPartitioner.decorateKey(buffer);
            BigInteger token = TokenUtils.tokenToBigInteger(key.getToken());

            // the scan has moved beyond every candidate with a smaller token; any of those that
            // were not confirmed are absent from this SSTable
            while (position[0] < candidates.size() && candidates.get(position[0]).getLeft().compareTo(token) < 0)
            {
                if (!confirmed[candidates.get(position[0]).getRight()])
                {
                    unresolved[0]--;
                }
                position[0]++;
            }

            // check every candidate sharing this token, so duplicates and token collisions are
            // matched regardless of the order in which they were supplied
            for (int i = position[0]; i < candidates.size() && candidates.get(i).getLeft().compareTo(token) == 0; i++)
            {
                int keyIndex = candidates.get(i).getRight();
                if (!confirmed[keyIndex] && buffer.equals(partitionKeys.get(keyIndex))) // token and key matches
                {
                    confirmed[keyIndex] = true;
                    unresolved[0]--;
                }
            }

            // once every candidate is resolved we can exit early
            return unresolved[0] == 0;
        });
    }

    // replace each bloom filter positive with the verified answer; candidates never reached stay false
    for (Pair<BigInteger, Integer> candidate : candidates)
    {
        result.set(candidate.getRight(), confirmed[candidate.getRight()]);
    }
    return result;
}