in netbench/src/duplex.rs [112:154]
fn poll_receive(
&mut self,
_owner: Owner,
_id: u64,
bytes: u64,
cx: &mut Context,
) -> Poll<Result<u64>> {
let mut buf: [MaybeUninit<u8>; READ_BUFFER_SIZE] =
unsafe { MaybeUninit::uninit().assume_init() };
let mut received: u64 = 0;
while received < bytes {
let mut buf = ReadBuf::uninit(&mut buf);
match self.inner.as_mut().poll_read(cx, &mut buf) {
Poll::Ready(_) => {
// we got at least one byte back so loop around and try to get some more
if !buf.filled().is_empty() {
received += buf.filled().len() as u64;
continue;
}
// when we get 0 bytes, it means we don't have any more data so close the
// stream
if self.stream_opened {
self.close_stream()?;
}
break;
}
// we didn't get any data on any iterations so we're pending
Poll::Pending if received == 0 => {
return Poll::Pending;
}
// we got at least one byte previously so return that
Poll::Pending => {
break;
}
}
}
Ok(received).into()
}