in cassandra-four-zero/src/main/java/org/apache/cassandra/io/util/CdcRandomAccessReader.java [94:142]
public BufferHolder rebuffer(long position)
{
offset = position;
buffer.clear();
int length = bufferSize();
if (length < 0)
{
throw new IllegalStateException(String.format("Read passed maxOffset offset=%d maxOffset=%d",
offset, log.maxOffset()));
}
if (buffer.capacity() != length)
{
// The buffer size will always be chunkSize or CdcRandomAccessReader.DEFAULT_BUFFER_SIZE until we reach the end
buffer = ByteBuffer.allocate(length);
}
long currentPos = inputStream.bytesRead();
try
{
if (offset < currentPos)
{
// Attempting to read bytes previously read. In practice, we read the CommitLogs sequentially,
// but we still need to respect random access reader API, it will just require blocking.
int requestLength = buffer.remaining() + 1;
long end = offset + requestLength;
BlockingStreamConsumer streamConsumer = new BlockingStreamConsumer();
source.request(offset, end, streamConsumer);
streamConsumer.getBytes(buffer);
buffer.flip();
return this;
}
if (offset > currentPos)
{
// Skip ahead
ByteBufferUtils.skipFully(inputStream, offset - currentPos);
}
inputStream.read(buffer);
assert buffer.remaining() == 0;
buffer.flip();
}
catch (IOException exception)
{
throw new RuntimeException(ThrowableUtils.rootCause(exception));
}
return this;
}