in cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java [57:134]
public IndexReader(@NotNull SSTable ssTable,
@NotNull TableMetadata metadata,
@Nullable SparkRangeFilter rangeFilter,
@NotNull Stats stats,
@NotNull IndexConsumer consumer)
{
long now = System.nanoTime();
long startTimeNanos = now;
try
{
File file = ReaderUtils.constructFilename(metadata.keyspace, metadata.name, ssTable.getDataFileName());
Descriptor descriptor = Descriptor.fromFilename(file);
Version version = descriptor.version;
// if there is a range filter we can use the Summary.db file to seek to approximate start token range location in Index.db file
long skipAhead = -1;
now = System.nanoTime();
if (rangeFilter != null)
{
SummaryDbUtils.Summary summary = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable);
if (summary != null)
{
this.ssTableRange = TokenRange.closed(ReaderUtils.tokenToBigInteger(summary.first().getToken()),
ReaderUtils.tokenToBigInteger(summary.last().getToken()));
if (!rangeFilter.overlaps(this.ssTableRange))
{
LOGGER.info("Skipping non-overlapping Index.db file rangeFilter='[{},{}]' sstableRange='[{},{}]'",
rangeFilter.tokenRange().firstEnclosedValue(), rangeFilter.tokenRange().upperEndpoint(),
this.ssTableRange.firstEnclosedValue(), this.ssTableRange.upperEndpoint());
stats.indexFileSkipped();
return;
}
skipAhead = summary.summary().getPosition(
SummaryDbUtils.binarySearchSummary(summary.summary(), metadata.partitioner, rangeFilter.tokenRange().firstEnclosedValue())
);
stats.indexSummaryFileRead(System.nanoTime() - now);
now = System.nanoTime();
}
}
// read CompressionMetadata if it exists
CompressionMetadata compressionMetadata = SSTableCache.INSTANCE.compressionMetadata(ssTable, version.hasMaxCompressedLength());
if (compressionMetadata != null)
{
stats.indexCompressionFileRead(System.nanoTime() - now);
now = System.nanoTime();
}
// read through Index.db and consume Partition keys
try (InputStream is = ssTable.openPrimaryIndexStream())
{
if (is == null)
{
consumer.onFailure(new IncompleteSSTableException(FileType.INDEX));
return;
}
consumePrimaryIndex(metadata.partitioner,
is,
ssTable,
compressionMetadata,
rangeFilter,
stats,
skipAhead,
consumer);
stats.indexFileRead(System.nanoTime() - now);
}
}
catch (Throwable t)
{
consumer.onFailure(t);
}
finally
{
consumer.onFinished(System.nanoTime() - startTimeNanos);
}
}