in quic/s2n-quic-transport/src/stream/manager/tests.rs [1251:1374]
fn blocked_on_local_concurrent_stream_limit() {
for stream_type in [StreamType::Bidirectional, StreamType::Unidirectional] {
let mut manager = create_stream_manager(endpoint::Type::Server);
// The peer allows a large amount of streams to be opened
assert!(manager
.on_max_streams(&MaxStreams {
stream_type,
maximum_streams: VarInt::from_u32(100_000),
})
.is_ok());
let available_outgoing_stream_capacity = manager.with_stream_controller(|ctrl| {
ctrl.available_local_initiated_stream_capacity(stream_type)
});
assert!(available_outgoing_stream_capacity < VarInt::from_u32(100_000));
let (waker, wake_counter) = new_count_waker();
let (mut wakeup_queue, wakeup_handle) = create_wakeup_queue_and_handle();
let mut token = connection::OpenToken::new();
for _i in 0..*available_outgoing_stream_capacity {
assert!(manager
.poll_open_local_stream(
stream_type,
&mut token,
&mut ConnectionApiCallContext::from_wakeup_handle(&wakeup_handle),
&Context::from_waker(&waker)
)
.is_ready());
}
assert_eq!(wake_counter, 0);
// Cannot open any more streams
assert!(manager
.poll_open_local_stream(
stream_type,
&mut token,
&mut ConnectionApiCallContext::from_wakeup_handle(&wakeup_handle),
&Context::from_waker(&waker)
)
.is_pending());
if stream_type.is_bidirectional() {
// if we have a bidirectional stream, then the controller should transmit an empty STREAM frame in order
// to notify the peer of its existence.
assert_wakeups(&mut wakeup_queue, 1);
let mut frame_buffer = OutgoingFrameBuffer::new();
let mut write_context = MockWriteContext::new(
time::now(),
&mut frame_buffer,
transmission::Constraint::None,
transmission::Mode::Normal,
endpoint::Type::Server,
);
manager.on_transmit(&mut write_context).unwrap();
let mut stream_frame = frame_buffer
.pop_front()
.expect("missing empty STREAM frame");
match stream_frame.as_frame() {
Frame::Stream(frame) => {
assert_eq!(
frame.stream_id,
StreamId::nth(
endpoint::Type::Server,
stream_type,
available_outgoing_stream_capacity.as_u64() - 1
)
.unwrap()
.as_varint()
);
assert_eq!(frame.offset, VarInt::default());
assert!(frame.data.is_empty());
}
frame => panic!("unexpected frame: {:?}", frame),
}
assert!(frame_buffer.is_empty());
}
// No STREAMS_BLOCKED frame should be transmitted since we are blocked on the local
// limit not the peer's limit.
assert!(manager.get_transmission_interest().is_none());
// Close one stream
manager.with_asserted_stream(
StreamId::initial(endpoint::Type::Server, stream_type),
|stream| {
stream.interests.retained = false;
},
);
// One more stream can be opened
assert!(manager
.poll_open_local_stream(
stream_type,
&mut token,
&mut ConnectionApiCallContext::from_wakeup_handle(&wakeup_handle),
&Context::from_waker(&waker)
)
.is_ready());
assert_eq!(wake_counter, 1);
assert!(manager
.poll_open_local_stream(
stream_type,
&mut token,
&mut ConnectionApiCallContext::from_wakeup_handle(&wakeup_handle),
&Context::from_waker(&waker)
)
.is_pending());
// Close the stream manager and verify the wake counter is incremented
manager.close(connection::Error::application(1u8.into()));
assert_eq!(wake_counter, 2);
}
}