public IndexReader()

in cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java [57:134]


    public IndexReader(@NotNull SSTable ssTable,
                       @NotNull TableMetadata metadata,
                       @Nullable SparkRangeFilter rangeFilter,
                       @NotNull Stats stats,
                       @NotNull IndexConsumer consumer)
    {
        long now = System.nanoTime();
        long startTimeNanos = now;
        try
        {
            File file = ReaderUtils.constructFilename(metadata.keyspace, metadata.name, ssTable.getDataFileName());
            Descriptor descriptor = Descriptor.fromFilename(file);
            Version version = descriptor.version;

            // if there is a range filter we can use the Summary.db file to seek to approximate start token range location in Index.db file
            long skipAhead = -1;
            now = System.nanoTime();
            if (rangeFilter != null)
            {
                SummaryDbUtils.Summary summary = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable);
                if (summary != null)
                {
                    this.ssTableRange = TokenRange.closed(ReaderUtils.tokenToBigInteger(summary.first().getToken()),
                                                          ReaderUtils.tokenToBigInteger(summary.last().getToken()));
                    if (!rangeFilter.overlaps(this.ssTableRange))
                    {
                        LOGGER.info("Skipping non-overlapping Index.db file rangeFilter='[{},{}]' sstableRange='[{},{}]'",
                                    rangeFilter.tokenRange().firstEnclosedValue(), rangeFilter.tokenRange().upperEndpoint(),
                                    this.ssTableRange.firstEnclosedValue(), this.ssTableRange.upperEndpoint());
                        stats.indexFileSkipped();
                        return;
                    }

                    skipAhead = summary.summary().getPosition(
                    SummaryDbUtils.binarySearchSummary(summary.summary(), metadata.partitioner, rangeFilter.tokenRange().firstEnclosedValue())
                    );
                    stats.indexSummaryFileRead(System.nanoTime() - now);
                    now = System.nanoTime();
                }
            }

            // read CompressionMetadata if it exists
            CompressionMetadata compressionMetadata = SSTableCache.INSTANCE.compressionMetadata(ssTable, version.hasMaxCompressedLength());
            if (compressionMetadata != null)
            {
                stats.indexCompressionFileRead(System.nanoTime() - now);
                now = System.nanoTime();
            }

            // read through Index.db and consume Partition keys
            try (InputStream is = ssTable.openPrimaryIndexStream())
            {
                if (is == null)
                {
                    consumer.onFailure(new IncompleteSSTableException(FileType.INDEX));
                    return;
                }

                consumePrimaryIndex(metadata.partitioner,
                                    is,
                                    ssTable,
                                    compressionMetadata,
                                    rangeFilter,
                                    stats,
                                    skipAhead,
                                    consumer);
                stats.indexFileRead(System.nanoTime() - now);
            }
        }
        catch (Throwable t)
        {
            consumer.onFailure(t);
        }
        finally
        {
            consumer.onFinished(System.nanoTime() - startTimeNanos);
        }
    }