in quic/s2n-quic-transport/src/stream/manager/tests.rs [2428:2653]
fn on_transmit_queries_streams_for_data() {
fn assert_stream_write_state(
manager: &mut AbstractStreamManager<MockStream>,
stream_id: StreamId,
expected_on_transmit_count: usize,
expected_on_transmit_try_write: usize,
) {
manager.with_asserted_stream(stream_id, |stream| {
assert_eq!(expected_on_transmit_count, stream.on_transmit_count);
assert_eq!(
expected_on_transmit_try_write,
stream.on_transmit_try_write_frames
);
});
}
let mut manager = create_stream_manager(endpoint::Type::Server);
let mut frame_buffer = OutgoingFrameBuffer::new();
// Create some open Streams with interests
let stream_1 = try_open(&mut manager, StreamType::Bidirectional).unwrap();
let stream_2 = try_open(&mut manager, StreamType::Unidirectional).unwrap();
let stream_3 = try_open(&mut manager, StreamType::Bidirectional).unwrap();
let stream_4 = try_open(&mut manager, StreamType::Unidirectional).unwrap();
let stream_5 = try_open(&mut manager, StreamType::Unidirectional).unwrap();
manager.with_asserted_stream(stream_1, |stream| {
stream.on_transmit_try_write_frames = 1;
});
manager.with_asserted_stream(stream_3, |stream| {
stream.on_transmit_try_write_frames = 1;
});
manager.with_asserted_stream(stream_4, |stream| {
stream.on_transmit_try_write_frames = 1;
});
manager.with_asserted_stream(stream_5, |stream| {
stream.on_transmit_try_write_frames = 1;
stream.lost_data = true;
});
assert_eq!(
[stream_1, stream_3, stream_4],
*manager.streams_waiting_for_transmission()
);
assert_eq!([stream_5], *manager.streams_waiting_for_retransmission());
let mut write_context = MockWriteContext::new(
time::now(),
&mut frame_buffer,
transmission::Constraint::None,
transmission::Mode::Normal,
endpoint::Type::Server,
);
write_context.transmission_constraint = transmission::Constraint::CongestionLimited;
assert!(manager.on_transmit(&mut write_context).is_ok());
assert!(
write_context.frame_buffer.is_empty(),
"no frames are written when congestion limited"
);
write_context.transmission_constraint = transmission::Constraint::RetransmissionOnly;
assert!(manager.on_transmit(&mut write_context).is_ok());
// Only lost data may be written when constrained to retransmission only
assert_stream_write_state(&mut manager, stream_5, 1, 0);
assert_eq!(1, write_context.frame_buffer.len());
assert!(manager.streams_waiting_for_retransmission().is_empty());
write_context.transmission_constraint = transmission::Constraint::None;
assert!(manager.on_transmit(&mut write_context).is_ok());
for stream_id in &[stream_1, stream_3, stream_4] {
assert_stream_write_state(&mut manager, *stream_id, 1, 0);
}
// All 4 streams have written a frame plus there should also be an empty STREAM frame for the open notify
assert_eq!(5, frame_buffer.len());
frame_buffer.clear();
assert!(manager.streams_waiting_for_transmission().is_empty());
manager.with_asserted_stream(stream_1, |stream| {
stream.on_transmit_try_write_frames = 10;
});
manager.with_asserted_stream(stream_2, |stream| {
stream.on_transmit_try_write_frames = 10;
});
manager.with_asserted_stream(stream_3, |stream| {
stream.on_transmit_try_write_frames = 10;
});
manager.with_asserted_stream(stream_4, |stream| {
stream.on_transmit_try_write_frames = 10;
});
frame_buffer.set_error_write_after_n_frames(15);
let mut write_context = MockWriteContext::new(
time::now(),
&mut frame_buffer,
transmission::Constraint::None,
transmission::Mode::Normal,
endpoint::Type::Server,
);
assert_eq!(
[stream_1, stream_2, stream_3, stream_4],
*manager.streams_waiting_for_transmission()
);
assert_eq!(
Err(OnTransmitError::CouldNotWriteFrame),
manager.on_transmit(&mut write_context)
);
assert_stream_write_state(&mut manager, stream_1, 2, 0);
assert_stream_write_state(&mut manager, stream_2, 1, 5);
assert_stream_write_state(&mut manager, stream_3, 1, 10);
assert_stream_write_state(&mut manager, stream_4, 1, 10);
assert_eq!(15, frame_buffer.len());
frame_buffer.clear();
assert_eq!(
[stream_2, stream_3, stream_4],
*manager.streams_waiting_for_transmission()
);
// Query stream_1 for data again. It should however be asked at the end
manager.with_asserted_stream(stream_1, |stream| {
stream.on_transmit_try_write_frames = 10;
});
assert_eq!(
[stream_2, stream_3, stream_4, stream_1],
*manager.streams_waiting_for_transmission()
);
frame_buffer.set_error_write_after_n_frames(15);
let mut write_context = MockWriteContext::new(
time::now(),
&mut frame_buffer,
transmission::Constraint::None,
transmission::Mode::Normal,
endpoint::Type::Server,
);
assert_eq!(
Err(OnTransmitError::CouldNotWriteFrame),
manager.on_transmit(&mut write_context)
);
assert_stream_write_state(&mut manager, stream_1, 2, 10);
assert_stream_write_state(&mut manager, stream_2, 2, 0);
assert_stream_write_state(&mut manager, stream_3, 2, 0);
assert_stream_write_state(&mut manager, stream_4, 2, 10);
assert_eq!(15, frame_buffer.len());
frame_buffer.clear();
assert_eq!(
[stream_4, stream_1],
*manager.streams_waiting_for_transmission()
);
frame_buffer.set_error_write_after_n_frames(5);
let mut write_context = MockWriteContext::new(
time::now(),
&mut frame_buffer,
transmission::Constraint::None,
transmission::Mode::Normal,
endpoint::Type::Server,
);
assert_eq!(
Err(OnTransmitError::CouldNotWriteFrame),
manager.on_transmit(&mut write_context)
);
assert_stream_write_state(&mut manager, stream_1, 2, 10);
assert_stream_write_state(&mut manager, stream_2, 2, 0);
assert_stream_write_state(&mut manager, stream_3, 2, 0);
assert_stream_write_state(&mut manager, stream_4, 3, 5);
assert_eq!(5, frame_buffer.len());
frame_buffer.clear();
assert_eq!(
[stream_4, stream_1],
*manager.streams_waiting_for_transmission()
);
frame_buffer.set_error_write_after_n_frames(5);
let mut write_context = MockWriteContext::new(
time::now(),
&mut frame_buffer,
transmission::Constraint::None,
transmission::Mode::Normal,
endpoint::Type::Server,
);
assert_eq!(
Err(OnTransmitError::CouldNotWriteFrame),
manager.on_transmit(&mut write_context)
);
assert_stream_write_state(&mut manager, stream_1, 3, 10);
assert_stream_write_state(&mut manager, stream_2, 2, 0);
assert_stream_write_state(&mut manager, stream_3, 2, 0);
assert_stream_write_state(&mut manager, stream_4, 4, 0);
assert_eq!(5, frame_buffer.len());
frame_buffer.clear();
assert_eq!([stream_1], *manager.streams_waiting_for_transmission());
frame_buffer.set_error_write_after_n_frames(11);
let mut write_context = MockWriteContext::new(
time::now(),
&mut frame_buffer,
transmission::Constraint::None,
transmission::Mode::Normal,
endpoint::Type::Server,
);
assert_eq!(Ok(()), manager.on_transmit(&mut write_context));
assert_stream_write_state(&mut manager, stream_1, 4, 0);
assert_stream_write_state(&mut manager, stream_2, 2, 0);
assert_stream_write_state(&mut manager, stream_3, 2, 0);
assert_stream_write_state(&mut manager, stream_4, 4, 0);
assert_eq!(10, frame_buffer.len());
frame_buffer.clear();
assert!(manager.streams_waiting_for_transmission().is_empty());
}