in netbench/src/multiplex.rs [102:176]
fn flush_write_buffer(&mut self, cx: &mut Context) -> Result<()> {
if !self.tx_open {
return if self.write_buf.is_empty() {
Ok(())
} else {
Err("stream was closed with pending data".into())
};
}
let mut total_len = 0;
if self.inner.as_ref().is_write_vectored() {
while !self.write_buf.is_empty() {
let chunks = self.write_buf.chunks();
match self.inner.as_mut().poll_write_vectored(cx, &chunks) {
Poll::Ready(result) => {
let len = result?;
self.write_buf.advance(len);
total_len += len;
}
Poll::Pending => {
// we don't need to flush since this call does that
self.tx_flushing = false;
}
}
}
} else {
while let Some(mut chunk) = self.write_buf.pop_front() {
match self.inner.as_mut().poll_write(cx, &chunk) {
Poll::Ready(result) => {
let len = result?;
chunk.advance(len);
total_len += len;
if !chunk.is_empty() {
self.write_buf.push_front(chunk);
}
if len == 0 {
return if self.write_buf.is_empty() {
self.tx_open = false;
Ok(())
} else {
Err("stream was closed with pending data".into())
};
}
}
Poll::Pending => {
self.write_buf.push_front(chunk);
// we don't need to flush since this call does that
self.tx_flushing = false;
break;
}
}
}
}
if total_len > 0 {
self.write_buf.notify(cx);
// if we don't have anything buffered, tell the socket
// to flush to the operating system
self.tx_flushing = self.write_buf.is_empty();
}
if self.tx_flushing {
if let Poll::Ready(res) = self.inner.as_mut().poll_flush(cx) {
self.tx_flushing = false;
res?;
}
}
Ok(())
}