in neqo-transport/src/send_stream.rs [1246:1293]
/// Queues bytes from `buf` on this stream's transmit buffer.
///
/// A stream that is still in the `Ready` state is first moved to `Send`: its
/// `fc` value and a clone of the `conn_fc` handle are carried over, and a new
/// `TxBuffer` is created.
///
/// The amount that may be written is bounded by `self.avail()`. When `atomic`
/// is `true`, nothing is written unless all of `buf` fits in that space; when
/// it is `false`, `buf` is truncated to the available space.
///
/// Returns the number of bytes reported by the transmit buffer's `send`.
/// `Ok(0)` is returned when no space is available, or when an atomic write
/// does not fit (after calling `send_blocked_if_space_needed` with the full
/// length of `buf`).
///
/// # Errors
///
/// - `Error::InvalidInput` if `buf` is empty (an error is also logged).
/// - `Error::FinalSizeError` if, after the `Ready` handling above, the stream
///   is not in the `Send` state.
fn send_internal(&mut self, buf: &[u8], atomic: bool) -> Res<usize> {
    if buf.is_empty() {
        qerror!("[{self}] zero-length send on stream");
        return Err(Error::InvalidInput);
    }

    if let SendStreamState::Ready { fc, conn_fc } = &mut self.state {
        // `fc` is only reachable through a mutable borrow here, so swap a
        // zero-initialised placeholder into the outgoing `Ready` state in
        // order to move the real value into the new `Send` state.
        let placeholder = SenderFlowControl::new(self.stream_id, 0);
        let next_state = SendStreamState::Send {
            fc: mem::replace(fc, placeholder),
            conn_fc: Rc::clone(conn_fc),
            send_buf: TxBuffer::new(),
        };
        self.state.transition(next_state);
    }

    // Anything other than `Send` at this point cannot accept data.
    if !matches!(self.state, SendStreamState::Send { .. }) {
        return Err(Error::FinalSizeError);
    }

    // Snapshot the writable space once; nothing below touches the stream
    // before this value is used.
    let space = self.avail();
    if space == 0 {
        return Ok(0);
    }

    let chunk = if buf.len() <= space {
        buf
    } else if atomic {
        // All-or-nothing write that does not fit: report the full length
        // that was wanted and write nothing.
        self.send_blocked_if_space_needed(buf.len());
        return Ok(0);
    } else {
        &buf[..space]
    };

    match &mut self.state {
        SendStreamState::Send {
            fc,
            conn_fc,
            send_buf,
        } => {
            let queued = send_buf.send(chunk);
            // Account for the queued bytes on both flow controllers.
            fc.consume(queued);
            conn_fc.borrow_mut().consume(queued);
            Ok(queued)
        }
        // `Ready` was promoted to `Send` above and the state was verified.
        SendStreamState::Ready { .. } => unreachable!(),
        _ => Err(Error::FinalSizeError),
    }
}