fn flush_write_buffer()

in netbench/src/multiplex.rs [102:176]


    /// Writes as much of `write_buf` as the inner stream will currently accept
    /// and, once the buffer has been fully drained, flushes the inner stream.
    ///
    /// Vectored writes are used when the inner stream reports support for
    /// them; otherwise the buffered chunks are written one at a time.
    ///
    /// `Ok(())` is returned both when the buffer was drained and when the inner
    /// stream returned `Poll::Pending`. In the latter case the unwritten data
    /// stays in `write_buf` so a later call can retry.
    ///
    /// # Errors
    ///
    /// * Any error produced by the inner stream's write or flush calls.
    /// * `"stream was closed with pending data"` when the transmit side is
    ///   already closed, or a write accepts zero bytes, while data is still
    ///   buffered.
    fn flush_write_buffer(&mut self, cx: &mut Context) -> Result<()> {
        if !self.tx_open {
            // nothing more can be written; this is only fine if nothing is left
            return if self.write_buf.is_empty() {
                Ok(())
            } else {
                Err("stream was closed with pending data".into())
            };
        }

        // number of bytes accepted by the inner stream during this call
        let mut total_len = 0;

        if self.inner.as_ref().is_write_vectored() {
            while !self.write_buf.is_empty() {
                let chunks = self.write_buf.chunks();

                match self.inner.as_mut().poll_write_vectored(cx, &chunks) {
                    Poll::Ready(result) => {
                        let len = result?;
                        self.write_buf.advance(len);
                        total_len += len;

                        // A zero-length write makes no progress; treat it the
                        // same way as the non-vectored path instead of
                        // retrying forever.
                        if len == 0 {
                            return if self.write_buf.is_empty() {
                                self.tx_open = false;
                                Ok(())
                            } else {
                                Err("stream was closed with pending data".into())
                            };
                        }
                    }
                    Poll::Pending => {
                        // we don't need to flush since this call does that
                        self.tx_flushing = false;
                        // The inner stream can't take more right now. Stop
                        // writing and keep the remaining data buffered rather
                        // than spinning on a socket that is not ready.
                        break;
                    }
                }
            }
        } else {
            while let Some(mut chunk) = self.write_buf.pop_front() {
                match self.inner.as_mut().poll_write(cx, &chunk) {
                    Poll::Ready(result) => {
                        let len = result?;
                        chunk.advance(len);
                        total_len += len;

                        // partial write: put the unwritten tail back at the front
                        if !chunk.is_empty() {
                            self.write_buf.push_front(chunk);
                        }

                        if len == 0 {
                            return if self.write_buf.is_empty() {
                                self.tx_open = false;
                                Ok(())
                            } else {
                                Err("stream was closed with pending data".into())
                            };
                        }
                    }
                    Poll::Pending => {
                        // nothing was consumed; restore the chunk for the next attempt
                        self.write_buf.push_front(chunk);
                        // we don't need to flush since this call does that
                        self.tx_flushing = false;
                        break;
                    }
                }
            }
        }

        if total_len > 0 {
            self.write_buf.notify(cx);

            // if we don't have anything buffered, tell the socket
            // to flush to the operating system
            self.tx_flushing = self.write_buf.is_empty();
        }

        if self.tx_flushing {
            // keep `tx_flushing` set while the flush is pending so that the
            // next call polls it again
            if let Poll::Ready(res) = self.inner.as_mut().poll_flush(cx) {
                self.tx_flushing = false;
                res?;
            }
        }

        Ok(())
    }