in runtime/executor/src/main/java/org/apache/nemo/runtime/executor/transfer/ByteInputContext.java [188:220]
public int read(final byte[] bytes, final int baseOffset, final int maxLength) throws IOException {
  // Copies up to maxLength bytes out of the queued buffers into bytes, writing from index
  // baseOffset onwards, and returns how many bytes were copied.
  //
  // Returns -1 when the queue yields no head buffer (end of stream) before any byte was
  // copied; returns 0 when maxLength is 0.
  //
  // Throws NullPointerException when bytes is null, IndexOutOfBoundsException when the
  // offset/length pair does not fit inside bytes, and IOException (wrapping the cause) when
  // a queue operation is interrupted.
  if (bytes == null) {
    throw new NullPointerException();
  }
  final boolean negativeArgument = baseOffset < 0 || maxLength < 0;
  if (negativeArgument || bytes.length - baseOffset < maxLength) {
    throw new IndexOutOfBoundsException();
  }

  // how many bytes have been copied into the destination array so far
  int copied = 0;
  try {
    while (copied < maxLength) {
      final ByteBuf current = byteBufQueue.peek();
      if (current == null) {
        // no head buffer: end of stream event
        if (copied == 0) {
          return -1;
        }
        return copied;
      }

      // take whatever the head buffer still holds, capped by the room left in the request
      final int remaining = maxLength - copied;
      final int chunk = Math.min(current.readableBytes(), remaining);
      current.readBytes(bytes, baseOffset + copied, chunk);

      if (current.readableBytes() == 0) {
        // the head buffer is drained: drop it from the queue, then release it
        byteBufQueue.take();
        current.release();
      }
      copied += chunk;
    }
    return copied;
  } catch (final InterruptedException interrupted) {
    // keep the interrupt status visible to callers further up the stack
    Thread.currentThread().interrupt();
    throw new IOException(interrupted);
  }
}