fn poll_write_from()

in dc/s2n-quic-dc/src/stream/send/application.rs [154:274]
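
In outline, this method drives a single application write through the send path. It first flushes any packets still queued from a previous call, returning early if that flush made progress, if the stream is no longer open, or if the queue has not yet drained. It then builds a flow request from the caller's buffer, clamps it to the path's credit limits when the socket is not flow controlled, and acquires flow credits from the worker (recording `WroteFin` if the write carries the final offset). With credits in hand it seals and enqueues packets for transmission, collecting them into a batch for the worker to track when the socket is unreliable, and finally flushes the queue to the socket.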


    fn poll_write_from<S>(
        &mut self,
        cx: &mut Context,
        buf: &mut S,
        is_fin: bool,
    ) -> Poll<io::Result<usize>>
    where
        S: buffer::reader::storage::Infallible,
    {
        // Try to flush any pending packets
        let flushed_len = ready!(self.poll_flush_buffer(cx, buf.buffered_len()))?;

        // if the flushed len is non-zero then return it to the application before accepting more
        // bytes to buffer
        ensure!(flushed_len == 0, Ok(flushed_len).into());

        // if we're not open, then make sure this is an empty write
        if !matches!(self.status, Status::Open) {
            ensure!(
                buf.buffer_is_empty() && is_fin,
                Err(io::Error::from(io::ErrorKind::BrokenPipe)).into()
            );
            return Ok(0).into();
        }

        // make sure the queue is drained before continuing
        ensure!(self.queue.is_empty(), Ok(flushed_len).into());

        // look up the limits used to size packets and the flow request
        let app = self.shared.application();
        let max_header_len = app.max_header_len();
        let max_segments = self.shared.gso.max_segments();

        // create a flow request from the provided application input
        let initial_len = buf.buffered_len();
        let mut request = flow::Request {
            len: initial_len,
            initial_len,
            is_fin,
        };

        let path = self.shared.sender.path.load();

        // the socket features indicate whether the transport already provides
        // flow control and reliable delivery
        let features = self.sockets.features();

        if !features.is_flow_controlled() {
            // clamp the flow request based on the path state
            request.clamp(path.max_flow_credits(max_header_len, max_segments));
        }

        // acquire flow credits from the worker
        let credits = ready!(self.shared.sender.flow.poll_acquire(cx, request, &features))?;

        // update the status if this write included the final offset
        if credits.is_fin {
            self.status = Status::WroteFin;
        }

        trace!(?credits);

        let mut batch = if features.is_reliable() {
            // the protocol does recovery for us so no need to track the transmissions
            None
        } else {
            // if we are using unreliable sockets then we need to write transmissions to a batch for the
            // worker to track for recovery

            let batch = self
                .shared
                .sender
                .application_transmission_queue
                .alloc_batch(msg::segment::MAX_COUNT);
            Some(batch)
        };

        let stream_id = self.shared.stream_id();
        let local_queue_id = self.shared.local_queue_id();

        self.queue.push_buffer(
            buf,
            &mut batch,
            max_segments,
            &self.shared.sender.segment_alloc,
            |message, buf| {
                self.shared.crypto.seal_with(
                    |sealer| {
                        // push packets for transmission into our queue
                        app.transmit(
                            credits,
                            &path,
                            buf,
                            &self.shared.sender.packet_number,
                            sealer,
                            self.shared.credentials(),
                            &stream_id,
                            local_queue_id,
                            &clock::Cached::new(&self.shared.clock),
                            message,
                            &features,
                        )
                    },
                    |sealer| {
                        if features.is_reliable() {
                            sealer.update(&self.shared.clock, &self.shared.subscriber);
                        } else {
                            // TODO enqueue a full flush of any pending transmissions before
                            // updating the key.
                        }
                    },
                )
            },
        )?;

        if let Some(batch) = batch {
            // send the transmission information off to the worker before flushing to the socket so the
            // worker is prepared to handle ACKs from the peer
            self.shared.sender.push_to_worker(batch)?;
        }

        // flush the queue of packets to the socket
        self.poll_flush_buffer(cx, usize::MAX)
    }
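
Because the method is written in the poll style (`Poll`, `ready!`), a caller drives it with a task `Context` until it returns `Ready`. The sketch below illustrates only that general pattern with a hypothetical `MockStream`; its `poll_write_from` signature is an assumption made for the example and is not this crate's actual API.

    // Minimal, self-contained sketch of the poll-to-async pattern: a `poll_*`
    // method returning `Poll` is adapted to async with `std::future::poll_fn`.
    // `MockStream` is an illustrative stand-in, not the real `Writer` type.
    use std::future::poll_fn;
    use std::io;
    use std::mem;
    use std::task::{Context, Poll};

    struct MockStream {
        buffered: Vec<u8>,
    }

    impl MockStream {
        // hypothetical poll-style write: buffers the whole chunk immediately;
        // the real method above may instead return `Poll::Pending` while it
        // waits for flow credits or for queued packets to flush
        fn poll_write_from(
            &mut self,
            _cx: &mut Context<'_>,
            buf: &mut &[u8],
            is_fin: bool,
        ) -> Poll<io::Result<usize>> {
            // consume the caller's chunk, leaving an empty slice behind
            let chunk = mem::take(buf);
            self.buffered.extend_from_slice(chunk);
            // a real stream would transition its status when `is_fin` is set
            let _ = is_fin;
            Poll::Ready(Ok(chunk.len()))
        }

        // async adapter: re-polls the method each time the task is woken
        // until it returns `Ready`
        async fn write_from(&mut self, mut buf: &[u8], is_fin: bool) -> io::Result<usize> {
            poll_fn(|cx| self.poll_write_from(cx, &mut buf, is_fin)).await
        }
    }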