fn on_transmit_queries_streams_for_data()

in quic/s2n-quic-transport/src/stream/manager/tests.rs [2428:2653]


fn on_transmit_queries_streams_for_data() {
    fn assert_stream_write_state(
        manager: &mut AbstractStreamManager<MockStream>,
        stream_id: StreamId,
        expected_on_transmit_count: usize,
        expected_on_transmit_try_write: usize,
    ) {
        manager.with_asserted_stream(stream_id, |stream| {
            assert_eq!(expected_on_transmit_count, stream.on_transmit_count);
            assert_eq!(
                expected_on_transmit_try_write,
                stream.on_transmit_try_write_frames
            );
        });
    }

    let mut manager = create_stream_manager(endpoint::Type::Server);
    let mut frame_buffer = OutgoingFrameBuffer::new();

    // Create some open Streams with interests
    let stream_1 = try_open(&mut manager, StreamType::Bidirectional).unwrap();
    let stream_2 = try_open(&mut manager, StreamType::Unidirectional).unwrap();
    let stream_3 = try_open(&mut manager, StreamType::Bidirectional).unwrap();
    let stream_4 = try_open(&mut manager, StreamType::Unidirectional).unwrap();
    let stream_5 = try_open(&mut manager, StreamType::Unidirectional).unwrap();

    manager.with_asserted_stream(stream_1, |stream| {
        stream.on_transmit_try_write_frames = 1;
    });
    manager.with_asserted_stream(stream_3, |stream| {
        stream.on_transmit_try_write_frames = 1;
    });
    manager.with_asserted_stream(stream_4, |stream| {
        stream.on_transmit_try_write_frames = 1;
    });
    manager.with_asserted_stream(stream_5, |stream| {
        stream.on_transmit_try_write_frames = 1;
        stream.lost_data = true;
    });
    assert_eq!(
        [stream_1, stream_3, stream_4],
        *manager.streams_waiting_for_transmission()
    );
    assert_eq!([stream_5], *manager.streams_waiting_for_retransmission());

    let mut write_context = MockWriteContext::new(
        time::now(),
        &mut frame_buffer,
        transmission::Constraint::None,
        transmission::Mode::Normal,
        endpoint::Type::Server,
    );

    write_context.transmission_constraint = transmission::Constraint::CongestionLimited;

    assert!(manager.on_transmit(&mut write_context).is_ok());
    assert!(
        write_context.frame_buffer.is_empty(),
        "no frames are written when congestion limited"
    );

    write_context.transmission_constraint = transmission::Constraint::RetransmissionOnly;

    assert!(manager.on_transmit(&mut write_context).is_ok());

    // Only lost data may be written when constrained to retransmission only
    assert_stream_write_state(&mut manager, stream_5, 1, 0);
    assert_eq!(1, write_context.frame_buffer.len());
    assert!(manager.streams_waiting_for_retransmission().is_empty());

    write_context.transmission_constraint = transmission::Constraint::None;

    assert!(manager.on_transmit(&mut write_context).is_ok());

    for stream_id in &[stream_1, stream_3, stream_4] {
        assert_stream_write_state(&mut manager, *stream_id, 1, 0);
    }

    // All 4 streams have written a frame plus there should also be an empty STREAM frame for the open notify
    assert_eq!(5, frame_buffer.len());
    frame_buffer.clear();
    assert!(manager.streams_waiting_for_transmission().is_empty());

    manager.with_asserted_stream(stream_1, |stream| {
        stream.on_transmit_try_write_frames = 10;
    });
    manager.with_asserted_stream(stream_2, |stream| {
        stream.on_transmit_try_write_frames = 10;
    });
    manager.with_asserted_stream(stream_3, |stream| {
        stream.on_transmit_try_write_frames = 10;
    });
    manager.with_asserted_stream(stream_4, |stream| {
        stream.on_transmit_try_write_frames = 10;
    });

    frame_buffer.set_error_write_after_n_frames(15);
    let mut write_context = MockWriteContext::new(
        time::now(),
        &mut frame_buffer,
        transmission::Constraint::None,
        transmission::Mode::Normal,
        endpoint::Type::Server,
    );

    assert_eq!(
        [stream_1, stream_2, stream_3, stream_4],
        *manager.streams_waiting_for_transmission()
    );

    assert_eq!(
        Err(OnTransmitError::CouldNotWriteFrame),
        manager.on_transmit(&mut write_context)
    );
    assert_stream_write_state(&mut manager, stream_1, 2, 0);
    assert_stream_write_state(&mut manager, stream_2, 1, 5);
    assert_stream_write_state(&mut manager, stream_3, 1, 10);
    assert_stream_write_state(&mut manager, stream_4, 1, 10);

    assert_eq!(15, frame_buffer.len());
    frame_buffer.clear();
    assert_eq!(
        [stream_2, stream_3, stream_4],
        *manager.streams_waiting_for_transmission()
    );

    // Query stream_1 for data again. It should however be asked at the end
    manager.with_asserted_stream(stream_1, |stream| {
        stream.on_transmit_try_write_frames = 10;
    });
    assert_eq!(
        [stream_2, stream_3, stream_4, stream_1],
        *manager.streams_waiting_for_transmission()
    );

    frame_buffer.set_error_write_after_n_frames(15);
    let mut write_context = MockWriteContext::new(
        time::now(),
        &mut frame_buffer,
        transmission::Constraint::None,
        transmission::Mode::Normal,
        endpoint::Type::Server,
    );

    assert_eq!(
        Err(OnTransmitError::CouldNotWriteFrame),
        manager.on_transmit(&mut write_context)
    );
    assert_stream_write_state(&mut manager, stream_1, 2, 10);
    assert_stream_write_state(&mut manager, stream_2, 2, 0);
    assert_stream_write_state(&mut manager, stream_3, 2, 0);
    assert_stream_write_state(&mut manager, stream_4, 2, 10);

    assert_eq!(15, frame_buffer.len());
    frame_buffer.clear();
    assert_eq!(
        [stream_4, stream_1],
        *manager.streams_waiting_for_transmission()
    );

    frame_buffer.set_error_write_after_n_frames(5);
    let mut write_context = MockWriteContext::new(
        time::now(),
        &mut frame_buffer,
        transmission::Constraint::None,
        transmission::Mode::Normal,
        endpoint::Type::Server,
    );

    assert_eq!(
        Err(OnTransmitError::CouldNotWriteFrame),
        manager.on_transmit(&mut write_context)
    );
    assert_stream_write_state(&mut manager, stream_1, 2, 10);
    assert_stream_write_state(&mut manager, stream_2, 2, 0);
    assert_stream_write_state(&mut manager, stream_3, 2, 0);
    assert_stream_write_state(&mut manager, stream_4, 3, 5);

    assert_eq!(5, frame_buffer.len());
    frame_buffer.clear();
    assert_eq!(
        [stream_4, stream_1],
        *manager.streams_waiting_for_transmission()
    );

    frame_buffer.set_error_write_after_n_frames(5);
    let mut write_context = MockWriteContext::new(
        time::now(),
        &mut frame_buffer,
        transmission::Constraint::None,
        transmission::Mode::Normal,
        endpoint::Type::Server,
    );

    assert_eq!(
        Err(OnTransmitError::CouldNotWriteFrame),
        manager.on_transmit(&mut write_context)
    );
    assert_stream_write_state(&mut manager, stream_1, 3, 10);
    assert_stream_write_state(&mut manager, stream_2, 2, 0);
    assert_stream_write_state(&mut manager, stream_3, 2, 0);
    assert_stream_write_state(&mut manager, stream_4, 4, 0);

    assert_eq!(5, frame_buffer.len());
    frame_buffer.clear();
    assert_eq!([stream_1], *manager.streams_waiting_for_transmission());

    frame_buffer.set_error_write_after_n_frames(11);
    let mut write_context = MockWriteContext::new(
        time::now(),
        &mut frame_buffer,
        transmission::Constraint::None,
        transmission::Mode::Normal,
        endpoint::Type::Server,
    );

    assert_eq!(Ok(()), manager.on_transmit(&mut write_context));
    assert_stream_write_state(&mut manager, stream_1, 4, 0);
    assert_stream_write_state(&mut manager, stream_2, 2, 0);
    assert_stream_write_state(&mut manager, stream_3, 2, 0);
    assert_stream_write_state(&mut manager, stream_4, 4, 0);

    assert_eq!(10, frame_buffer.len());
    frame_buffer.clear();
    assert!(manager.streams_waiting_for_transmission().is_empty());
}