fn blocked_on_local_concurrent_stream_limit()

in quic/s2n-quic-transport/src/stream/manager/tests.rs [1251:1374]


/// Checks that opening locally-initiated streams blocks on the *local* concurrency limit
/// when the peer has advertised a much larger stream limit.
///
/// For both stream types the test:
/// 1. raises the peer limit and confirms the local capacity is smaller,
/// 2. opens streams until that capacity is used up and expects the next attempt to be pending,
/// 3. for bidirectional streams, expects a single empty STREAM frame for the last opened stream,
/// 4. expects no further transmission interest (i.e. no STREAMS_BLOCKED frame),
/// 5. un-retains the first stream and expects exactly one more stream to open,
/// 6. closes the manager and expects the still-pending opener to be woken.
fn blocked_on_local_concurrent_stream_limit() {
    // Stream limit advertised by the peer; the test asserts the local capacity is below it.
    const PEER_MAX_STREAMS: u32 = 100_000;

    for stream_type in [StreamType::Bidirectional, StreamType::Unidirectional] {
        let mut manager = create_stream_manager(endpoint::Type::Server);

        // The peer allows a large amount of streams to be opened
        let max_streams = MaxStreams {
            stream_type,
            maximum_streams: VarInt::from_u32(PEER_MAX_STREAMS),
        };
        assert!(manager.on_max_streams(&max_streams).is_ok());

        let local_capacity = manager.with_stream_controller(|controller| {
            controller.available_local_initiated_stream_capacity(stream_type)
        });

        assert!(local_capacity < VarInt::from_u32(PEER_MAX_STREAMS));

        let (waker, wake_count) = new_count_waker();
        let (mut wakeup_queue, wakeup_handle) = create_wakeup_queue_and_handle();
        let mut open_token = connection::OpenToken::new();

        // Use up the entire local capacity; every attempt must complete immediately.
        for _ in 0..local_capacity.as_u64() {
            let attempt = manager.poll_open_local_stream(
                stream_type,
                &mut open_token,
                &mut ConnectionApiCallContext::from_wakeup_handle(&wakeup_handle),
                &Context::from_waker(&waker),
            );
            assert!(attempt.is_ready());
        }

        assert_eq!(wake_count, 0);

        // Cannot open any more streams
        let blocked_attempt = manager.poll_open_local_stream(
            stream_type,
            &mut open_token,
            &mut ConnectionApiCallContext::from_wakeup_handle(&wakeup_handle),
            &Context::from_waker(&waker),
        );
        assert!(blocked_attempt.is_pending());

        if stream_type.is_bidirectional() {
            // For a bidirectional stream the controller should transmit an empty STREAM frame
            // in order to notify the peer of the stream's existence.

            assert_wakeups(&mut wakeup_queue, 1);

            let mut sent_frames = OutgoingFrameBuffer::new();
            let mut tx_context = MockWriteContext::new(
                time::now(),
                &mut sent_frames,
                transmission::Constraint::None,
                transmission::Mode::Normal,
                endpoint::Type::Server,
            );

            manager.on_transmit(&mut tx_context).unwrap();

            let mut announcement = sent_frames
                .pop_front()
                .expect("missing empty STREAM frame");

            let stream = match announcement.as_frame() {
                Frame::Stream(stream) => stream,
                other => panic!("unexpected frame: {:?}", other),
            };

            // The frame must refer to the most recently opened stream, i.e. the one at
            // index `local_capacity - 1`.
            let last_opened_id = StreamId::nth(
                endpoint::Type::Server,
                stream_type,
                local_capacity.as_u64() - 1,
            )
            .unwrap()
            .as_varint();

            assert_eq!(stream.stream_id, last_opened_id);
            assert_eq!(stream.offset, VarInt::default());
            assert!(stream.data.is_empty());

            assert!(sent_frames.is_empty());
        }

        // No STREAMS_BLOCKED frame should be transmitted since we are blocked on the local
        // limit not the peer's limit.
        assert!(manager.get_transmission_interest().is_none());

        // Close one stream: clear the `retained` interest on the first locally-initiated stream.
        let first_stream_id = StreamId::initial(endpoint::Type::Server, stream_type);
        manager.with_asserted_stream(first_stream_id, |first_stream| {
            first_stream.interests.retained = false;
        });

        // One more stream can be opened
        let reopened_attempt = manager.poll_open_local_stream(
            stream_type,
            &mut open_token,
            &mut ConnectionApiCallContext::from_wakeup_handle(&wakeup_handle),
            &Context::from_waker(&waker),
        );
        assert!(reopened_attempt.is_ready());

        // By now the waker from the earlier blocked attempt has fired exactly once.
        assert_eq!(wake_count, 1);

        // ...and the limit is reached again straight away.
        let blocked_again = manager.poll_open_local_stream(
            stream_type,
            &mut open_token,
            &mut ConnectionApiCallContext::from_wakeup_handle(&wakeup_handle),
            &Context::from_waker(&waker),
        );
        assert!(blocked_again.is_pending());

        // Close the stream manager and verify the wake counter is incremented
        manager.close(connection::Error::application(1u8.into()));

        assert_eq!(wake_count, 2);
    }
}