in ktor-io/jvm/src/io/ktor/utils/io/ByteChannelSequentialJVM.kt [133:151]
/**
 * Hands the content of this channel to [visitor] one [ByteBuffer] view at a time.
 *
 * A view is flagged as the last one when the channel is already closed and the view
 * covers everything that is still available for reading. If the loop finishes without
 * any view having been flagged that way, [visitor] is invoked one more time with an
 * empty buffer and `last = true`, so on normal completion it has seen at least one
 * call with `last = true`.
 *
 * NOTE(review): whatever [visitor] returns is discarded here — confirm whether the
 * visitor contract is supposed to allow stopping the iteration early.
 *
 * @param visitor callback receiving each buffer view together with the "last" flag.
 */
override suspend fun consumeEachBufferRange(visitor: ConsumeEachBufferVisitor) {
    // Snapshot the property once; every pass of the loop reads from this same reference.
    val source = readable
    var lastDelivered = false

    do {
        source.readDirect(1) { view: ByteBuffer ->
            // "Final" means: channel closed and this view holds all bytes still readable.
            val isFinal = closed && view.remaining() == availableForRead
            visitor(view, isFinal)

            if (isFinal) {
                lastDelivered = true
            }
        }
        // presumably await(1) suspends until at least one byte can be read and yields
        // false once no more data will arrive — TODO confirm against its definition
    } while (await(1))

    // The visitor has already been told about the end of the data; nothing more to do.
    if (lastDelivered) return

    // No chunk was flagged as last, so signal completion with an empty buffer.
    visitor(ByteBuffer.allocate(0), true)
}