in quic/s2n-quic-transport/src/sync/data_sender.rs [585:751]
/// Replays `events` against a fresh `DataSender` and validates the frames it wrote.
///
/// The run proceeds in three phases:
///
/// 1. Each event is applied in order: pushing data, finishing the stream,
///    transmitting, acknowledging or losing a previously written packet, and
///    raising the flow control offset.
/// 2. The stream is finished, every packet still pending is acknowledged, the
///    transmission limits are lifted and the sender is asked to transmit until it
///    no longer reports transmission interest. Each packet written in this phase
///    is acknowledged immediately.
/// 3. The stream frames carried by acknowledged packets are fed into a receiver
///    and their byte ranges are collected, then the collected ranges are checked
///    against the number of bytes pushed.
///
/// `id` is passed to `on_transmit` for every transmission. The returned value is
/// the frame buffer holding every frame written during the run.
///
/// # Panics
///
/// Panics if any of the following holds:
/// - a frame was written while the sender reported no transmission interest;
/// - the sender reported transmission interest during the drain but wrote nothing;
/// - no frame was written at all;
/// - an acknowledged packet carries something other than a stream frame;
/// - data was pushed but the acknowledged ranges do not form exactly one interval
///   ending at the last pushed byte;
/// - no data was pushed but some range was still received;
/// - no acknowledged frame carried the fin bit.
fn check_model(events: &[Event], id: &VarInt) -> OutgoingFrameBuffer {
    let mut source = stream::Data::new(u64::MAX);
    let mut sender: DataSender<_, writer::Stream> =
        DataSender::new(TestFlowController::default(), u32::MAX);

    let mut frame_buffer = OutgoingFrameBuffer::new();
    let mut context = MockWriteContext {
        current_time: time::now(),
        frame_buffer: &mut frame_buffer,
        transmission_constraint: transmission::Constraint::None,
        transmission_mode: transmission::Mode::Normal,
        endpoint: endpoint::Type::Server,
    };

    // Bookkeeping for the model: bytes handed to the sender and the fate of each packet.
    let mut pushed_bytes = 0u64;
    let mut finished = false;
    let mut lost_packets = HashSet::new();
    let mut in_flight = HashSet::new();
    let mut acked_packets = HashSet::new();

    // Phase 1: replay the scripted events.
    for &event in events {
        match event {
            Event::Push(len) if !finished => {
                let mut chunks = [Bytes::new(), Bytes::new()];
                let filled = source
                    .send(len as usize, &mut chunks)
                    .expect("stream should not end early");

                for chunk in chunks.iter_mut().take(filled) {
                    pushed_bytes += chunk.len() as u64;
                    let data = core::mem::replace(chunk, Bytes::new());
                    sender.push(data);
                }
            }
            // Finishing is only modelled once some data has been pushed.
            Event::Finish if pushed_bytes > 0 => {
                sender.finish();
                finished = true;
            }
            Event::Transmit(capacity, constraint) => {
                // Capture the interest before transmitting so it can be compared
                // against what was actually written.
                let interest = sender.get_transmission_interest();
                let frames_before = context.frame_buffer.len();

                context.transmission_constraint = constraint;
                context
                    .frame_buffer
                    .set_max_packet_size(Some(capacity as usize));

                let _ = sender.on_transmit(*id, &mut context);
                context.frame_buffer.flush();

                if interest.is_none() {
                    assert_eq!(
                        context.frame_buffer.len(),
                        frames_before,
                        "frames should only transmit with interest"
                    );
                }

                // Everything written by this transmission is now awaiting an ack.
                for sent in context.frame_buffer.frames.iter().skip(frames_before) {
                    in_flight.insert(sent.packet_nr);
                }
            }
            Event::Ack(index) if !context.frame_buffer.is_empty() => {
                // Wrap the scripted index onto the frames written so far.
                let slot = index % context.frame_buffer.len();
                let packet = context.frame_buffer.frames[slot].packet_nr;

                sender.on_packet_ack(&packet);
                in_flight.remove(&packet);
                acked_packets.insert(packet);
            }
            Event::Loss(index) if !context.frame_buffer.is_empty() => {
                let slot = index % context.frame_buffer.len();
                let packet = context.frame_buffer.frames[slot].packet_nr;

                lost_packets.insert(packet);
                sender.on_packet_loss(&packet);
            }
            Event::IncFlowControl(amount) => {
                let controller = sender.flow_controller_mut();
                let raised = controller
                    .max_offset
                    .saturating_add(VarInt::from_u16(amount));
                controller.max_offset = raised;
            }
            // Events whose guard did not hold are ignored.
            _ => {}
        }
    }

    // Phase 2: finish the stream (in case the events never did), acknowledge
    // whatever is still pending and drain the sender without any limits.
    sender.finish();

    for packet in in_flight {
        sender.on_packet_ack(&packet);
        acked_packets.insert(packet);
    }

    context.frame_buffer.set_max_packet_size(Some(usize::MAX));
    context.transmission_constraint = transmission::Constraint::None;

    let controller = sender.flow_controller_mut();
    controller.clear_blocked();
    controller.max_offset = VarInt::MAX;

    while sender.has_transmission_interest() {
        let frames_before = context.frame_buffer.len();
        let _ = sender.on_transmit(*id, &mut context);
        context.frame_buffer.flush();

        // Acknowledge every packet written by this round right away.
        let mut did_transmit = false;
        for sent in context.frame_buffer.frames.iter().skip(frames_before) {
            let packet = sent.packet_nr;
            sender.on_packet_ack(&packet);
            acked_packets.insert(packet);
            did_transmit = true;
        }

        assert!(
            did_transmit,
            "transmission_interest was expressed but sender did not transmit: {:#?}",
            sender
        );
    }

    // Phase 3: check what the acknowledged frames delivered.
    assert!(
        !frame_buffer.is_empty(),
        "the test should transmit at least one frame: {:#?}",
        sender,
    );

    let receiver = stream::Data::new(pushed_bytes);
    let mut received_ranges = IntervalSet::new();
    let mut transmitted_fin = false;

    for sent in frame_buffer.frames.iter_mut() {
        // Only acknowledged packets count as delivered.
        if !acked_packets.contains(&sent.packet_nr) {
            continue;
        }

        match sent.as_frame() {
            frame::Frame::Stream(stream_frame) => {
                let offset = stream_frame.offset.as_u64();
                let len = stream_frame.data.len() as u64;

                // Frames without payload contribute no range.
                if len > 0 {
                    receiver.receive_at(offset, &[stream_frame.data.as_less_safe_slice()]);
                    received_ranges.insert(offset..offset + len).unwrap();
                }

                if stream_frame.is_fin {
                    transmitted_fin = true;
                }
            }
            _ => panic!("invalid frame"),
        }
    }

    if pushed_bytes == 0 {
        assert!(received_ranges.is_empty(), "{:#?}", sender);
    } else {
        // A single interval means the acknowledged ranges have no gaps between them.
        assert_eq!(
            received_ranges.interval_len(),
            1,
            "not all data was transmitted",
        );
        assert_eq!(received_ranges.max_value(), Some(pushed_bytes - 1));
    }

    assert!(transmitted_fin);

    frame_buffer
}