fn dispatch_buffer_stream()

in dc/s2n-quic-dc/src/stream/recv/buffer/local.rs [131:214]


    /// Decodes every complete packet currently held in `recv_buffer` and hands each one to
    /// `router`.
    ///
    /// The bytes of a dispatched packet are consumed from the buffer only at the start of the
    /// following pass, i.e. after `router.on_packet` has returned successfully. The loop stops
    /// once the buffer has nothing left to peek at, or when the decoder reports an unexpected
    /// end of input that making the buffer contiguous does not resolve; in the latter case the
    /// leftover bytes stay buffered.
    ///
    /// # Errors
    ///
    /// In every error case the buffer is cleared before returning:
    ///
    /// * `recv::error::Kind::Decode` if the decoder fails with anything other than an
    ///   unexpected end of input.
    /// * `recv::error::Kind::Decode` if the decoder hit an unexpected end of input while the
    ///   buffered payload already exceeds `crate::stream::MAX_DATAGRAM_SIZE`.
    /// * Whatever error `router.on_packet` returns.
    fn dispatch_buffer_stream<R>(&mut self, router: &mut R) -> Result<(), recv::Error>
    where
        R: Dispatch,
    {
        let buffer = &mut self.recv_buffer;
        let peer = buffer.remote_address();
        let ecn_marking = buffer.ecn();
        let tag_len = router.tag_len();

        // length of the packet dispatched on the previous pass that still has to be consumed
        let mut dispatched_len = None;

        loop {
            // release the bytes of the packet that was handled on the previous pass
            if let Some(len) = dispatched_len.take() {
                buffer.consume(len);
            }

            let segment = buffer.peek();
            ensure!(!segment.is_empty(), break);

            let segment_len = segment.len();

            let packet = match DecoderBufferMut::new(segment).decode_parameterized(tag_len) {
                Ok((packet, rest)) => {
                    // everything the decoder did not leave behind belongs to this packet
                    dispatched_len = Some(segment_len - rest.len());
                    packet
                }
                Err(DecoderError::UnexpectedEof(eof_len)) => {
                    // a larger contiguous slice may be enough to decode the packet, so retry
                    if buffer.make_contiguous().len() > segment_len {
                        continue;
                    }

                    // the slice did not grow: more bytes have to arrive from the stream first,
                    // which is only plausible while the pending data fits in a datagram
                    if buffer.payload_len() <= crate::stream::MAX_DATAGRAM_SIZE {
                        tracing::trace!(
                            socket_kind = %"stream",
                            unexpected_eof = eof_len,
                            buffer_len = segment_len
                        );

                        break;
                    }

                    // pending data larger than the max datagram size is never going to parse
                    tracing::error!(
                        unconsumed = buffer.payload_len(),
                        remaining_capacity = buffer.remaining_capacity()
                    );
                    buffer.clear();
                    return Err(recv::error::Kind::Decode.into());
                }
                Err(fatal) => {
                    tracing::error!(
                        socket_kind = %"stream",
                        fatal_error = %fatal,
                        payload_len = buffer.payload_len()
                    );

                    // every other decoder error means the stream is corrupted, so drop the
                    // buffered bytes and report the failure
                    buffer.clear();
                    return Err(recv::error::Kind::Decode.into());
                }
            };

            match router.on_packet(&peer, ecn_marking, packet) {
                Ok(_) => {}
                Err(err) => {
                    // the stream errored and cannot recover, so discard the buffered bytes
                    buffer.clear();
                    return Err(err);
                }
            }
        }

        if let Some(len) = dispatched_len.take() {
            buffer.consume(len);
        }

        Ok(())
    }