in dc/s2n-quic-dc/src/stream/send/application.rs [154:274]
/// Polls to write the bytes buffered in `buf` to the stream, with `is_fin` marking the
/// final write.
///
/// The steps run in a fixed order:
/// 1. Previously queued packets are flushed with `poll_flush_buffer`. A non-zero flushed
///    length is returned right away, before any new bytes are accepted.
/// 2. If `self.status` is not `Status::Open`, only an empty write with `is_fin` set is
///    accepted (yielding `Ok(0)`); any other write fails with `io::ErrorKind::BrokenPipe`.
/// 3. If `self.queue` is still non-empty, `Ok(0)` is returned without accepting new bytes.
/// 4. A flow request covering `buf.buffered_len()` bytes is built, clamped by the path
///    state when the socket features are not flow controlled, and passed to
///    `flow.poll_acquire`.
/// 5. The granted credits are used to push packets into `self.queue`; on non-reliable
///    sockets the transmissions are also recorded in a batch that is sent to the worker.
/// 6. The queue is flushed and the result of that flush is returned.
///
/// # Parameters
/// - `cx`: task context forwarded to `poll_flush_buffer` and `flow.poll_acquire`.
/// - `buf`: reader storage holding the application bytes; handed to
///   `self.queue.push_buffer`.
/// - `is_fin`: whether this write carries the end of the stream.
///
/// # Returns
/// - `Poll::Pending` when a flush or the credit acquisition is not ready.
/// - `Poll::Ready(Ok(len))` with the length reported by `poll_flush_buffer` (or `0` for
///   the early exits described above).
/// - `Poll::Ready(Err(_))` when flushing, credit acquisition, `push_buffer`, or
///   `push_to_worker` fails, or for a non-empty / non-fin write on a stream that is not
///   open.
fn poll_write_from<S>(
&mut self,
cx: &mut Context,
buf: &mut S,
is_fin: bool,
) -> Poll<io::Result<usize>>
where
S: buffer::reader::storage::Infallible,
{
// Try to flush any pending packets
// NOTE(review): the second argument is the currently buffered length here and
// `usize::MAX` for the final flush below; presumably it bounds the reported length -
// confirm against `poll_flush_buffer`.
let flushed_len = ready!(self.poll_flush_buffer(cx, buf.buffered_len()))?;
// if the flushed len is non-zero then return it to the application before accepting more
// bytes to buffer
ensure!(flushed_len == 0, Ok(flushed_len).into());
// if we're not open, then make sure this is an empty write
// (an empty write that also sets `is_fin` is accepted as a no-op; everything else is
// rejected with `BrokenPipe`)
if !matches!(self.status, Status::Open) {
ensure!(
buf.buffer_is_empty() && is_fin,
Err(io::Error::from(io::ErrorKind::BrokenPipe)).into()
);
return Ok(0).into();
}
// make sure the queue is drained before continuing
// (`flushed_len` is known to be 0 here because of the check above, so this early exit
// reports `Ok(0)`)
ensure!(self.queue.is_empty(), Ok(flushed_len).into());
let app = self.shared.application();
let max_header_len = app.max_header_len();
let max_segments = self.shared.gso.max_segments();
// create a flow request from the provided application input
// (`len` and `initial_len` both start at the buffered length; only `clamp` below may
// modify the request before it is submitted)
let initial_len = buf.buffered_len();
let mut request = flow::Request {
len: initial_len,
initial_len,
is_fin,
};
let path = self.shared.sender.path.load();
let features = self.sockets.features();
// the clamp is only applied when the socket features report no flow control of their own
if !features.is_flow_controlled() {
// clamp the flow request based on the path state
request.clamp(path.max_flow_credits(max_header_len, max_segments));
}
// acquire flow credits from the worker
// (returns `Poll::Pending` from this function if the credits are not available yet)
let credits = ready!(self.shared.sender.flow.poll_acquire(cx, request, &features))?;
// update the status if this write included the final offset
// (the decision is taken from the granted credits, not from the `is_fin` argument)
if credits.is_fin {
self.status = Status::WroteFin;
}
trace!(?credits);
let mut batch = if features.is_reliable() {
// the protocol does recovery for us so no need to track the transmissions
None
} else {
// if we are using unreliable sockets then we need to write transmissions to a batch for the
// worker to track for recovery
let batch = self
.shared
.sender
.application_transmission_queue
.alloc_batch(msg::segment::MAX_COUNT);
Some(batch)
};
let stream_id = self.shared.stream_id();
let local_queue_id = self.shared.local_queue_id();
// move the application bytes into the packet queue; the closure below is what turns a
// `(message, buf)` pair into sealed packets by calling `app.transmit`
self.queue.push_buffer(
buf,
&mut batch,
max_segments,
&self.shared.sender.segment_alloc,
|message, buf| {
// NOTE(review): `seal_with` receives two closures; the first performs the
// transmission with the sealer, the second looks like a key-update hook -
// confirm against `seal_with`.
self.shared.crypto.seal_with(
|sealer| {
// push packets for transmission into our queue
// (a new `clock::Cached` is constructed on every invocation of this closure)
app.transmit(
credits,
&path,
buf,
&self.shared.sender.packet_number,
sealer,
self.shared.credentials(),
&stream_id,
local_queue_id,
&clock::Cached::new(&self.shared.clock),
message,
&features,
)
},
|sealer| {
// the sealer is only updated for reliable sockets; the other branch is
// currently a no-op
if features.is_reliable() {
sealer.update(&self.shared.clock, &self.shared.subscriber);
} else {
// TODO enqueue a full flush of any pending transmissions before
// updating the key.
}
},
)
},
)?;
// `batch` is only `Some` for non-reliable sockets (see its construction above)
if let Some(batch) = batch {
// send the transmission information off to the worker before flushing to the socket so the
// worker is prepared to handle ACKs from the peer
self.shared.sender.push_to_worker(batch)?;
}
// flush the queue of packets to the socket
// (the result of this flush, including `Poll::Pending`, is the result of the whole call)
self.poll_flush_buffer(cx, usize::MAX)
}