static void consumePrimaryIndex()

in cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java [137:223]


    static void consumePrimaryIndex(@NotNull IPartitioner partitioner,
                                    @NotNull InputStream primaryIndex,
                                    @NotNull SSTable ssTable,
                                    @Nullable CompressionMetadata compressionMetadata,
                                    @Nullable SparkRangeFilter range,
                                    @NotNull Stats stats,
                                    long skipBytes,
                                    @NotNull IndexConsumer consumer) throws IOException
    {
        long primaryIndexLength = ssTable.length(FileType.INDEX);
        long dataDbFileLength = ssTable.length(FileType.DATA);
        try (DataInputStream dis = new DataInputStream(primaryIndex))
        {
            if (skipBytes > 0)
            {
                ByteBufferUtils.skipFully(dis, skipBytes);
                stats.indexBytesSkipped(skipBytes);
            }

            ByteBuffer prevKey = null;
            long prevPos = 0;
            BigInteger prevToken = null;
            boolean started = false;

            long totalBytesRead = 0;
            try
            {
                while (true)
                {
                    // read partition key length
                    int len = dis.readUnsignedShort();

                    // read partition key & decorate
                    byte[] buf = new byte[len];
                    dis.readFully(buf);
                    ByteBuffer key = ByteBuffer.wrap(buf);
                    DecoratedKey decoratedKey = partitioner.decorateKey(key);
                    BigInteger token = ReaderUtils.tokenToBigInteger(decoratedKey.getToken());

                    // read position & skip promoted index
                    long pos = ReaderUtils.readPosition(dis);
                    int promotedIndex = ReaderUtils.skipPromotedIndex(dis);
                    totalBytesRead += 2 + len + VIntCoding.computeUnsignedVIntSize(pos) + promotedIndex;

                    if (prevKey != null && (range == null || range.overlaps(prevToken)))
                    {
                        // previous key overlaps with range filter, so consume
                        started = true;
                        long uncompressed = pos - prevPos;
                        long compressed = compressionMetadata == null
                                                ? uncompressed
                                                : calculateCompressedSize(compressionMetadata, dataDbFileLength, prevPos, pos - 1);
                        consumer.accept(new IndexEntry(prevKey, prevToken, uncompressed, compressed));
                    }
                    else if (started)
                    {
                        // we have gone passed the range we care about so exit early
                        stats.indexBytesSkipped(primaryIndexLength - totalBytesRead - skipBytes);
                        return;
                    }

                    prevPos = pos;
                    prevKey = key;
                    prevToken = token;
                }
            }
            catch (EOFException ignored)
            {
                // finished
            }
            finally
            {
                stats.indexBytesRead(totalBytesRead);
            }

            if (prevKey != null && (range == null || range.overlaps(prevToken)))
            {
                // we reached the end of the file, so consume last key if overlaps
                long end = (compressionMetadata == null ? dataDbFileLength : compressionMetadata.getDataLength());
                long uncompressed = end - prevPos;
                long compressed = compressionMetadata == null
                                        ? uncompressed
                                        : calculateCompressedSize(compressionMetadata, dataDbFileLength, prevPos, end - 1);
                consumer.accept(new IndexEntry(prevKey, prevToken, uncompressed, compressed));
            }
        }
    }