in dc/s2n-quic-dc/src/stream/recv/buffer/local.rs [131:214]
fn dispatch_buffer_stream<R>(&mut self, router: &mut R) -> Result<(), recv::Error>
where
R: Dispatch,
{
let msg = &mut self.recv_buffer;
let remote_addr = msg.remote_address();
let ecn = msg.ecn();
let tag_len = router.tag_len();
let mut prev_packet_len = None;
loop {
// consume the previous packet
if let Some(packet_len) = prev_packet_len.take() {
msg.consume(packet_len);
}
let segment = msg.peek();
ensure!(!segment.is_empty(), break);
let initial_len = segment.len();
let decoder = DecoderBufferMut::new(segment);
let packet = match decoder.decode_parameterized(tag_len) {
Ok((packet, remaining)) => {
prev_packet_len = Some(initial_len - remaining.len());
packet
}
Err(decoder_error) => {
if let DecoderError::UnexpectedEof(len) = decoder_error {
// if making the buffer contiguous resulted in the slice increasing, then
// try to parse a packet again
if msg.make_contiguous().len() > initial_len {
continue;
}
// otherwise, we'll need to receive more bytes from the stream to correctly
// parse a packet
// if we have pending data greater than the max datagram size then it's never going to parse
if msg.payload_len() > crate::stream::MAX_DATAGRAM_SIZE {
tracing::error!(
unconsumed = msg.payload_len(),
remaining_capacity = msg.remaining_capacity()
);
msg.clear();
return Err(recv::error::Kind::Decode.into());
}
tracing::trace!(
socket_kind = %"stream",
unexpected_eof = len,
buffer_len = initial_len
);
break;
}
tracing::error!(
socket_kind = %"stream",
fatal_error = %decoder_error,
payload_len = msg.payload_len()
);
// any other decoder errors mean the stream has been corrupted so
// it's time to shut down the connection
msg.clear();
return Err(recv::error::Kind::Decode.into());
}
};
if let Err(err) = router.on_packet(&remote_addr, ecn, packet) {
// the stream errored and we can't recover so clear out the buffer
msg.clear();
return Err(err);
}
}
if let Some(len) = prev_packet_len.take() {
msg.consume(len);
}
Ok(())
}