in dubbo/src/triple/decode.rs [174:217]
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
loop {
if self.state == State::Error {
return Poll::Ready(None);
}
if let Some(item) = self.decode_chunk()? {
return Poll::Ready(Some(Ok(item)));
}
let chunk = match ready!(Pin::new(&mut self.body).poll_data(cx)) {
Some(Ok(d)) => Some(d),
Some(Err(e)) => {
let _ = std::mem::replace(&mut self.state, State::Error);
let err: crate::Error = e.into();
return Poll::Ready(Some(Err(crate::status::Status::new(
crate::status::Code::Internal,
err.to_string(),
))));
}
None => None,
};
if let Some(data) = chunk {
self.buf.put(data)
} else {
break;
}
}
match ready!(Pin::new(&mut self.body).poll_trailers(cx)) {
Ok(trailer) => {
self.trailers = trailer.map(Metadata::from_headers);
}
Err(err) => {
tracing::error!("poll_trailers, err: {}", err);
}
}
Poll::Ready(None)
}