in dc/s2n-quic-dc/src/stream/recv/state.rs [682:839]
/// Encodes a control packet carrying the current `MaxData` frame plus the stream and recovery
/// ACK frames, and queues it on `output`.
///
/// The function bails out early when `should_transmit` is false, when `output` has no segment
/// to hand out, or when the encoder reports a packet length of zero (the segment is freed in
/// that case). Extra copies of the encoded packet are queued when many ACK intervals are being
/// tracked or when any recovery packets are being tracked.
///
/// `_clock` is not read yet; `ack_delay` is currently always zero.
fn on_transmit_ack<K, A, Clk>(
    &mut self,
    key: &K,
    credentials: &Credentials,
    stream_id: stream::Id,
    source_queue_id: Option<VarInt>,
    output: &mut A,
    _clock: &Clk,
) where
    K: crypto::seal::control::Stream,
    A: Allocator,
    Clk: Clock + ?Sized,
{
    ensure!(self.should_transmit());

    let mtu = self.mtu();
    output.set_ecn(self.ecn());

    // NOTE: the packet number is taken before the segment allocation is attempted
    let packet_number = self.next_pn();

    ensure!(let Some(segment) = output.alloc());
    let payload = output.get_mut(&segment);
    payload.resize(mtu as _, 0);
    let encoder = EncoderBuffer::new(payload);

    // TODO compute this by storing the time that we received the max packet number
    let ack_delay = VarInt::ZERO;

    let max_data = frame::MaxData {
        maximum_data: self.max_data,
    };
    let running_size: VarInt = max_data.encoding_size().try_into().unwrap();

    // Recovery ACKs are sized ahead of the stream ACKs so they get space first - if space runs
    // out, the sender will convert the stream PNs anyway.
    let (recovery_ack, running_size) =
        self.recovery_ack
            .encoding(running_size, ack_delay, None, mtu);
    let (stream_ack, encoding_size) =
        self.stream_ack
            .encoding(running_size, ack_delay, Some(self.ecn_counts), mtu);

    tracing::trace!(?stream_ack, ?recovery_ack, ?max_data);

    let frame = ((max_data, stream_ack), recovery_ack);
    let packet_len = control::encoder::encode(
        encoder,
        source_queue_id,
        Some(stream_id),
        packet_number,
        VarInt::ZERO,
        &mut &[][..],
        encoding_size,
        &frame,
        key,
        credentials,
    );

    // the encoder reported a length of zero: give the segment back and stop here
    if packet_len == 0 {
        output.free(segment);
        return;
    }

    payload.truncate(packet_len);

    // The number of tracked intervals is a proxy for loss - the more there are, the more loss
    // the network is experiencing.
    let intervals =
        self.stream_ack.packets.interval_len() + self.recovery_ack.packets.interval_len();

    // The value of `20` is somewhat arbitrary, but worst-case math says ACK ranges with 20
    // segments would consume about 20-25% of the packet, so it is a good starting point.
    // Going much lower would end up spamming ACKs.
    let mut duplicate_threshold = 20;

    // Duplicate the ACKs when either:
    // * the interval count is large, to try and recover, or
    // * recovery packets are present, since the network is then likely lossy and a duplicate
    //   increases the likelihood that the sender will recover.
    let should_duplicate =
        intervals > duplicate_threshold || !self.recovery_ack.packets.is_empty();

    // the copy has to be taken before the original segment is handed over to `output`
    let duplicate = should_duplicate.then(|| payload.clone());

    output.push(segment);

    if let Some(copy) = duplicate {
        while let Some(extra) = output.alloc() {
            let target = output.get_mut(&extra);

            if intervals > duplicate_threshold {
                target.extend_from_slice(&copy);
                output.push(extra);
                // exponentially increase the threshold
                duplicate_threshold *= 2;
            } else {
                // last duplicate: move the copy into the segment rather than copying its bytes
                *target = copy;
                output.push(extra);
                break;
            }
        }
    }

    // report the range covered by each packet space that has at least one tracked interval
    for (ack, space) in [
        (&self.stream_ack, stream::PacketSpace::Stream),
        (&self.recovery_ack, stream::PacketSpace::Recovery),
    ] {
        let (Some(min), Some(max), Some(gaps)) = (
            ack.packets.min_value(),
            ack.packets.max_value(),
            ack.packets.interval_len().checked_sub(1),
        ) else {
            continue;
        };

        probes::on_transmit_control(
            credentials.id,
            stream_id,
            space,
            packet_number,
            min,
            max,
            gaps,
        );
    }

    // make sure we sent a packet
    ensure!(!output.is_empty());

    // record the max value we've seen for removing old packet numbers
    self.stream_ack.on_transmit(packet_number);
    self.recovery_ack.on_transmit(packet_number);

    self.on_packet_sent(packet_number);
}