fn poll_read_into()

in dc/s2n-quic-dc/src/stream/recv/application.rs [189:297]


    fn poll_read_into<S>(
        &mut self,
        cx: &mut Context,
        out_buf: &mut buffer::writer::storage::Tracked<S>,
    ) -> Poll<io::Result<()>>
    where
        S: buffer::writer::Storage,
    {
        if let Some(res) = self.local_state.check() {
            return res.into();
        }

        // force a read on the socket if the application gave us an empty buffer
        let mut force_recv = !out_buf.has_remaining_capacity();

        let shared = &self.shared;
        let sockets = &self.sockets;
        let transport_features = sockets.features();

        let mut reader = shared.receiver.application_guard(
            self.ack_mode,
            &mut self.send_buffer,
            shared,
            sockets,
        )?;
        let reader = &mut *reader;

        loop {
            // try to process any bytes we have in the recv buffer
            reader.process_recv_buffer(out_buf, shared, transport_features);

            // if we still have remaining capacity in the `out_buf` make sure the reassembler is
            // fully drained
            if cfg!(debug_assertions) && out_buf.has_remaining_capacity() {
                assert!(reader.reassembler.is_empty());
            }

            // make sure we don't have an error
            if let Err(err) = reader.receiver.check_error() {
                self.local_state
                    .transition(LocalState::Errored(err), &self.shared);
                return Err(err.into()).into();
            }

            match reader.receiver.state() {
                state::Receiver::Recv | state::Receiver::SizeKnown => {
                    // we haven't received everything so we still need to read from the socket
                }
                state::Receiver::DataRecvd => {
                    // make sure we have capacity in the buffer before looping back around
                    ensure!(out_buf.has_remaining_capacity(), Ok(()).into());

                    // if we've received everything from the sender then no need to poll
                    // the socket at this point
                    continue;
                }
                // if we've copied the entire buffer into the application then just return
                state::Receiver::DataRead => {
                    self.local_state
                        .transition(LocalState::Drained, &self.shared);
                    break;
                }
                // we already checked for an error above
                state::Receiver::ResetRecvd | state::Receiver::ResetRead => unreachable!(),
            }

            match self.read_mode {
                // ignore the mode if we have a forced receive
                _ if force_recv => {}
                // if we've completely filled the `out_buf` then we're done
                ReadMode::UntilFull if !out_buf.has_remaining_capacity() => break,
                // if we've read at least one byte then we're done
                ReadMode::Once if out_buf.written_len() > 0 => break,
                // otherwise keep going
                _ => {}
            }

            let recv = reader.poll_fill_recv_buffer(
                cx,
                Actor::Application,
                self.sockets.read_application(),
                &self.shared.clock,
                &self.shared.subscriber,
            );

            let recv_len =
                match Self::handle_socket_result(cx, &mut reader.receiver, &mut self.timer, recv) {
                    Poll::Ready(res) => res?,
                    // if we've written at least one byte then return that amount
                    Poll::Pending if out_buf.written_len() > 0 => break,
                    Poll::Pending => return Poll::Pending,
                };

            // clear the forced receive after performing it once
            force_recv = false;

            if recv_len == 0 {
                if transport_features.is_stream() {
                    // if we got a 0-length read then the stream was closed - notify the receiver
                    reader.receiver.on_transport_close();
                    continue;
                } else {
                    debug_assert!(false, "datagram recv buffers should never be empty");
                }
            }
        }

        Ok(()).into()
    }