in dc/s2n-quic-dc/src/stream/recv/application.rs [189:297]
/// Drives the application side of the stream receiver: repeatedly processes the
/// receive buffer into `out_buf` and, when more data is needed, polls the
/// application socket for more.
///
/// # Parameters
///
/// * `cx` - task context forwarded to the socket poll and to `handle_socket_result`.
/// * `out_buf` - tracked destination storage; its `has_remaining_capacity()` and
///   `written_len()` drive the loop's exit conditions.
///
/// # Returns
///
/// * `Poll::Ready(Ok(()))` when the loop exits: the receiver reached `DataRead`,
///   the configured `read_mode` was satisfied, the socket was pending after at
///   least one byte had been written, or (in `DataRecvd`) `out_buf` is full.
/// * `Poll::Pending` when the socket poll is pending and nothing has been written
///   to `out_buf` during this call.
/// * `Poll::Ready(Err(_))` when acquiring the application guard fails, the receiver
///   reports an error, or the socket result is an error.
///
/// If `self.local_state.check()` yields a result, it is returned immediately
/// without touching the receiver.
///
/// # Panics
///
/// With debug assertions enabled, panics if `out_buf` still has capacity while the
/// reassembler is non-empty, or if a non-stream transport produces a zero-length
/// read. Reaching `ResetRecvd`/`ResetRead` after the error check hits `unreachable!()`.
fn poll_read_into<S>(
&mut self,
cx: &mut Context,
out_buf: &mut buffer::writer::storage::Tracked<S>,
) -> Poll<io::Result<()>>
where
S: buffer::writer::Storage,
{
// short-circuit when the local state already determines the outcome of this call
if let Some(res) = self.local_state.check() {
return res.into();
}
// force a read on the socket if the application gave us an empty buffer
let mut force_recv = !out_buf.has_remaining_capacity();
let shared = &self.shared;
let sockets = &self.sockets;
let transport_features = sockets.features();
// acquire the application-side guard over the receiver; a failure here is
// propagated to the caller through `?`
let mut reader = shared.receiver.application_guard(
self.ack_mode,
&mut self.send_buffer,
shared,
sockets,
)?;
// reborrow through the guard so the loop below works with a plain `&mut`
let reader = &mut *reader;
loop {
// try to process any bytes we have in the recv buffer
reader.process_recv_buffer(out_buf, shared, transport_features);
// if we still have remaining capacity in the `out_buf` make sure the reassembler is
// fully drained
if cfg!(debug_assertions) && out_buf.has_remaining_capacity() {
assert!(reader.reassembler.is_empty());
}
// make sure we don't have an error
if let Err(err) = reader.receiver.check_error() {
// record the error in the local state before surfacing it to the caller
self.local_state
.transition(LocalState::Errored(err), &self.shared);
return Err(err.into()).into();
}
match reader.receiver.state() {
state::Receiver::Recv | state::Receiver::SizeKnown => {
// we haven't received everything so we still need to read from the socket
}
state::Receiver::DataRecvd => {
// make sure we have capacity in the buffer before looping back around
// NOTE(review): `ensure!` is a project macro; presumably it returns the second
// argument (`Ok(())`) when the condition is false - confirm against its definition
ensure!(out_buf.has_remaining_capacity(), Ok(()).into());
// if we've received everything from the sender then no need to poll
// the socket at this point
continue;
}
// if we've copied the entire buffer into the application then just return
state::Receiver::DataRead => {
self.local_state
.transition(LocalState::Drained, &self.shared);
break;
}
// we already checked for an error above
state::Receiver::ResetRecvd | state::Receiver::ResetRead => unreachable!(),
}
// decide whether the data gathered so far already satisfies the caller's read mode;
// arm order matters: the `force_recv` guard is evaluated first
match self.read_mode {
// ignore the mode if we have a forced receive
_ if force_recv => {}
// if we've completely filled the `out_buf` then we're done
ReadMode::UntilFull if !out_buf.has_remaining_capacity() => break,
// if we've read at least one byte then we're done
ReadMode::Once if out_buf.written_len() > 0 => break,
// otherwise keep going
_ => {}
}
// poll the application socket for more data on behalf of the application actor
let recv = reader.poll_fill_recv_buffer(
cx,
Actor::Application,
self.sockets.read_application(),
&self.shared.clock,
&self.shared.subscriber,
);
// `handle_socket_result` post-processes the poll outcome; a ready error is
// propagated through `?`, a ready success yields the received length
let recv_len =
match Self::handle_socket_result(cx, &mut reader.receiver, &mut self.timer, recv) {
Poll::Ready(res) => res?,
// if we've written at least one byte then return that amount
Poll::Pending if out_buf.written_len() > 0 => break,
Poll::Pending => return Poll::Pending,
};
// clear the forced receive after performing it once
force_recv = false;
if recv_len == 0 {
if transport_features.is_stream() {
// if we got a 0-length read then the stream was closed - notify the receiver
reader.receiver.on_transport_close();
// loop back so the state and error checks above observe the close
continue;
} else {
// only checked when debug assertions are enabled; otherwise execution falls
// through to the next loop iteration
debug_assert!(false, "datagram recv buffers should never be empty");
}
}
}
// every `break` above lands here: report completion of this poll to the caller
Ok(()).into()
}