in cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java [684:736]
public boolean hasNext()
{
try
{
while (true)
{
key = metadata.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in));
partitionLevelDeletion = DeletionTime.serializer.deserialize(in);
iterator = SSTableSimpleIterator.create(metadata, in, header, helper, partitionLevelDeletion);
staticRow = iterator.readStaticRow();
BigInteger token = ReaderUtils.tokenToBigInteger(key.getToken());
if (overlaps(key, token))
{
// Partition overlaps with filters
long now = System.nanoTime();
stats.nextPartition(now - lastTimeNanos);
lastTimeNanos = now;
return true;
}
if (lastToken != null && startOffset != null && lastToken.compareTo(token) < 0)
{
// Partition no longer overlaps SparkTokenRange so we've finished reading this SSTable
stats.skippedDataDbEndOffset(dataStream.position() - startOffset);
return false;
}
stats.skippedPartition(key.getKey(), ReaderUtils.tokenToBigInteger(key.getToken()));
// Skip partition efficiently without deserializing
UnfilteredDeserializer deserializer = UnfilteredDeserializer.create(metadata, in, header, helper);
while (deserializer.hasNext())
{
deserializer.skipNext();
}
}
}
catch (EOFException exception)
{
return false;
}
catch (IOException exception)
{
stats.corruptSSTable(exception, metadata.keyspace, metadata.name, ssTable);
LOGGER.warn("IOException reading sstable keyspace={} table={} dataFileName={} ssTable='{}'",
metadata.keyspace, metadata.name, ssTable.getDataFileName(), ssTable, exception);
throw new SSTableStreamException(exception);
}
catch (Throwable throwable)
{
stats.corruptSSTable(throwable, metadata.keyspace, metadata.name, ssTable);
LOGGER.error("Error reading sstable keyspace={} table={} dataFileName={} ssTable='{}'",
metadata.keyspace, metadata.name, ssTable.getDataFileName(), ssTable, throwable);
throw new RuntimeException(ThrowableUtils.rootCause(throwable));
}
}