in cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java [137:223]
static void consumePrimaryIndex(@NotNull IPartitioner partitioner,
@NotNull InputStream primaryIndex,
@NotNull SSTable ssTable,
@Nullable CompressionMetadata compressionMetadata,
@Nullable SparkRangeFilter range,
@NotNull Stats stats,
long skipBytes,
@NotNull IndexConsumer consumer) throws IOException
{
long primaryIndexLength = ssTable.length(FileType.INDEX);
long dataDbFileLength = ssTable.length(FileType.DATA);
try (DataInputStream dis = new DataInputStream(primaryIndex))
{
if (skipBytes > 0)
{
ByteBufferUtils.skipFully(dis, skipBytes);
stats.indexBytesSkipped(skipBytes);
}
ByteBuffer prevKey = null;
long prevPos = 0;
BigInteger prevToken = null;
boolean started = false;
long totalBytesRead = 0;
try
{
while (true)
{
// read partition key length
int len = dis.readUnsignedShort();
// read partition key & decorate
byte[] buf = new byte[len];
dis.readFully(buf);
ByteBuffer key = ByteBuffer.wrap(buf);
DecoratedKey decoratedKey = partitioner.decorateKey(key);
BigInteger token = ReaderUtils.tokenToBigInteger(decoratedKey.getToken());
// read position & skip promoted index
long pos = ReaderUtils.readPosition(dis);
int promotedIndex = ReaderUtils.skipPromotedIndex(dis);
totalBytesRead += 2 + len + VIntCoding.computeUnsignedVIntSize(pos) + promotedIndex;
if (prevKey != null && (range == null || range.overlaps(prevToken)))
{
// previous key overlaps with range filter, so consume
started = true;
long uncompressed = pos - prevPos;
long compressed = compressionMetadata == null
? uncompressed
: calculateCompressedSize(compressionMetadata, dataDbFileLength, prevPos, pos - 1);
consumer.accept(new IndexEntry(prevKey, prevToken, uncompressed, compressed));
}
else if (started)
{
// we have gone passed the range we care about so exit early
stats.indexBytesSkipped(primaryIndexLength - totalBytesRead - skipBytes);
return;
}
prevPos = pos;
prevKey = key;
prevToken = token;
}
}
catch (EOFException ignored)
{
// finished
}
finally
{
stats.indexBytesRead(totalBytesRead);
}
if (prevKey != null && (range == null || range.overlaps(prevToken)))
{
// we reached the end of the file, so consume last key if overlaps
long end = (compressionMetadata == null ? dataDbFileLength : compressionMetadata.getDataLength());
long uncompressed = end - prevPos;
long compressed = compressionMetadata == null
? uncompressed
: calculateCompressedSize(compressionMetadata, dataDbFileLength, prevPos, end - 1);
consumer.accept(new IndexEntry(prevKey, prevToken, uncompressed, compressed));
}
}
}