fn on_transmit_ack()

in dc/s2n-quic-dc/src/stream/recv/state.rs [682:839]


    /// Encodes one control packet holding a `MaxData` frame together with the stream and
    /// recovery ACK frames, and pushes it onto `output`.
    ///
    /// Nothing is pushed when `should_transmit()` is false, when `output` cannot allocate a
    /// segment, or when the encoder reports a packet length of `0` (the segment is freed in
    /// that case).
    ///
    /// When the ACK state suggests a lossy network, extra copies of the encoded packet are
    /// pushed for as long as `output` can allocate segments for them.
    ///
    /// After a packet has been pushed, one `probes::on_transmit_control` event is emitted per
    /// non-empty packet space and the packet number is recorded with both ACK trackers and
    /// with `on_packet_sent`.
    fn on_transmit_ack<K, A, Clk>(
        &mut self,
        key: &K,
        credentials: &Credentials,
        stream_id: stream::Id,
        source_queue_id: Option<VarInt>,
        output: &mut A,
        _clock: &Clk,
    ) where
        K: crypto::seal::control::Stream,
        A: Allocator,
        Clk: Clock + ?Sized,
    {
        ensure!(self.should_transmit());

        let mtu = self.mtu();

        output.set_ecn(self.ecn());

        let packet_number = self.next_pn();

        ensure!(let Some(segment) = output.alloc());

        // size the segment to a full MTU; it is trimmed to the real packet length after encoding
        let payload = output.get_mut(&segment);
        payload.resize(mtu as _, 0);

        // TODO compute this by storing the time that we received the max packet number
        let ack_delay = VarInt::ZERO;

        let max_data = frame::MaxData {
            maximum_data: self.max_data,
        };
        let max_data_size: VarInt = max_data.encoding_size().try_into().unwrap();

        // The recovery ACKs are sized first so they get space ahead of the stream ACKs - if we
        // run out, the sender will convert the stream PNs anyway.
        let (recovery_ack, size_with_recovery) =
            self.recovery_ack
                .encoding(max_data_size, ack_delay, None, mtu);

        let (stream_ack, encoding_size) =
            self.stream_ack
                .encoding(size_with_recovery, ack_delay, Some(self.ecn_counts), mtu);

        tracing::trace!(?stream_ack, ?recovery_ack, ?max_data);

        let frame = ((max_data, stream_ack), recovery_ack);

        let packet_len = control::encoder::encode(
            EncoderBuffer::new(payload),
            source_queue_id,
            Some(stream_id),
            packet_number,
            VarInt::ZERO,
            &mut &[][..],
            encoding_size,
            &frame,
            key,
            credentials,
        );

        // a zero length means no packet was produced, so hand the segment back
        if packet_len == 0 {
            output.free(segment);
            return;
        }

        payload.truncate(packet_len);

        // The number of tracked intervals is a proxy for loss: the more there are, the more
        // loss the network is experiencing.
        let intervals =
            self.stream_ack.packets.interval_len() + self.recovery_ack.packets.interval_len();

        // The value of `20` is somewhat arbitrary. Doing some worst-case math, ACK ranges with
        // 20 segments would consume about 20-25% of the packet, so this is a good starting
        // point. We don't want to go much lower, otherwise we end up spamming ACKs.
        let mut duplicate_threshold = 20;

        // a large number of intervals means we should duplicate the ACKs to try and recover
        let many_intervals = intervals > duplicate_threshold;

        // outstanding recovery packets mean the network is likely lossy, so duplicating the
        // ACKs increases the likelihood that the sender will recover
        let has_recovery_packets = !self.recovery_ack.packets.is_empty();

        let should_duplicate = many_intervals || has_recovery_packets;

        // the copy has to be taken before the segment is handed over to `output`
        let duplicate = should_duplicate.then(|| payload.clone());

        output.push(segment);

        if let Some(copy) = duplicate {
            while let Some(extra) = output.alloc() {
                let target = output.get_mut(&extra);

                if intervals <= duplicate_threshold {
                    // last duplicate: give the copy away instead of copying its bytes again
                    *target = copy;
                    output.push(extra);
                    break;
                }

                target.extend_from_slice(&copy);
                output.push(extra);

                // exponentially increase the threshold
                duplicate_threshold *= 2;
            }
        }

        let spaces = [
            (&self.stream_ack, stream::PacketSpace::Stream),
            (&self.recovery_ack, stream::PacketSpace::Recovery),
        ];
        for (ack, space) in spaces {
            let min = ack.packets.min_value();
            let max = ack.packets.max_value();
            let gaps = ack.packets.interval_len().checked_sub(1);

            // a space without any tracked packets has nothing to report
            let (Some(min), Some(max), Some(gaps)) = (min, max, gaps) else {
                continue;
            };

            probes::on_transmit_control(
                credentials.id,
                stream_id,
                space,
                packet_number,
                min,
                max,
                gaps,
            );
        }

        // make sure we sent a packet
        ensure!(!output.is_empty());

        // record the max value we've seen for removing old packet numbers
        self.stream_ack.on_transmit(packet_number);
        self.recovery_ack.on_transmit(packet_number);

        self.on_packet_sent(packet_number);
    }