in cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/streaming/BufferingInputStream.java [293:342]
/**
 * Skips over {@code count} bytes of this stream.
 *
 * <p>Three paths are taken depending on {@code count}:
 * <ul>
 *   <li>a non-positive {@code count} skips nothing and returns {@code 0};</li>
 *   <li>a {@code count} no larger than {@code bytesBuffered()} is delegated to {@code super.skip}
 *       and the value it returns is reported to {@code stats} and handed back to the caller;</li>
 *   <li>otherwise the skip marker is raised, {@code super.skip} is called repeatedly while a request
 *       is active or the queue is non-empty, and whatever is still outstanding afterwards is
 *       accounted for by advancing {@code rangeStart}, {@code bytesWritten} and {@code bytesRead}
 *       instead of being read.</li>
 * </ul>
 *
 * <p>NOTE(review): on the third path the method always returns {@code count}; no check against the
 * end of the underlying range is visible here - confirm callers never skip past the end.
 *
 * @param count number of bytes the caller wants to skip
 * @return {@code 0} when {@code count <= 0}, the result of {@code super.skip} on the buffered path,
 *         otherwise {@code count}
 * @throws IOException if {@code super.skip} or {@code checkState} fails
 */
public long skip(long count) throws IOException
{
    // Guard: nothing to do for a non-positive request
    if (count <= 0)
    {
        return 0;
    }

    // Fast path: the request is covered by what bytesBuffered() reports, so a plain skip suffices
    if (count <= bytesBuffered())
    {
        long skipped = super.skip(count);
        stats.inputStreamBytesSkipped(source, skipped, 0);
        return skipped;
    }

    // Raise the skip marker for the duration of the drain.
    // NOTE(review): the marker is only lowered on the normal path below; if super.skip throws it
    // stays set - confirm that is acceptable for a failed stream.
    skipping = true;
    long outstanding = count;

    // Drain any buffered bytes and keep going until the request is satisfied or there is neither
    // an active request nor anything left in the queue. 'outstanding' is tested first so that,
    // once it reaches zero, the loop exits without consulting the request/queue state again.
    while (outstanding > 0 && (activeRequest || !queue.isEmpty()))
    {
        outstanding -= super.skip(outstanding);
    }

    // Whatever could not be drained is skipped by moving the range start pointer forward,
    // so those bytes are never requested; the read/written counters are advanced to match
    if (outstanding > 0)
    {
        rangeStart += outstanding;
        bytesWritten.addAndGet(outstanding);
        bytesRead += outstanding;
    }

    // Lower the skip marker and resume requesting bytes
    skipping = false;
    switch (state)
    {
        case Reading:
        case NextBuffer:
            // Stream is already active: ask for more bytes (subject to maybeRequestMore's own checks)
            maybeRequestMore();
            break;
        default:
            // skip() may be called before any read(), leaving the stream in its initial state;
            // checkState() is used here so the stream is initialized before bytes are requested
            checkState();
            break;
    }

    // Report how much was drained through super.skip versus skipped by moving the range pointer
    long drained = count - outstanding;
    stats.inputStreamBytesSkipped(source, drained, outstanding);
    return count;
}