in quic/s2n-quic-platform/src/io/testing/model.rs [229:362]
fn execute(&mut self, buffers: &Buffers) -> usize {
let jitter = self.jitter();
let network_jitter = self.network_jitter();
let transmit_rate = self.transmit_rate();
let retransmit_rate = self.retransmit_rate();
let corrupt_rate = self.corrupt_rate();
let drop_rate = self.drop_rate();
let max_udp_payload = self.max_udp_payload() as usize;
let inflight_delay = self.inflight_delay();
let inflight_delay_threshold = self.inflight_delay_threshold();
let now = super::time::now();
let mut transmit_time = now + self.delay();
let transmit_time = &mut transmit_time;
#[inline]
fn gen_rate(rate: u64) -> bool {
// ensure the rate isn't 0 before actually generating a random number
rate > 0 && super::rand::produce::<u64>().any() < rate
}
let mut transmit = |packet: Cow<Packet>| {
// drop the packet if it's over the current MTU
if packet.payload.len() > max_udp_payload {
debug!("model::drop::mtu mtu={}", max_udp_payload);
return 0;
}
// drop packets that exceed the maximum number of inflight packets for the network
let max_inflight = self.max_inflight();
if self.inflight() >= max_inflight {
debug!("model::drop::inflight max_inflight={}", max_inflight);
return 0;
}
// drop the packet if enabled
if gen_rate(drop_rate) {
debug!("model::drop::rate");
return 0;
}
let mut packet = packet.into_owned();
if !packet.payload.is_empty() && gen_rate(corrupt_rate) {
use havoc::Strategy as _;
let new_len = havoc::Truncate
.randomly()
.and_then(havoc::Swap.repeat(0..packet.payload.len()).randomly())
.and_then(havoc::Mutate.repeat(0..packet.payload.len()).randomly())
.havoc_slice(&mut super::rand::Havoc, &mut packet.payload);
// if the len was changed, then update it
if new_len != packet.payload.len() {
packet.payload.truncate(new_len);
}
}
if !jitter.is_zero() {
// add a delay for the next packet to be transmitted
*transmit_time += gen_jitter(jitter);
}
// copy the transmit time for this packet
let mut transmit_time = *transmit_time;
if !network_jitter.is_zero() {
transmit_time += gen_jitter(network_jitter);
}
let model = self.clone();
let current_inflight = model.0.current_inflight.fetch_add(1, Ordering::SeqCst);
// scale the inflight delay by the number above the delay threshold
if let Some(mul) = current_inflight.checked_sub(inflight_delay_threshold) {
transmit_time += inflight_delay * mul as u32;
}
// reverse the addresses so the dst/src are correct for the receiver
packet.switch();
let buffers = buffers.clone();
// spawn a task that will push the packet onto the receiver queue at the transit time
super::spawn(async move {
// if the packet isn't scheduled to transmit immediately, wait until the computed
// time
if now != transmit_time {
super::time::delay_until(transmit_time).await;
}
buffers.rx(*packet.path.local_address, |queue| {
model.0.current_inflight.fetch_sub(1, Ordering::SeqCst);
queue.enqueue(packet);
});
});
1
};
let mut transmission_count = 0;
buffers.drain_pending_transmissions(|packet| {
let _span = debug_span!(
"packet",
dest = %packet.path.remote_address.0,
src = %packet.path.local_address.0,
len = packet.payload.len()
)
.entered();
// retransmit the packet until the rate fails or we retransmit 5
//
// We limit retransmissions to 5 just so we don't endlessly iterate when the
// `retransmit_rate` is high. This _should_ be high enough where we're getting
// retransmission coverage without needlessly saturating the network.
let mut count = 0;
while count < 5 && gen_rate(retransmit_rate) {
debug!("model::retransmit::rate count={count}");
transmission_count += transmit(Cow::Borrowed(&packet));
count += 1;
}
transmission_count += transmit(Cow::Owned(packet));
// continue transmitting as long as we are under the rate
if transmission_count < transmit_rate {
Ok(())
} else {
Err(())
}
});
transmission_count as usize
}