in neqo-http3/src/recv_message.rs [254:342]
/// Drives the receive state machine until it can make no further progress.
///
/// Each loop iteration inspects `self.state`:
/// * in the three frame-reading states it pulls the next frame from the
///   stream and dispatches it to the matching handler;
/// * in `DecodingHeaders` it tries to decode the pending header block;
/// * in `ReadingData` it optionally posts a `data_readable` event and stops;
/// * in `ExtendedConnect` it stops without doing anything.
///
/// `post_readable_event` controls whether a `data_readable` event is posted
/// in `ReadingData`, and is forwarded to `set_state_to_close_pending`.
///
/// # Errors
///
/// Returns `Error::HttpFrameUnexpected` for any frame other than `Headers`,
/// `Data` or `PushPromise`, and propagates errors from the frame reader, the
/// frame handlers, the QPACK decoder and `add_headers`.
///
/// # Panics
///
/// Panics if the state is `ClosePending` or `Closed` at the start of a loop
/// iteration.
fn receive_internal(&mut self, conn: &mut Connection, post_readable_event: bool) -> Res<()> {
    loop {
        qdebug!("[{self}] state={:?}", self.state);
        match &mut self.state {
            // These three states all consume frames from the stream.
            RecvMessageState::WaitingForResponseHeaders { frame_reader }
            | RecvMessageState::WaitingForData { frame_reader }
            | RecvMessageState::WaitingForFinAfterTrailers { frame_reader } => {
                let (maybe_frame, fin) = frame_reader.receive(
                    &mut StreamReaderConnectionWrapper::new(conn, self.stream_id),
                )?;

                // No complete frame: either the stream ended, or we wait for more input.
                let Some(frame) = maybe_frame else {
                    break if fin {
                        self.set_state_to_close_pending(post_readable_event)
                    } else {
                        Ok(())
                    };
                };

                qdebug!(
                    "[{self}] A new frame has been received: {frame:?}; state={:?} fin={fin}", self.state,
                );
                match frame {
                    HFrame::Headers { header_block } => {
                        self.handle_headers_frame(header_block, fin)?;
                    }
                    HFrame::Data { len } => {
                        self.handle_data_frame(len, fin)?;
                    }
                    HFrame::PushPromise {
                        push_id,
                        header_block,
                    } => {
                        self.handle_push_promise(push_id, header_block)?;
                    }
                    _ => break Err(Error::HttpFrameUnexpected),
                }

                // A handler may have closed the message; nothing more to do then.
                if matches!(self.state, RecvMessageState::Closed) {
                    break Ok(());
                }
                // With headers still being decoded, the fin is handled by the
                // `DecodingHeaders` arm on the next iteration instead.
                let decoding_headers =
                    matches!(self.state, RecvMessageState::DecodingHeaders { .. });
                if fin && !decoding_headers {
                    break self.set_state_to_close_pending(post_readable_event);
                }
            }
            RecvMessageState::DecodingHeaders { header_block, fin } => {
                // Hold off while the block refers to the dynamic table and
                // push promises are still blocked.
                let waiting_on_push_promise = self
                    .qpack_decoder
                    .borrow()
                    .refers_dynamic_table(header_block)?
                    && !self.blocked_push_promise.is_empty();
                if waiting_on_push_promise {
                    qinfo!(
                        "[{self}] decoding header is blocked waiting for a push_promise header block"
                    );
                    break Ok(());
                }

                // Copy the flag out before `self` is borrowed again below.
                let done = *fin;
                let decoded = self
                    .qpack_decoder
                    .borrow_mut()
                    .decode_header_block(header_block, self.stream_id)?;
                let Some(headers) = decoded else {
                    qinfo!("[{self}] decoding header is blocked");
                    break Ok(());
                };

                self.add_headers(headers, done)?;
                let finished = matches!(
                    self.state,
                    RecvMessageState::Closed | RecvMessageState::ExtendedConnect
                );
                if finished {
                    break Ok(());
                }
            }
            RecvMessageState::ReadingData { .. } => {
                if post_readable_event {
                    self.conn_events.data_readable(self.get_stream_info());
                }
                break Ok(());
            }
            RecvMessageState::ClosePending | RecvMessageState::Closed => {
                panic!("Stream readable after being closed!");
            }
            // Ignore the read event: this request is waiting to be picked up
            // by a new WebTransportSession.
            RecvMessageState::ExtendedConnect => break Ok(()),
        }
    }
}