fn send_streams_blocked_frame_when_blocked_by_peer()

in quic/s2n-quic-transport/src/stream/manager/tests.rs [841:985]


/// Exercises the STREAMS_BLOCKED lifecycle of the stream manager for both
/// bidirectional and unidirectional streams.
///
/// For each stream type the test:
/// 1. opens local streams until `poll_open_local_stream` stops returning ready,
/// 2. checks that a `StreamsBlocked` frame carrying the number of opened
///    streams is transmitted,
/// 3. checks the transmission interest after packet loss, retransmission and
///    acknowledgement,
/// 4. checks the scheduled expiration and that a timeout re-arms the interest,
/// 5. delivers a `MaxStreams` update of 200 and verifies that a later
///    `StreamsBlocked` frame reports that value as its limit.
fn send_streams_blocked_frame_when_blocked_by_peer() {
    let mut manager = create_stream_manager(endpoint::Type::Server);
    let stream_types = [StreamType::Bidirectional, StreamType::Unidirectional];

    for stream_type in stream_types {
        let (waker, _) = new_count_waker();
        let (_wakeup_queue, wakeup_handle) = create_wakeup_queue_and_handle();
        let mut open_token = connection::OpenToken::new();
        let poll_context = Context::from_waker(&waker);

        // The stream limit the peer grants later in this iteration.
        let raised_limit = VarInt::from_u32(200);

        let mut streams_opened = VarInt::from_u8(0);

        // Keep opening streams until the manager reports it is not ready.
        loop {
            let is_open = manager
                .poll_open_local_stream(
                    stream_type,
                    &mut open_token,
                    &mut ConnectionApiCallContext::from_wakeup_handle(&wakeup_handle),
                    &poll_context,
                )
                .is_ready();
            if !is_open {
                break;
            }
            streams_opened += 1;
        }

        // Being blocked must register as new data to transmit.
        assert_eq!(
            transmission::Interest::NewData,
            manager.get_transmission_interest(),
            "stream_type:{:?} opened_streams:{}",
            stream_type,
            streams_opened
        );

        let mut frame_buffer = OutgoingFrameBuffer::new();
        let mut write_context = MockWriteContext::new(
            time::now(),
            &mut frame_buffer,
            transmission::Constraint::None,
            transmission::Mode::Normal,
            endpoint::Type::Server,
        );

        // First transmission: remember the packet number so it can be declared lost.
        let first_packet = write_context.packet_number();
        assert!(manager.on_transmit(&mut write_context).is_ok());

        let first_blocked_frame = Frame::StreamsBlocked(StreamsBlocked {
            stream_type,
            stream_limit: streams_opened,
        });
        assert_eq!(
            first_blocked_frame,
            write_context.frame_buffer.pop_front().unwrap().as_frame()
        );

        // Once the frame is in flight there is nothing further to send.
        assert_eq!(
            transmission::Interest::None,
            manager.get_transmission_interest()
        );

        // Losing the packet must surface as lost data that needs retransmission.
        let lost_range = PacketNumberRange::new(first_packet, first_packet);
        manager.on_packet_loss(&lost_range);
        assert_eq!(
            transmission::Interest::LostData,
            manager.get_transmission_interest()
        );

        // Retransmit, then acknowledge the retransmission.
        let retransmit_packet = write_context.packet_number();
        assert!(manager.on_transmit(&mut write_context).is_ok());

        let acked_range = PacketNumberRange::new(retransmit_packet, retransmit_packet);
        manager.on_packet_ack(&acked_range);
        assert_eq!(
            transmission::Interest::None,
            manager.get_transmission_interest()
        );

        // After the ack, the next expiration is expected four sync periods out.
        let backoff_multiplier = 4;
        let next_blocked_deadline =
            write_context.current_time + DEFAULT_SYNC_PERIOD * backoff_multiplier;
        assert_eq!(Some(next_blocked_deadline), manager.next_expiration());

        manager.on_timeout(next_blocked_deadline);

        // The timeout should queue another STREAMS_BLOCKED frame.
        assert_eq!(
            transmission::Interest::NewData,
            manager.get_transmission_interest()
        );

        // The peer grants more credit, so STREAMS_BLOCKED should no longer be pending.
        let max_streams_update = MaxStreams {
            stream_type,
            maximum_streams: raised_limit,
        };
        assert!(manager.on_max_streams(&max_streams_update).is_ok());
        assert_eq!(
            transmission::Interest::None,
            manager.get_transmission_interest()
        );

        // Mark every stream opened so far as no longer retained; the intent
        // stated by the original test is to avoid blocking on local limits.
        let opened_ids = (0..*streams_opened)
            .map(|index| StreamId::nth(endpoint::Type::Server, stream_type, index).unwrap());
        for stream_id in opened_ids {
            manager.with_asserted_stream(stream_id, |stream| {
                stream.interests.retained = false;
            });
        }

        // Flush whatever is still pending and drop the written frames so the
        // next `pop_front` only observes the frame produced below.
        assert!(manager.on_transmit(&mut write_context).is_ok());
        write_context.frame_buffer.clear();

        // Open streams again until the manager reports it is not ready.
        loop {
            let is_open = manager
                .poll_open_local_stream(
                    stream_type,
                    &mut open_token,
                    &mut ConnectionApiCallContext::from_wakeup_handle(&wakeup_handle),
                    &poll_context,
                )
                .is_ready();
            if !is_open {
                break;
            }
            streams_opened += 1;
        }

        // A new STREAMS_BLOCKED frame is expected, now carrying the raised limit.
        assert_eq!(
            transmission::Interest::NewData,
            manager.get_transmission_interest()
        );

        assert!(manager.on_transmit(&mut write_context).is_ok());

        let second_blocked_frame = Frame::StreamsBlocked(StreamsBlocked {
            stream_type,
            stream_limit: raised_limit,
        });
        assert_eq!(
            second_blocked_frame,
            write_context.frame_buffer.pop_front().unwrap().as_frame()
        );
    }
}