in server/src/main/java/org/apache/uniffle/server/merge/BlockFlushFileReader.java [373:415]
private int internalTransferTo(ByteBuf out, int len) {
if (stop) {
throw new RssException("Block flush file reader is closed, caused by " + readThrowable);
}
if (len == 0) {
return 0;
} else if (eof || len < 0) {
throw new IndexOutOfBoundsException();
}
while (ringBuffer.empty() && !stop) {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
throw new RssException(e);
}
}
int c = 0;
while (len > 0) {
Buffer buffer = this.ringBuffer.getReadBuffer();
if (buffer == null) {
break;
}
ByteBuf byteBuf = buffer.getByteBuf();
int toRead = len;
if (len >= byteBuf.readableBytes()) {
this.ringBuffer.incReadIndex();
toRead = byteBuf.readableBytes();
}
len -= toRead;
out.writeBytes(byteBuf, toRead);
pos += toRead;
c += toRead;
}
if (pos >= length) {
eof = true;
}
return c;
}