public List contains()

in cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java [343:415]


    public List<Boolean> contains(Partitioner partitioner, String keyspace, String table, SSTable ssTable, List<ByteBuffer> partitionKeys) throws IOException
    {
        if (partitionKeys.isEmpty())
        {
            return Collections.emptyList();
        }

        IPartitioner iPartitioner = getPartitioner(partitioner);
        List<DecoratedKey> decoratedKeys = partitionKeys.stream().map(iPartitioner::decorateKey).collect(Collectors.toList());
        Descriptor descriptor = ReaderUtils.constructDescriptor(keyspace, table, ssTable);
        BloomFilter filter = openBloomFilter(descriptor, ssTable);
        List<Boolean> result = decoratedKeys.stream().map(filter::isPresent).collect(Collectors.toList());
        if (result.stream().noneMatch(found -> found))
        {
            // no matches in the bloom filter, so we can exit early
            return result;
        }

        // sorted by token with index into original partitionKeys list
        List<Pair<BigInteger, Integer>> sortedByTokens = IntStream.range(0, decoratedKeys.size())
                                                                  .mapToObj(idx -> {
                                                                      DecoratedKey key = decoratedKeys.get(idx);
                                                                      BigInteger token = TokenUtils.tokenToBigInteger(key.getToken());
                                                                      return Pair.of(token, idx);
                                                                  })
                                                                  .sorted(Comparator.comparing(Pair::getLeft))
                                                                  .collect(Collectors.toList());
        try (InputStream primaryIndex = ssTable.openPrimaryIndexStream())
        {
            if (primaryIndex == null)
            {
                throw new IOException("Could not read Index.db file");
            }

            final int[] position = new int[]{0};
            ReaderUtils.readPrimaryIndex(primaryIndex, (buffer) -> {
                DecoratedKey key = iPartitioner.decorateKey(buffer);
                BigInteger token = TokenUtils.tokenToBigInteger(key.getToken());

                Pair<BigInteger, Integer> current = sortedByTokens.get(position[0]);
                int compare = token.compareTo(current.getLeft());
                while (compare > 0)
                {
                    // we passed without finding the key
                    result.set(current.getRight(), false);
                    position[0]++;
                    if (position[0] >= decoratedKeys.size())
                    {
                        // if we've found all the keys we can exit early
                        return true;
                    }
                    current = sortedByTokens.get(position[0]);
                    compare = token.compareTo(current.getLeft());
                }

                ByteBuffer currentKey = partitionKeys.get(current.getRight());
                if (compare == 0 && buffer.equals(currentKey))  // token and key matches
                {
                    result.set(current.getRight(), true);
                    position[0]++;
                }

                // if we've found all the keys we can exit early
                return position[0] >= decoratedKeys.size();
            });

            // mark as false any keys we didn't reach
            IntStream.range(position[0], sortedByTokens.size())
                     .forEach(i -> result.set(sortedByTokens.get(i).getRight(), false));
        }

        return result;
    }