in quic/s2n-quic-transport/src/stream/manager/tests.rs [841:985]
/// Walks a server-side stream manager through the STREAMS_BLOCKED lifecycle,
/// once for bidirectional and once for unidirectional streams.
///
/// Each pass:
/// 1. opens local streams until `poll_open_local_stream` stops being ready,
/// 2. expects a `StreamsBlocked` frame carrying the number of streams opened,
/// 3. declares that packet lost, retransmits, and acknowledges the retransmission,
/// 4. checks the next expiration and that firing it restores transmission interest,
/// 5. delivers a `MaxStreams` frame, releases the opened streams, opens streams
///    until blocked again, and expects a `StreamsBlocked` frame carrying the
///    value from the `MaxStreams` frame.
fn send_streams_blocked_frame_when_blocked_by_peer() {
    let mut manager = create_stream_manager(endpoint::Type::Server);

    for stream_type in [StreamType::Bidirectional, StreamType::Unidirectional] {
        let (waker, _) = new_count_waker();
        let (_wakeup_queue, wakeup_handle) = create_wakeup_queue_and_handle();
        let mut open_token = connection::OpenToken::new();

        // Value delivered in the MAX_STREAMS frame further down, and the limit
        // the final STREAMS_BLOCKED frame is expected to report.
        let raised_limit = VarInt::from_u32(200);

        // Keep opening streams for as long as the manager reports ready,
        // counting every stream that was opened.
        let mut open_count = VarInt::from_u8(0);
        loop {
            let opened = manager
                .poll_open_local_stream(
                    stream_type,
                    &mut open_token,
                    &mut ConnectionApiCallContext::from_wakeup_handle(&wakeup_handle),
                    &Context::from_waker(&waker),
                )
                .is_ready();
            if !opened {
                break;
            }
            open_count += 1;
        }

        // Being blocked must leave the manager wanting to transmit new data
        assert_eq!(
            transmission::Interest::NewData,
            manager.get_transmission_interest(),
            "stream_type:{:?} opened_streams:{}",
            stream_type,
            open_count
        );

        let mut frame_buffer = OutgoingFrameBuffer::new();
        let mut write_context = MockWriteContext::new(
            time::now(),
            &mut frame_buffer,
            transmission::Constraint::None,
            transmission::Mode::Normal,
            endpoint::Type::Server,
        );

        // First transmission: the frame reports the number of streams opened
        let first_packet = write_context.packet_number();
        assert!(manager.on_transmit(&mut write_context).is_ok());

        let blocked_at_open_count = Frame::StreamsBlocked(StreamsBlocked {
            stream_type,
            stream_limit: open_count,
        });
        assert_eq!(
            blocked_at_open_count,
            write_context.frame_buffer.pop_front().unwrap().as_frame()
        );
        assert_eq!(
            transmission::Interest::None,
            manager.get_transmission_interest()
        );

        // Losing that packet switches the interest to lost data
        let lost_range = PacketNumberRange::new(first_packet, first_packet);
        manager.on_packet_loss(&lost_range);
        assert_eq!(
            transmission::Interest::LostData,
            manager.get_transmission_interest()
        );

        // Retransmit, then acknowledge the retransmission
        let retransmit_packet = write_context.packet_number();
        assert!(manager.on_transmit(&mut write_context).is_ok());

        let acked_range = PacketNumberRange::new(retransmit_packet, retransmit_packet);
        manager.on_packet_ack(&acked_range);
        assert_eq!(
            transmission::Interest::None,
            manager.get_transmission_interest()
        );

        // The next expiration is expected this many sync periods after the
        // write context's current time
        let expected_transmission_backoff = 4;
        let next_blocked_deadline =
            write_context.current_time + DEFAULT_SYNC_PERIOD * expected_transmission_backoff;
        assert_eq!(Some(next_blocked_deadline), manager.next_expiration());

        // Once that deadline fires, another STREAMS_BLOCKED frame is wanted
        manager.on_timeout(next_blocked_deadline);
        assert_eq!(
            transmission::Interest::NewData,
            manager.get_transmission_interest()
        );

        // More credit from the peer removes the need to send STREAMS_BLOCKED
        assert!(manager
            .on_max_streams(&MaxStreams {
                stream_type,
                maximum_streams: raised_limit,
            })
            .is_ok());
        assert_eq!(
            transmission::Interest::None,
            manager.get_transmission_interest()
        );

        // Release every stream opened so far so that local limits do not block
        for index in 0..*open_count {
            let stream_id = StreamId::nth(endpoint::Type::Server, stream_type, index).unwrap();
            manager.with_asserted_stream(stream_id, |stream| stream.interests.retained = false);
        }

        // Flush the pending MAX_STREAMS frame and discard whatever was written
        assert!(manager.on_transmit(&mut write_context).is_ok());
        write_context.frame_buffer.clear();

        // Open streams until the manager stops reporting ready again
        loop {
            let opened = manager
                .poll_open_local_stream(
                    stream_type,
                    &mut open_token,
                    &mut ConnectionApiCallContext::from_wakeup_handle(&wakeup_handle),
                    &Context::from_waker(&waker),
                )
                .is_ready();
            if !opened {
                break;
            }
            open_count += 1;
        }

        // A new STREAMS_BLOCKED frame must carry the updated MAX_STREAMS value
        assert_eq!(
            transmission::Interest::NewData,
            manager.get_transmission_interest()
        );
        assert!(manager.on_transmit(&mut write_context).is_ok());

        let blocked_at_raised_limit = Frame::StreamsBlocked(StreamsBlocked {
            stream_type,
            stream_limit: raised_limit,
        });
        assert_eq!(
            blocked_at_raised_limit,
            write_context.frame_buffer.pop_front().unwrap().as_frame()
        );
    }
}