quic/s2n-quic-transport/src/stream/send_stream/tests.rs (2,408 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use super::*;
use crate::stream::{
stream_interests::{StreamInterestProvider, StreamInterests},
testing::*,
StreamError, StreamEvents, StreamTrait,
};
use bytes::Bytes;
use core::task::Poll;
use s2n_codec::DecoderBufferMut;
use s2n_quic_core::{
application::Error as ApplicationErrorCode,
connection, endpoint,
frame::{Frame, MaxData, MaxStreamData, StopSending},
packet::number::PacketNumber,
stream::{ops, StreamType},
transmission,
varint::{VarInt, MAX_VARINT_VALUE},
};
/// Sets up a Test environment for Streams where only the sending half of
/// the Stream is open
fn setup_send_only_test_env() -> TestEnvironment {
let test_env_config = TestEnvironmentConfig {
stream_id: StreamId::initial(endpoint::Type::Server, StreamType::Unidirectional),
..Default::default()
};
setup_stream_test_env_with_config(test_env_config)
}
#[test]
fn remotely_initiated_unidirectional_stream_can_not_be_sent_to() {
for local_endpoint_type in &[endpoint::Type::Client, endpoint::Type::Server] {
let initiator_endpoint_type = if *local_endpoint_type == endpoint::Type::Client {
endpoint::Type::Server
} else {
endpoint::Type::Client
};
let test_env_config = TestEnvironmentConfig {
local_endpoint_type: *local_endpoint_type,
stream_id: StreamId::initial(initiator_endpoint_type, StreamType::Unidirectional),
..Default::default()
};
let mut test_env = setup_stream_test_env_with_config(test_env_config);
let data = Bytes::from_static(b"1");
assert_matches!(
test_env.poll_push(data),
Poll::Ready(Err(StreamError::SendAfterFinish { .. })),
);
}
}
#[test]
fn bidirectional_and_locally_initiated_unidirectional_streams_can_be_written_to() {
for local_endpoint_type in &[endpoint::Type::Client, endpoint::Type::Server] {
for stream_type in &[StreamType::Unidirectional, StreamType::Bidirectional] {
for initiator in &[endpoint::Type::Client, endpoint::Type::Server] {
// Skip remotely initiated unidirectional stream
if *stream_type == StreamType::Unidirectional && *initiator != *local_endpoint_type
{
continue;
}
let test_env_config = TestEnvironmentConfig {
local_endpoint_type: *local_endpoint_type,
stream_id: StreamId::initial(*initiator, *stream_type),
..Default::default()
};
let mut test_env = setup_stream_test_env_with_config(test_env_config);
let data = Bytes::from_static(b"a");
assert_eq!(test_env.poll_push(data), Poll::Ready(Ok(())));
}
}
}
}
#[derive(Debug)]
enum Instruction {
/// Enqueues data via `poll_push()` of the given size at the given offset.
/// The boolean parameter denotes whether this is expected to succeed.
EnqueueData(VarInt, usize, bool),
/// Calls `poll_finish()` on the Stream.
/// The boolean parameter denotes whether this is expected to succeed.
Finish(bool),
/// Initiates a `RESET` with the given error code
Reset(ApplicationErrorCode, bool),
/// Ingest a `MAX_DATA` frame which indicates the given window.
SetMaxData(VarInt),
/// Ingest a `MAX_STREAM_DATA` frame which indicates the given window.
SetMaxStreamData(VarInt, ExpectWakeup),
/// Ingest a `STOP_SENDING` frame
StopSending(ApplicationErrorCode, ExpectWakeup),
/// Checks that a stream data frame with the given parameters had been
/// transmitted.
CheckDataTx(
VarInt, // offset
usize, // size
bool, // eof
bool, // is last frame
PacketNumber,
),
/// Checks that a stream data blocked frame with the given parameters had been
/// transmitted.
CheckStreamDataBlockedTx(
VarInt, // stream data limit
PacketNumber,
),
/// Checks whether a reset frame is transmitted
CheckResetTx(ApplicationErrorCode, PacketNumber, VarInt),
/// Checks whether a Stream is interested in the given interactions
CheckInterests(StreamInterests),
/// Checks that no outgoing data is written in an `on_transmit` call and
/// that no frames are still queued.
CheckNoTx,
/// Acknowledges a packet with a given packet number as received
AckPacket(PacketNumber, ExpectWakeup),
/// Declares a packet with a given packet number as lost
NackPacket(PacketNumber),
}
fn execute_instructions(test_env: &mut TestEnvironment, instructions: &[Instruction]) {
println!("executing {} instructions", instructions.len());
for (id, instruction) in instructions.iter().enumerate() {
println!("Executing instruction {:?} {:?}", id, instruction);
match instruction {
Instruction::EnqueueData(offset, size, expect_success) => {
let data = Bytes::from(gen_pattern_test_data(*offset, *size));
let poll_result = test_env.poll_push(data);
if *expect_success {
assert_eq!(poll_result, Poll::Ready(Ok(())));
} else {
assert_eq!(poll_result, Poll::Pending);
}
}
Instruction::Finish(expect_success) => {
let poll_result = test_env.poll_finish();
if *expect_success {
assert_eq!(poll_result, Poll::Ready(Ok(())));
} else {
assert_eq!(poll_result, Poll::Pending);
}
}
Instruction::Reset(error_code, expect_success) => {
let result = test_env
.stream
.poll_request(ops::Request::default().reset(*error_code), None);
assert_eq!(*expect_success, result.is_ok(), "Unexpected reset result");
}
Instruction::SetMaxData(max_data) => {
let was_waiting_for_connection_window = test_env
.stream
.get_stream_interests()
.connection_flow_control_credits;
test_env.tx_connection_flow_controller.on_max_data(MaxData {
maximum_data: *max_data,
});
if was_waiting_for_connection_window {
test_env.stream.on_connection_window_available();
}
}
Instruction::SetMaxStreamData(max_stream_data, expect_writer_wakeup) => {
let mut events = StreamEvents::new();
assert!(test_env
.stream
.on_max_stream_data(
&MaxStreamData {
stream_id: test_env.stream.stream_id.into(),
maximum_stream_data: *max_stream_data,
},
&mut events
)
.is_ok());
assert!(events.read_wake.is_none());
if let ExpectWakeup(Some(wakeup_expected)) = expect_writer_wakeup {
assert_eq!(*wakeup_expected, events.write_wake.is_some());
}
}
Instruction::StopSending(error_code, expect_writer_wakeup) => {
let mut events = StreamEvents::new();
assert!(test_env
.stream
.on_stop_sending(
&StopSending {
stream_id: test_env.stream.stream_id.into(),
application_error_code: (*error_code).into(),
},
&mut events,
)
.is_ok());
if let ExpectWakeup(Some(wakeup_expected)) = expect_writer_wakeup {
assert_eq!(*wakeup_expected, events.write_wake.is_some());
}
}
Instruction::CheckNoTx => {
test_env.sent_frames.clear();
test_env.assert_write_frames(0);
assert_eq!(
test_env.sent_frames.len(),
0,
"Expected no queued frames {:?}",
&test_env.sent_frames
);
}
Instruction::CheckDataTx(
expected_offset,
expected_size,
expect_eof,
expect_is_last_frame,
expected_packet_number,
) => {
test_env.assert_write_of(
*expected_offset,
*expected_size,
*expect_eof,
*expect_is_last_frame,
*expected_packet_number,
);
}
Instruction::CheckStreamDataBlockedTx(stream_data_limit, expected_packet_number) => {
test_env.assert_write_stream_data_blocked_frame(
*stream_data_limit,
*expected_packet_number,
)
}
Instruction::CheckResetTx(
expected_error_code,
expected_packet_number,
expected_final_size,
) => {
test_env.assert_write_reset_frame(
*expected_error_code,
*expected_packet_number,
*expected_final_size,
);
}
Instruction::CheckInterests(expected_interests) => {
assert_eq!(*expected_interests, test_env.stream.get_stream_interests());
}
Instruction::AckPacket(packet_number, expect_writer_wakeup) => {
test_env.ack_packet(*packet_number, *expect_writer_wakeup);
}
Instruction::NackPacket(packet_number) => {
test_env.nack_packet(*packet_number);
}
}
}
}
#[test]
fn sent_data_gets_enqueued_as_frames() {
let mut test_env = setup_send_only_test_env();
let data1 = Bytes::from_static(b"123");
let data2 = Bytes::from_static(b"456");
let data3 = Bytes::from_static(b"789");
assert_eq!(test_env.poll_push(data1.clone()), Poll::Ready(Ok(())),);
assert_eq!(
stream_interests(&["tx"]),
test_env.stream.get_stream_interests()
);
assert_eq!(test_env.poll_push(data2.clone()), Poll::Ready(Ok(())),);
assert_eq!(
stream_interests(&["tx"]),
test_env.stream.get_stream_interests()
);
test_env.assert_write_frames(1);
let mut sent_frame = test_env.sent_frames.pop_front().expect("Frame is written");
assert_eq!(
Frame::Stream(stream_data(
test_env.stream.stream_id,
VarInt::from_u32(0),
DecoderBufferMut::new(
&mut data1
.iter()
.chain(data2.iter())
.copied()
.collect::<Vec<_>>()[..]
),
false
)),
sent_frame.as_frame()
);
assert_eq!(
stream_interests(&["ack"]),
test_env.stream.get_stream_interests()
);
test_env.assert_write_frames(0);
assert_eq!(test_env.poll_push(data3.clone()), Poll::Ready(Ok(())),);
assert_eq!(
stream_interests(&["ack", "tx"]),
test_env.stream.get_stream_interests()
);
test_env.assert_write_frames(1);
let mut sent_frame = test_env.sent_frames.pop_front().expect("Frame is written");
assert_eq!(
Frame::Stream(stream_data(
test_env.stream.stream_id,
VarInt::from_u32((data1.len() + data2.len()) as u32),
DecoderBufferMut::new(&mut data3.to_vec()[..]),
false
)),
sent_frame.as_frame()
);
assert_eq!(
stream_interests(&["ack"]),
test_env.stream.get_stream_interests()
);
}
#[test]
fn can_not_enqueue_data_if_max_buffer_size_has_been_reached() {
const MAX_BUFFER_SIZE: usize = 1024;
let max_buffer_size_varint = VarInt::from_u32(MAX_BUFFER_SIZE as u32);
const RECEIVE_WINDOW: u64 = 8 * 1024;
let test_configs = &[
&[
Instruction::EnqueueData(VarInt::from_u32(0), MAX_BUFFER_SIZE, true),
Instruction::EnqueueData(max_buffer_size_varint, 1, false),
// Transmitting data does not increase the window
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckDataTx(VarInt::from_u32(0), MAX_BUFFER_SIZE, false, false, pn(0)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::EnqueueData(max_buffer_size_varint, 1, false),
][..],
&[
// We can actually write more data then the maximum buffer size.
// We can just not write beyond this.
Instruction::EnqueueData(VarInt::from_u32(0), 128, true),
Instruction::EnqueueData(VarInt::from_u32(128), MAX_BUFFER_SIZE, true),
Instruction::EnqueueData(max_buffer_size_varint, 1, false),
// Transmitting data does not increase the window
Instruction::CheckDataTx(VarInt::from_u32(0), 1152, false, false, pn(0)),
Instruction::EnqueueData(max_buffer_size_varint, 1, false),
][..],
&[
Instruction::EnqueueData(VarInt::from_u32(0), MAX_BUFFER_SIZE - 200, true),
Instruction::EnqueueData(max_buffer_size_varint - 200, 150, true),
Instruction::EnqueueData(max_buffer_size_varint - 50, 50, true),
Instruction::EnqueueData(max_buffer_size_varint, 1, false),
][..],
&[
// Checks that we can enqueue more than the maximum buffer size,
// as long as we are below the flow control window. We subtract 2
// in the second instruction so that even the the third instruction
// would not cause us a rejection based on the flow control size.
Instruction::EnqueueData(VarInt::from_u32(0), MAX_BUFFER_SIZE - 1, true),
Instruction::EnqueueData(
max_buffer_size_varint - 1,
RECEIVE_WINDOW as usize - MAX_BUFFER_SIZE - 2,
true,
),
Instruction::EnqueueData(VarInt::from_u32(RECEIVE_WINDOW as u32) - 1, 1, false),
][..],
&[
Instruction::EnqueueData(VarInt::from_u32(0), MAX_BUFFER_SIZE, true),
Instruction::EnqueueData(max_buffer_size_varint, 1, false),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckDataTx(VarInt::from_u32(0), MAX_BUFFER_SIZE, false, false, pn(0)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(0), ExpectWakeup(Some(true))),
Instruction::CheckInterests(stream_interests(&[])),
// Now we should have MAX_BUFFER_SIZE available again
Instruction::EnqueueData(max_buffer_size_varint, 512, true),
Instruction::EnqueueData(max_buffer_size_varint + 512, 2 * MAX_BUFFER_SIZE, true),
Instruction::EnqueueData(max_buffer_size_varint * 3 + 512, 1, false),
Instruction::CheckInterests(stream_interests(&["tx"])),
// Transmit and ACK data. There should still be no buffer space
Instruction::CheckDataTx(max_buffer_size_varint, 1196, false, true, pn(1)),
Instruction::CheckInterests(stream_interests(&["ack", "tx"])),
Instruction::AckPacket(pn(1), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::EnqueueData(max_buffer_size_varint * 3 + 512, 1, false),
Instruction::CheckInterests(stream_interests(&["tx"])),
// After acknowledging the big chunk, we have capacity again
Instruction::CheckDataTx(max_buffer_size_varint + 1196, 1196, false, true, pn(2)),
Instruction::CheckDataTx(max_buffer_size_varint + 1196 * 2, 168, false, false, pn(3)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(2), ExpectWakeup(Some(true))),
Instruction::AckPacket(pn(3), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&[])),
Instruction::EnqueueData(max_buffer_size_varint * 3 + 512, 1, true),
Instruction::CheckInterests(stream_interests(&["tx"])),
][..],
];
for test_config in test_configs.iter() {
let test_env_config = TestEnvironmentConfig {
max_send_buffer_size: MAX_BUFFER_SIZE,
initial_send_window: RECEIVE_WINDOW,
max_packet_size: Some(1200),
..Default::default()
};
let mut test_env = setup_stream_test_env_with_config(test_env_config);
execute_instructions(&mut test_env, &test_config[..]);
}
}
#[test]
fn zero_sized_buffers_can_always_be_enqueued() {
const MAX_BUFFER_SIZE: usize = 1024;
let test_configs = &[
&[
Instruction::EnqueueData(VarInt::from_u32(0), 2 * MAX_BUFFER_SIZE, true),
Instruction::EnqueueData(
VarInt::from_u32(2 * MAX_BUFFER_SIZE as u32),
2 * MAX_BUFFER_SIZE,
false,
),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::EnqueueData(VarInt::from_u32(2 * MAX_BUFFER_SIZE as u32), 0, true),
Instruction::CheckInterests(stream_interests(&["tx"])),
][..],
&[
Instruction::EnqueueData(VarInt::from_u32(0), 0, true),
Instruction::CheckInterests(stream_interests(&[])),
][..],
];
for test_config in test_configs.iter() {
let test_env_config = TestEnvironmentConfig {
max_send_buffer_size: MAX_BUFFER_SIZE,
..Default::default()
};
let mut test_env = setup_stream_test_env_with_config(test_env_config);
execute_instructions(&mut test_env, &test_config[..]);
}
}
#[test]
fn multiple_stream_frames_are_sent_in_a_packet() {
const MAX_BUFFER_SIZE: usize = 1024;
let max_buffer_size_varint = VarInt::from_u32(MAX_BUFFER_SIZE as u32);
const RECEIVE_WINDOW: u64 = 8 * 1024;
let test_configs = &[
&[
Instruction::EnqueueData(VarInt::from_u32(0), MAX_BUFFER_SIZE, true),
Instruction::EnqueueData(max_buffer_size_varint, 1, false),
Instruction::CheckInterests(stream_interests(&["tx"])),
// Transmitting data does not increase the window
Instruction::CheckDataTx(VarInt::from_u32(0), MAX_BUFFER_SIZE, false, false, pn(0)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::EnqueueData(max_buffer_size_varint, 1, false),
Instruction::CheckInterests(stream_interests(&["ack"])),
][..],
&[
// We can actually write more data then the maximum buffer size.
// We can just not write beyond this.
Instruction::EnqueueData(VarInt::from_u32(0), 128, true),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::EnqueueData(VarInt::from_u32(128), MAX_BUFFER_SIZE, true),
Instruction::EnqueueData(max_buffer_size_varint, 1, false),
// Transmitting data does not increase the window
Instruction::CheckDataTx(VarInt::from_u32(0), 1152, false, false, pn(0)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::EnqueueData(max_buffer_size_varint, 1, false),
Instruction::CheckInterests(stream_interests(&["ack"])),
][..],
&[
Instruction::EnqueueData(VarInt::from_u32(0), MAX_BUFFER_SIZE - 200, true),
Instruction::EnqueueData(max_buffer_size_varint - 200, 150, true),
Instruction::EnqueueData(max_buffer_size_varint - 50, 50, true),
Instruction::EnqueueData(max_buffer_size_varint, 1, false),
Instruction::CheckInterests(stream_interests(&["tx"])),
][..],
&[
// Checks that we can enqueue more than the maximum buffer size,
// as long as we are below the flow control window. We subtract 2
// in the second instruction so that even the the third instruction
// would not cause us a rejection based on the flow control size.
Instruction::EnqueueData(VarInt::from_u32(0), MAX_BUFFER_SIZE - 1, true),
Instruction::EnqueueData(
max_buffer_size_varint - 1,
RECEIVE_WINDOW as usize - MAX_BUFFER_SIZE - 2,
true,
),
Instruction::EnqueueData(VarInt::from_u32(RECEIVE_WINDOW as u32) - 1, 1, false),
Instruction::CheckInterests(stream_interests(&["tx"])),
][..],
&[
Instruction::EnqueueData(VarInt::from_u32(0), MAX_BUFFER_SIZE, true),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::EnqueueData(max_buffer_size_varint, 1, false),
Instruction::CheckDataTx(VarInt::from_u32(0), MAX_BUFFER_SIZE, false, false, pn(0)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(0), ExpectWakeup(Some(true))),
Instruction::CheckInterests(stream_interests(&[])),
// Now we should have MAX_BUFFER_SIZE available again
Instruction::EnqueueData(max_buffer_size_varint, 512, true),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::EnqueueData(max_buffer_size_varint + 512, 2 * MAX_BUFFER_SIZE, true),
Instruction::EnqueueData(max_buffer_size_varint * 3 + 512, 1, false),
// Transmit and ACK data. There should still be no buffer space
Instruction::CheckDataTx(max_buffer_size_varint, 1196, false, true, pn(1)),
Instruction::CheckInterests(stream_interests(&["ack", "tx"])),
Instruction::AckPacket(pn(1), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::EnqueueData(max_buffer_size_varint * 3 + 512, 1, false),
Instruction::CheckInterests(stream_interests(&["tx"])),
// After acknowledging the big chunk, we have capacity again
Instruction::CheckDataTx(max_buffer_size_varint + 1196, 1196, false, true, pn(2)),
Instruction::CheckDataTx(max_buffer_size_varint + 1196 * 2, 168, false, false, pn(3)),
Instruction::AckPacket(pn(2), ExpectWakeup(Some(true))),
Instruction::AckPacket(pn(3), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&[])),
Instruction::EnqueueData(max_buffer_size_varint * 3 + 512, 1, true),
Instruction::CheckInterests(stream_interests(&["tx"])),
][..],
];
for test_config in test_configs.iter() {
let test_env_config = TestEnvironmentConfig {
max_send_buffer_size: MAX_BUFFER_SIZE,
initial_send_window: RECEIVE_WINDOW,
max_packet_size: Some(1200),
..Default::default()
};
let mut test_env = setup_stream_test_env_with_config(test_env_config);
execute_instructions(&mut test_env, &test_config[..]);
}
}
#[test]
fn bigger_data_is_split_across_packets() {
const MAX_PACKET_SIZE: usize = 1024;
const TOTAL_WRITE_SIZE: usize = 4 * MAX_PACKET_SIZE + 31;
let test_configs = &[
&[
Instruction::EnqueueData(VarInt::from_u32(0), TOTAL_WRITE_SIZE, true),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckDataTx(VarInt::from_u32(0), 1022, false, true, pn(0)),
// The next frames need a 2 byte stream ID + 2 byte offset
Instruction::CheckDataTx(VarInt::from_u32(1022), 1020, false, true, pn(1)),
Instruction::CheckDataTx(VarInt::from_u32(2042), 1020, false, true, pn(2)),
Instruction::CheckDataTx(VarInt::from_u32(3062), 1020, false, true, pn(3)),
Instruction::CheckDataTx(
VarInt::from_u32(4082),
TOTAL_WRITE_SIZE - 4082,
false,
false,
pn(4),
),
Instruction::CheckInterests(stream_interests(&["ack"])),
][..],
&[
Instruction::EnqueueData(VarInt::from_u32(0), 512, true),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::EnqueueData(VarInt::from_u32(512), TOTAL_WRITE_SIZE, true),
Instruction::CheckDataTx(VarInt::from_u32(0), 1022, false, true, pn(0)),
// The next frames need a 2 byte stream ID + 2 byte offset
Instruction::CheckDataTx(VarInt::from_u32(1022), 1020, false, true, pn(1)),
Instruction::CheckDataTx(VarInt::from_u32(2042), 1020, false, true, pn(2)),
Instruction::CheckDataTx(VarInt::from_u32(3062), 1020, false, true, pn(3)),
Instruction::CheckDataTx(
VarInt::from_u32(4082),
512 + TOTAL_WRITE_SIZE - 4082,
false,
false,
pn(4),
),
Instruction::CheckInterests(stream_interests(&["ack"])),
][..],
];
for test_config in test_configs.iter() {
let test_env_config = TestEnvironmentConfig {
max_send_buffer_size: 16 * 1024,
initial_send_window: 16 * 1024,
..Default::default()
};
let mut test_env = setup_stream_test_env_with_config(test_env_config);
test_env
.sent_frames
.set_max_packet_size(Some(MAX_PACKET_SIZE));
execute_instructions(&mut test_env, &test_config[..]);
}
}
#[test]
fn lost_data_is_retransmitted() {
const MAX_PACKET_SIZE: usize = 1000;
let test_configs = &[
&[
// These go all into the first packet, which will be OK
Instruction::EnqueueData(VarInt::from_u32(0), 500, true),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::EnqueueData(VarInt::from_u32(500), 400, true),
Instruction::CheckDataTx(VarInt::from_u32(0), 900, false, false, pn(0)),
Instruction::CheckInterests(stream_interests(&["ack"])),
// These go into the next packet, which will get lost
Instruction::EnqueueData(VarInt::from_u32(900), 700, true),
Instruction::CheckInterests(stream_interests(&["ack", "tx"])),
Instruction::EnqueueData(VarInt::from_u32(1600), 200, true),
Instruction::CheckDataTx(VarInt::from_u32(900), 900, false, false, pn(1)),
Instruction::CheckInterests(stream_interests(&["ack"])),
// The next packet will be good again
Instruction::EnqueueData(VarInt::from_u32(1800), 900, true),
Instruction::CheckInterests(stream_interests(&["ack", "tx"])),
Instruction::CheckDataTx(VarInt::from_u32(1800), 900, false, false, pn(2)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::CheckNoTx,
// Declare the packet as lost.
// We now expect a retransmission of those segments
Instruction::NackPacket(pn(1)),
Instruction::CheckInterests(stream_interests(&["ack", "lost"])),
Instruction::CheckDataTx(VarInt::from_u32(900), 900, false, false, pn(3)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(3), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(0), ExpectWakeup(Some(false))),
Instruction::AckPacket(pn(1), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(2), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&[])),
Instruction::CheckNoTx,
][..],
&[
// Split initial range across various packets, in or to check that
// only required ranges are retransmitted.
// Requires 504 bytes
Instruction::EnqueueData(VarInt::from_u32(0), 500, true),
Instruction::CheckInterests(stream_interests(&["tx"])),
// Requires 406 bytes. Remaining: 90
Instruction::EnqueueData(VarInt::from_u32(500), 400, true),
// 86 bytes can be transmitted in the first packet. 2214 left
// 996 bytes can be transmitted in the second packet. 1218 left
// 996 bytes can be transmitted in the third packet. 222 left
// 228 bytes required in the forth packet. 772 remaining for other frames
Instruction::EnqueueData(VarInt::from_u32(900), 2300, true),
// 768 bytes transmitted in the forth packet. 232 left
// 238 bytes required in the fifth packet.
Instruction::EnqueueData(VarInt::from_u32(3200), 1000, true),
// 106 bytes required in the fifth packet.
Instruction::EnqueueData(VarInt::from_u32(4200), 100, true),
Instruction::CheckDataTx(VarInt::from_u32(0), 998, false, true, pn(0)),
Instruction::CheckInterests(stream_interests(&["ack", "tx"])),
Instruction::CheckDataTx(VarInt::from_u32(998), 996, false, true, pn(1)),
Instruction::CheckDataTx(VarInt::from_u32(1994), 996, false, true, pn(2)),
Instruction::CheckDataTx(VarInt::from_u32(2990), 996, false, true, pn(3)),
Instruction::CheckDataTx(VarInt::from_u32(3986), 314, false, false, pn(4)),
// Packet 1 and Packet 3 get lost
Instruction::NackPacket(pn(1)),
Instruction::CheckInterests(stream_interests(&["ack", "lost"])),
Instruction::NackPacket(pn(3)),
Instruction::CheckInterests(stream_interests(&["ack", "lost"])),
Instruction::CheckDataTx(VarInt::from_u32(998), 996, false, true, pn(5)),
Instruction::CheckDataTx(VarInt::from_u32(2990), 996, false, true, pn(6)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(0), ExpectWakeup(Some(false))),
Instruction::AckPacket(pn(2), ExpectWakeup(Some(false))),
Instruction::AckPacket(pn(4), ExpectWakeup(Some(false))),
Instruction::AckPacket(pn(5), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(6), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&[])),
Instruction::AckPacket(pn(6), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&[])),
Instruction::CheckNoTx,
][..],
];
for test_config in test_configs.iter() {
let test_env_config = TestEnvironmentConfig {
max_send_buffer_size: 10 * MAX_PACKET_SIZE,
initial_send_window: 10 * MAX_PACKET_SIZE as u64,
..Default::default()
};
let mut test_env = setup_stream_test_env_with_config(test_env_config);
test_env
.sent_frames
.set_max_packet_size(Some(MAX_PACKET_SIZE));
execute_instructions(&mut test_env, &test_config[..]);
}
}
#[test]
fn can_not_transmit_data_when_congestion_limited() {
const MAX_PACKET_SIZE: usize = 1000;
let test_config = &[
Instruction::EnqueueData(VarInt::from_u32(0), 500, true),
Instruction::CheckInterests(stream_interests(&["tx"])),
// Verify data is not transmitted
Instruction::CheckNoTx,
Instruction::Reset(ApplicationErrorCode::new(1).unwrap(), true),
// Verify reset is not synced
Instruction::CheckNoTx,
];
let test_env_config = TestEnvironmentConfig {
transmission_constraint: transmission::Constraint::CongestionLimited,
..Default::default()
};
let mut test_env = setup_stream_test_env_with_config(test_env_config);
test_env
.sent_frames
.set_max_packet_size(Some(MAX_PACKET_SIZE));
execute_instructions(&mut test_env, test_config);
}
#[test]
fn only_lost_data_is_sent_when_constrained_to_retransmission_only() {
const MAX_PACKET_SIZE: usize = 1000;
let test_config = &[
// Send 800 bytes in a packet that will be lost
Instruction::EnqueueData(VarInt::from_u32(0), 800, true),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckDataTx(VarInt::from_u32(0), 800, false, false, pn(0)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::NackPacket(pn(0)),
Instruction::CheckInterests(stream_interests(&["lost"])),
];
let test_env_config: TestEnvironmentConfig = Default::default();
let mut test_env = setup_stream_test_env_with_config(test_env_config);
test_env
.sent_frames
.set_max_packet_size(Some(MAX_PACKET_SIZE));
execute_instructions(&mut test_env, test_config);
let test_config = &[
// Enqueue 900 bytes of new data
Instruction::EnqueueData(VarInt::from_u32(0), 900, true),
Instruction::CheckInterests(stream_interests(&["tx", "lost"])),
// Verify that only the lost data was sent
Instruction::CheckDataTx(VarInt::from_u32(0), 800, false, false, pn(1)),
Instruction::CheckNoTx,
// Verify we still want to transmit the new data
Instruction::CheckInterests(stream_interests(&["tx", "ack"])),
];
test_env.transmission_constraint = transmission::Constraint::RetransmissionOnly;
execute_instructions(&mut test_env, test_config);
}
#[test]
fn only_lost_reset_is_sent_when_constrained_to_retransmission_only() {
const MAX_PACKET_SIZE: usize = 1000;
let error_code = ApplicationErrorCode::new(1).unwrap();
let test_config = &[
// Send a reset that will be lost
Instruction::Reset(error_code, true),
Instruction::CheckResetTx(error_code, pn(0), VarInt::from_u32(0)),
Instruction::NackPacket(pn(0)),
];
let test_env_config: TestEnvironmentConfig = Default::default();
let mut test_env = setup_stream_test_env_with_config(test_env_config);
test_env
.sent_frames
.set_max_packet_size(Some(MAX_PACKET_SIZE));
execute_instructions(&mut test_env, test_config);
let test_config = &[
// Verify the lost reset was sent
Instruction::CheckResetTx(error_code, pn(1), VarInt::from_u32(0)),
// Verify a new reset cannot be sent
Instruction::Reset(error_code, true),
Instruction::CheckNoTx,
];
test_env.transmission_constraint = transmission::Constraint::RetransmissionOnly;
execute_instructions(&mut test_env, test_config);
}
#[test]
fn retransmitted_data_is_sent_in_same_packets_as_new_data() {
const MAX_PACKET_SIZE: usize = 1000;
let test_configs = &[
&[
// Enqueue data which gets sent in one packet, but doesn't take up
// all it's space.
Instruction::EnqueueData(VarInt::from_u32(0), 500, true),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckDataTx(VarInt::from_u32(0), 500, false, false, pn(0)),
Instruction::CheckInterests(stream_interests(&["ack"])),
// Prepare more data
Instruction::EnqueueData(VarInt::from_u32(500), 1000, true),
Instruction::CheckInterests(stream_interests(&["ack", "tx"])),
Instruction::NackPacket(pn(0)),
Instruction::CheckInterests(stream_interests(&["lost"])),
// Transmit after packet loss had been announced.
// The retransmit requires 504 bytes. 496 are left. 492 can be
// used for content
Instruction::CheckDataTx(VarInt::from_u32(0), 500, false, false, pn(1)),
Instruction::CheckDataTx(VarInt::from_u32(500), 492, false, true, pn(1)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(1), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(2), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&[])),
Instruction::CheckNoTx,
][..],
&[
// Enqueue data which gets sent in one packet, but doesn't take up
// all it's space. In the retransmission the space will be used for
// other data.
Instruction::EnqueueData(VarInt::from_u32(0), 500, true),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckDataTx(VarInt::from_u32(0), 500, false, false, pn(0)),
Instruction::CheckInterests(stream_interests(&["ack"])),
// Send 2 chunks. One which will span multiple frames
Instruction::EnqueueData(VarInt::from_u32(500), 500, true),
Instruction::EnqueueData(VarInt::from_u32(1000), 1000, true),
Instruction::CheckInterests(stream_interests(&["ack", "tx"])),
// Requires 506 bytes
Instruction::CheckDataTx(VarInt::from_u32(500), 996, false, true, pn(1)),
Instruction::CheckDataTx(VarInt::from_u32(1496), 504, false, false, pn(2)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::NackPacket(pn(0)),
Instruction::NackPacket(pn(2)),
Instruction::CheckInterests(stream_interests(&["ack", "lost"])),
Instruction::EnqueueData(VarInt::from_u32(2000), 10, true),
Instruction::CheckInterests(stream_interests(&["ack", "lost"])),
// Retransmission of the first chunk requires 504 bytes. 496 left
Instruction::CheckDataTx(VarInt::from_u32(0), 500, false, false, pn(3)),
Instruction::CheckDataTx(VarInt::from_u32(1496), 492, false, true, pn(3)),
Instruction::CheckDataTx(VarInt::from_u32(1988), 12, false, false, pn(4)),
Instruction::CheckDataTx(VarInt::from_u32(2000), 10, false, false, pn(4)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(3), ExpectWakeup(Some(false))),
Instruction::AckPacket(pn(4), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(1), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&[])),
][..],
];
for test_config in test_configs.iter() {
let test_env_config = TestEnvironmentConfig {
max_send_buffer_size: 10 * MAX_PACKET_SIZE,
initial_send_window: 10 * MAX_PACKET_SIZE as u64,
..Default::default()
};
let mut test_env = setup_stream_test_env_with_config(test_env_config);
test_env
.sent_frames
.set_max_packet_size(Some(MAX_PACKET_SIZE));
execute_instructions(&mut test_env, &test_config[..]);
}
}
//= https://www.rfc-editor.org/rfc/rfc9000#section-4.1
//= type=test
//# A sender SHOULD send a
//# STREAM_DATA_BLOCKED or DATA_BLOCKED frame to indicate to the receiver
//# that it has data to write but is blocked by flow control limits.
//= https://www.rfc-editor.org/rfc/rfc9000#section-19.13
//= type=test
//# A sender SHOULD send a STREAM_DATA_BLOCKED frame (type=0x15) when it
//# wishes to send data, but is unable to do so due to stream-level flow
//# control.
#[test]
fn writes_not_more_than_max_stream_data_even_if_more_data_is_enqueued() {
const MAX_PACKET_SIZE: usize = 1000;
const MAX_BUFFER_SIZE: usize = 1500;
const WINDOW_SIZE: usize = 2000;
let test_configs = &[&[
// We are able to enqueue data which is bigger than the window
Instruction::EnqueueData(VarInt::from_u32(0), 2500, true),
Instruction::CheckInterests(stream_interests(&["tx"])),
// Try to enqueue some data so we can observe wakeups
Instruction::EnqueueData(VarInt::from_u32(2500), 10, false),
Instruction::CheckDataTx(VarInt::from_u32(0), 998, false, true, pn(0)),
Instruction::CheckDataTx(VarInt::from_u32(998), 996, false, true, pn(1)),
Instruction::CheckDataTx(VarInt::from_u32(1994), 6, false, false, pn(2)),
Instruction::CheckStreamDataBlockedTx(VarInt::from_u32(2000), pn(2)),
Instruction::CheckInterests(stream_interests(&["ack", "sf"])),
// Acking the packets does not unblock sending more data
Instruction::AckPacket(pn(0), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["ack", "sf"])),
Instruction::AckPacket(pn(1), ExpectWakeup(Some(true))),
Instruction::AckPacket(pn(2), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["sf"])),
Instruction::CheckNoTx,
// Sending a window update should unblock sending more data.
// Since we still have enqueued more data than the new flow control window,
// we expect no wakeup here.
Instruction::SetMaxStreamData(VarInt::from_u32(2100), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckDataTx(VarInt::from_u32(2000), 100, false, false, pn(3)),
Instruction::CheckStreamDataBlockedTx(VarInt::from_u32(2100), pn(3)),
Instruction::CheckInterests(stream_interests(&["ack", "sf"])),
Instruction::AckPacket(pn(3), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["sf"])),
Instruction::CheckNoTx,
Instruction::SetMaxStreamData(VarInt::from_u32(2499), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckDataTx(VarInt::from_u32(2100), 399, false, false, pn(4)),
Instruction::CheckStreamDataBlockedTx(VarInt::from_u32(2499), pn(4)),
Instruction::CheckInterests(stream_interests(&["ack", "sf"])),
Instruction::AckPacket(pn(4), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["sf"])),
Instruction::CheckNoTx,
Instruction::SetMaxStreamData(VarInt::from_u32(2499), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["sf"])),
Instruction::CheckNoTx,
Instruction::SetMaxStreamData(VarInt::from_u32(2000), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["sf"])),
Instruction::CheckNoTx,
Instruction::SetMaxStreamData(VarInt::from_u32(2500), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckDataTx(VarInt::from_u32(2499), 1, false, false, pn(5)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(5), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&[])),
Instruction::CheckNoTx,
Instruction::SetMaxStreamData(VarInt::from_u32(2501), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&[])),
][..]];
for test_config in test_configs.iter() {
let test_env_config = TestEnvironmentConfig {
max_send_buffer_size: MAX_BUFFER_SIZE,
initial_send_window: WINDOW_SIZE as u64,
..Default::default()
};
let mut test_env = setup_stream_test_env_with_config(test_env_config);
test_env
.sent_frames
.set_max_packet_size(Some(MAX_PACKET_SIZE));
execute_instructions(&mut test_env, &test_config[..]);
}
}
#[test]
fn blocked_on_stream_flow_control_does_not_prevent_retransmissions() {
const MAX_PACKET_SIZE: usize = 1000;
// Max buffer is lower than WINDOW, in order to get blocked before
// we reach the window
const MAX_BUFFER_SIZE: usize = 1500;
const WINDOW_SIZE: usize = 2000;
let test_configs = &[&[
// We are able to enqueue data which is bigger than the window
Instruction::EnqueueData(VarInt::from_u32(0), 3000, true),
Instruction::CheckInterests(stream_interests(&["tx"])),
// Check that we indeed can't enqueue more data
Instruction::EnqueueData(VarInt::from_u32(2500), 10, false),
Instruction::CheckDataTx(VarInt::from_u32(0), 998, false, true, pn(0)),
Instruction::CheckDataTx(VarInt::from_u32(998), 996, false, true, pn(1)),
Instruction::CheckDataTx(VarInt::from_u32(1994), 6, false, false, pn(2)),
Instruction::CheckStreamDataBlockedTx(VarInt::from_u32(2000), pn(2)),
Instruction::CheckInterests(stream_interests(&["ack", "sf"])),
// Nack packets. This should lead to retransmissions
Instruction::NackPacket(pn(1)),
Instruction::CheckInterests(stream_interests(&["ack", "lost"])),
Instruction::CheckDataTx(VarInt::from_u32(998), 996, false, true, pn(3)),
Instruction::CheckInterests(stream_interests(&["ack", "tx"])),
Instruction::AckPacket(pn(3), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["ack", "tx"])),
Instruction::NackPacket(pn(0)),
Instruction::CheckInterests(stream_interests(&["ack", "lost"])),
Instruction::CheckDataTx(VarInt::from_u32(0), 998, false, true, pn(4)),
Instruction::CheckInterests(stream_interests(&["ack", "tx"])),
Instruction::AckPacket(pn(4), ExpectWakeup(Some(true))),
Instruction::CheckInterests(stream_interests(&["ack", "tx"])),
Instruction::NackPacket(pn(2)),
Instruction::CheckInterests(stream_interests(&["lost"])),
Instruction::CheckDataTx(VarInt::from_u32(1994), 6, false, false, pn(5)),
Instruction::CheckStreamDataBlockedTx(VarInt::from_u32(2000), pn(5)),
Instruction::CheckInterests(stream_interests(&["ack", "sf"])),
Instruction::CheckNoTx,
Instruction::AckPacket(pn(5), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["sf"])),
Instruction::CheckNoTx,
][..]];
for test_config in test_configs.iter() {
let test_env_config = TestEnvironmentConfig {
max_send_buffer_size: MAX_BUFFER_SIZE,
initial_send_window: WINDOW_SIZE as u64,
max_packet_size: Some(MAX_PACKET_SIZE),
..Default::default()
};
let mut test_env = setup_stream_test_env_with_config(test_env_config);
execute_instructions(&mut test_env, &test_config[..]);
}
}
#[test]
fn writes_not_more_than_max_data_even_if_more_data_is_enqueued() {
const MAX_PACKET_SIZE: usize = 1000;
const MAX_BUFFER_SIZE: usize = 1500;
const STREAM_WINDOW_SIZE: usize = 100 * 1024; // Do not want to block on this
const CONN_WINDOW_SIZE: usize = 2000;
let test_configs = &[&[
// We are able to enqueue data which is bigger than the window
Instruction::EnqueueData(VarInt::from_u32(0), 2500, true),
Instruction::CheckInterests(stream_interests(&["tx"])),
// Try to enqueue some data so we can observe wakeups
Instruction::EnqueueData(VarInt::from_u32(2500), 10, false),
Instruction::CheckDataTx(VarInt::from_u32(0), 998, false, true, pn(0)),
Instruction::CheckDataTx(VarInt::from_u32(998), 996, false, true, pn(1)),
Instruction::CheckDataTx(VarInt::from_u32(1994), 6, false, false, pn(2)),
Instruction::CheckInterests(stream_interests(&["ack", "cf"])),
// Acking the packets does not unblock sending more data
Instruction::AckPacket(pn(0), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["ack", "cf"])),
Instruction::AckPacket(pn(1), ExpectWakeup(Some(true))),
Instruction::AckPacket(pn(2), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["cf"])),
Instruction::CheckNoTx,
// Sending a window update should unblock sending more data.
// Since we still have enqueued more data than the new flow control window,
// we expect no wakeup here.
Instruction::SetMaxData(VarInt::from_u32(2100)),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckDataTx(VarInt::from_u32(2000), 100, false, false, pn(3)),
Instruction::CheckInterests(stream_interests(&["ack", "cf"])),
Instruction::AckPacket(pn(3), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["cf"])),
Instruction::CheckNoTx,
Instruction::SetMaxData(VarInt::from_u32(2499)),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckDataTx(VarInt::from_u32(2100), 399, false, false, pn(4)),
Instruction::CheckInterests(stream_interests(&["ack", "cf"])),
Instruction::AckPacket(pn(4), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["cf"])),
Instruction::CheckNoTx,
//= https://www.rfc-editor.org/rfc/rfc9000#section-4.1
//= type=test
//# A sender MUST ignore any MAX_STREAM_DATA or MAX_DATA frames that do
//# not increase flow control limits.
Instruction::SetMaxData(VarInt::from_u32(2499)),
Instruction::CheckInterests(stream_interests(&["cf"])),
Instruction::CheckNoTx,
//= https://www.rfc-editor.org/rfc/rfc9000#section-4.1
//= type=test
//# A sender MUST ignore any MAX_STREAM_DATA or MAX_DATA frames that do
//# not increase flow control limits.
Instruction::SetMaxData(VarInt::from_u32(2000)),
Instruction::CheckInterests(stream_interests(&["cf"])),
Instruction::CheckNoTx,
Instruction::SetMaxData(VarInt::from_u32(2500)),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckDataTx(VarInt::from_u32(2499), 1, false, false, pn(5)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(5), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&[])),
Instruction::CheckNoTx,
Instruction::SetMaxData(VarInt::from_u32(2501)),
Instruction::CheckInterests(stream_interests(&[])),
][..]];
for test_config in test_configs.iter() {
let test_env_config = TestEnvironmentConfig {
max_send_buffer_size: MAX_BUFFER_SIZE,
initial_send_window: STREAM_WINDOW_SIZE as u64,
initial_connection_send_window_size: CONN_WINDOW_SIZE as u64,
..Default::default()
};
let mut test_env = setup_stream_test_env_with_config(test_env_config);
test_env
.sent_frames
.set_max_packet_size(Some(MAX_PACKET_SIZE));
execute_instructions(&mut test_env, &test_config[..]);
}
}
#[test]
fn blocked_on_connection_flow_control_does_not_prevent_retransmissions() {
const MAX_PACKET_SIZE: usize = 1000;
// Max buffer is lower than WINDOW, in order to get blocked before
// we reach the window
const MAX_BUFFER_SIZE: usize = 1500;
const STREAM_WINDOW_SIZE: usize = 100 * 1024; // Do not want to block on this
const CONN_WINDOW_SIZE: usize = 2000;
let test_configs = &[&[
// We are able to enqueue data which is bigger than the window
Instruction::EnqueueData(VarInt::from_u32(0), 2500, true),
Instruction::CheckInterests(stream_interests(&["tx"])),
// Check that we indeed can't enqueue more data
Instruction::EnqueueData(VarInt::from_u32(2500), 10, false),
Instruction::CheckDataTx(VarInt::from_u32(0), 998, false, true, pn(0)),
Instruction::CheckDataTx(VarInt::from_u32(998), 996, false, true, pn(1)),
Instruction::CheckDataTx(VarInt::from_u32(1994), 6, false, false, pn(2)),
Instruction::CheckInterests(stream_interests(&["ack", "cf"])),
// Nack packets. This should lead to retransmissions
Instruction::NackPacket(pn(1)),
Instruction::CheckInterests(stream_interests(&["ack", "lost"])),
Instruction::CheckDataTx(VarInt::from_u32(998), 996, false, true, pn(3)),
Instruction::CheckInterests(stream_interests(&["ack", "tx"])),
Instruction::AckPacket(pn(3), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["ack", "tx"])),
Instruction::NackPacket(pn(0)),
Instruction::CheckInterests(stream_interests(&["ack", "lost"])),
Instruction::CheckDataTx(VarInt::from_u32(0), 998, false, true, pn(4)),
Instruction::CheckInterests(stream_interests(&["ack", "tx"])),
Instruction::AckPacket(pn(4), ExpectWakeup(Some(true))),
Instruction::CheckInterests(stream_interests(&["ack", "tx"])),
Instruction::NackPacket(pn(2)),
Instruction::CheckInterests(stream_interests(&["lost"])),
Instruction::CheckDataTx(VarInt::from_u32(1994), 6, false, false, pn(5)),
Instruction::CheckInterests(stream_interests(&["ack", "cf"])),
Instruction::CheckNoTx,
Instruction::AckPacket(pn(5), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["cf"])),
Instruction::CheckNoTx,
][..]];
for test_config in test_configs.iter() {
let test_env_config = TestEnvironmentConfig {
max_send_buffer_size: MAX_BUFFER_SIZE,
initial_send_window: STREAM_WINDOW_SIZE as u64,
initial_connection_send_window_size: CONN_WINDOW_SIZE as u64,
..Default::default()
};
let mut test_env = setup_stream_test_env_with_config(test_env_config);
test_env
.sent_frames
.set_max_packet_size(Some(MAX_PACKET_SIZE));
execute_instructions(&mut test_env, &test_config[..]);
}
}
#[test]
fn can_write_up_to_max_stream_size() {
// This is a bit hacky. We can obviously not write 2**62 bytes of data in
// the test. Therefore we resort to manipulating some internal pointers
let max_varint = VarInt::new(MAX_VARINT_VALUE).unwrap();
let mut test_env = setup_send_only_test_env();
test_env
.stream
.send_stream
.data_sender
.set_total_acknowledged_len(max_varint - 1);
test_env
.stream
.send_stream
.data_sender
.flow_controller_mut()
.set_max_stream_data(max_varint);
assert_eq!(
Poll::Ready(Ok(())),
test_env.poll_push(Bytes::from_static(b"a"))
);
}
#[test]
fn can_not_write_more_data_than_maximum_stream_size() {
// This is a bit hacky. We can obviously not write 2**62 bytes of data in
// the test. Therefore we resort to manipulating some internal pointers
let max_varint = VarInt::new(MAX_VARINT_VALUE).unwrap();
// This part checks what happens if we go over the limit with one big chunk
let mut test_env = setup_send_only_test_env();
test_env
.stream
.send_stream
.data_sender
.set_total_acknowledged_len(max_varint - 1);
test_env
.stream
.send_stream
.data_sender
.flow_controller_mut()
.set_max_stream_data(max_varint);
assert_matches!(
test_env.poll_push(Bytes::from_static(b"aa")),
Poll::Ready(Err(StreamError::MaxStreamDataSizeExceeded { .. })),
);
// And this part checks what happens if are exactly at the limit before
let mut test_env = setup_send_only_test_env();
test_env
.stream
.send_stream
.data_sender
.set_total_acknowledged_len(max_varint - 1);
test_env
.stream
.send_stream
.data_sender
.flow_controller_mut()
.set_max_stream_data(max_varint);
assert_eq!(
Poll::Ready(Ok(())),
test_env.poll_push(Bytes::from_static(b"a"))
);
assert_matches!(
test_env.poll_push(Bytes::from_static(b"a")),
Poll::Ready(Err(StreamError::MaxStreamDataSizeExceeded { .. })),
);
}
#[test]
fn push_data_after_stream_is_reset_locally() {
for reset_is_acknowledged in &[true, false] {
let mut test_env = setup_send_only_test_env();
let error_code = ApplicationErrorCode::new(5).unwrap();
// Call finish and expect the frame to get emitted
execute_instructions(
&mut test_env,
&[
Instruction::Reset(error_code, true),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckResetTx(error_code, pn(0), VarInt::from_u32(0)),
Instruction::CheckInterests(stream_interests(&["ack"])),
],
);
if *reset_is_acknowledged {
execute_instructions(
&mut test_env,
&[
Instruction::AckPacket(pn(0), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["fin"])),
],
);
}
assert_matches!(
test_env.poll_push(Bytes::from_static(b"1")),
Poll::Ready(Err(StreamError::StreamReset { .. })),
);
}
}
#[test]
fn push_data_after_finish_was_called() {
let mut test_env = setup_send_only_test_env();
assert_eq!(Poll::Pending, test_env.poll_finish());
assert_matches!(
test_env.poll_push(Bytes::from_static(b"1")),
Poll::Ready(Err(StreamError::SendAfterFinish { .. })),
);
}
#[test]
fn push_data_after_stream_is_reset_due_to_stop_sending() {
for acknowledge_reset_early in &[true, false] {
let mut test_env = setup_send_only_test_env();
let error_code = ApplicationErrorCode::new(0x1234_5678).unwrap();
execute_instructions(
&mut test_env,
&[
Instruction::StopSending(error_code, ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckResetTx(error_code, pn(0), VarInt::from_u32(0)),
Instruction::CheckInterests(stream_interests(&["ack"])),
],
);
if *acknowledge_reset_early {
test_env.ack_packet(pn(0), ExpectWakeup(Some(false)));
assert_eq!(
stream_interests(&[]),
test_env.stream.get_stream_interests()
);
}
// Poll two times.
// The first call checks the branch where the user is not yet aware
// about the reset.
assert_matches!(
test_env.poll_push(Bytes::from_static(b"1")),
Poll::Ready(Err(StreamError::StreamReset { .. })),
);
if !*acknowledge_reset_early {
assert_eq!(
stream_interests(&["ack"]),
test_env.stream.get_stream_interests()
);
// The user is already aware about the reset.
// Delivering an ack now should bring us into the final state
test_env.ack_packet(pn(0), ExpectWakeup(Some(false)));
}
assert_eq!(
stream_interests(&["fin"]),
test_env.stream.get_stream_interests()
);
// The second call checks whether the same result is delivered after the
// user had been notified.
assert_matches!(
test_env.poll_push(Bytes::from_static(b"1")),
Poll::Ready(Err(StreamError::StreamReset { .. })),
);
assert_eq!(
stream_interests(&["fin"]),
test_env.stream.get_stream_interests()
);
}
}
#[test]
fn writes_finish_packet_in_dedicated_packet() {
let test_configs = &[
&[
Instruction::EnqueueData(VarInt::from_u32(0), 500, true),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckDataTx(VarInt::from_u32(0), 500, false, false, pn(0)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::Finish(false),
Instruction::CheckDataTx(VarInt::from_u32(500), 0, true, false, pn(1)),
Instruction::Finish(false),
Instruction::CheckInterests(stream_interests(&["ack"])),
// Acknowledge other packet(s), but not the FIN.
// That should not trigger a wakeup and finish to return Ready.
Instruction::AckPacket(pn(0), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::Finish(false),
Instruction::AckPacket(pn(1), ExpectWakeup(Some(true))),
Instruction::CheckInterests(stream_interests(&[])),
Instruction::Finish(true),
Instruction::CheckInterests(stream_interests(&["fin"])),
][..],
&[
// use a dedicated packet, but acknowledge everything in front of it
Instruction::EnqueueData(VarInt::from_u32(0), 500, true),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckDataTx(VarInt::from_u32(0), 500, false, false, pn(0)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(0), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&[])),
Instruction::Finish(false),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckDataTx(VarInt::from_u32(500), 0, true, false, pn(1)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::Finish(false),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(1), ExpectWakeup(Some(true))),
Instruction::CheckInterests(stream_interests(&[])),
Instruction::Finish(true),
Instruction::CheckInterests(stream_interests(&["fin"])),
][..],
&[
// Acknowledge the FIN packet, but leave the other packet not acknowledged
Instruction::EnqueueData(VarInt::from_u32(0), 500, true),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckDataTx(VarInt::from_u32(0), 500, false, false, pn(0)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::Finish(false),
Instruction::CheckInterests(stream_interests(&["ack", "tx"])),
Instruction::CheckDataTx(VarInt::from_u32(500), 0, true, false, pn(1)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(1), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::Finish(false),
Instruction::AckPacket(pn(0), ExpectWakeup(Some(true))),
Instruction::CheckInterests(stream_interests(&[])),
Instruction::Finish(true),
Instruction::CheckInterests(stream_interests(&["fin"])),
][..],
];
for test_config in test_configs.iter() {
let mut test_env = setup_send_only_test_env();
execute_instructions(&mut test_env, &test_config[..]);
}
}
#[test]
fn adds_finish_packet_in_enqueued_packet() {
let test_configs = &[
&[
Instruction::EnqueueData(VarInt::from_u32(0), 500, true),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::Finish(false),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckDataTx(VarInt::from_u32(0), 500, true, false, pn(0)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::Finish(false),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(0), ExpectWakeup(Some(true))),
Instruction::CheckInterests(stream_interests(&[])),
Instruction::Finish(true),
Instruction::CheckInterests(stream_interests(&["fin"])),
][..],
&[
// Lets mark the initial packet as lost, and piggyback the FIN
// on the retransmit
Instruction::EnqueueData(VarInt::from_u32(0), 500, true),
Instruction::CheckDataTx(VarInt::from_u32(0), 500, false, false, pn(0)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::NackPacket(pn(0)),
Instruction::CheckInterests(stream_interests(&["lost"])),
Instruction::Finish(false),
Instruction::CheckDataTx(VarInt::from_u32(0), 500, true, false, pn(1)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::Finish(false),
Instruction::AckPacket(pn(1), ExpectWakeup(Some(true))),
Instruction::CheckInterests(stream_interests(&[])),
Instruction::Finish(true),
Instruction::CheckInterests(stream_interests(&["fin"])),
][..],
];
for test_config in test_configs.iter() {
let mut test_env = setup_send_only_test_env();
execute_instructions(&mut test_env, &test_config[..]);
}
}
#[test]
fn finish_without_data() {
let test_configs = &[&[
Instruction::Finish(false),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckDataTx(VarInt::from_u32(0), 0, true, false, pn(0)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::Finish(false),
Instruction::AckPacket(pn(0), ExpectWakeup(Some(true))),
Instruction::CheckInterests(stream_interests(&[])),
Instruction::Finish(true),
Instruction::CheckInterests(stream_interests(&["fin"])),
][..]];
for test_config in test_configs.iter() {
let mut test_env = setup_send_only_test_env();
execute_instructions(&mut test_env, &test_config[..]);
}
}
#[test]
fn finish_after_stream_is_finished() {
let test_configs = &[&[
Instruction::EnqueueData(VarInt::from_u32(0), 500, true),
Instruction::Finish(false),
Instruction::CheckDataTx(VarInt::from_u32(0), 500, true, false, pn(0)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(0), ExpectWakeup(Some(true))),
Instruction::CheckInterests(stream_interests(&[])),
Instruction::Finish(true),
Instruction::CheckInterests(stream_interests(&["fin"])),
Instruction::Finish(true),
Instruction::CheckInterests(stream_interests(&["fin"])),
][..]];
for test_config in test_configs.iter() {
let mut test_env = setup_send_only_test_env();
execute_instructions(&mut test_env, &test_config[..]);
}
}
#[test]
fn transmit_fin_in_packet_which_gets_split() {
let test_configs = &[&[
// 1500 bytes do not fit into a single packet. That means even if
// the FIN has been initially merged onto this packet, it needs to
// get sent later.
Instruction::EnqueueData(VarInt::from_u32(0), 1500, true),
Instruction::Finish(false),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckDataTx(VarInt::from_u32(0), 998, false, true, pn(0)),
Instruction::CheckDataTx(VarInt::from_u32(998), 502, true, false, pn(1)),
Instruction::Finish(false),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(0), ExpectWakeup(Some(false))),
Instruction::Finish(false),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(1), ExpectWakeup(Some(true))),
Instruction::CheckInterests(stream_interests(&[])),
Instruction::Finish(true),
Instruction::CheckInterests(stream_interests(&[])),
Instruction::CheckNoTx,
][..]];
for test_config in test_configs.iter() {
let test_env_config = TestEnvironmentConfig {
initial_send_window: 2000,
..Default::default()
};
let mut test_env = setup_stream_test_env_with_config(test_env_config);
test_env.sent_frames.set_max_packet_size(Some(1000));
execute_instructions(&mut test_env, &test_config[..]);
}
}
#[test]
fn retransmit_fin_if_lost() {
let test_configs = &[
&[
// Standalone FIN
Instruction::Finish(false),
Instruction::CheckDataTx(VarInt::from_u32(0), 0, true, false, pn(0)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::NackPacket(pn(0)),
Instruction::CheckInterests(stream_interests(&["lost"])),
Instruction::CheckDataTx(VarInt::from_u32(0), 0, true, false, pn(1)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(1), ExpectWakeup(Some(true))),
Instruction::CheckInterests(stream_interests(&[])),
Instruction::Finish(true),
Instruction::CheckInterests(stream_interests(&["fin"])),
Instruction::CheckNoTx,
][..],
&[
// Piggybacked FIN
Instruction::EnqueueData(VarInt::from_u32(0), 500, true),
Instruction::Finish(false),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckDataTx(VarInt::from_u32(0), 500, true, false, pn(0)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::NackPacket(pn(0)),
Instruction::CheckInterests(stream_interests(&["lost"])),
Instruction::CheckDataTx(VarInt::from_u32(0), 500, true, false, pn(1)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(1), ExpectWakeup(Some(true))),
Instruction::CheckInterests(stream_interests(&[])),
Instruction::Finish(true),
Instruction::CheckInterests(stream_interests(&["fin"])),
Instruction::CheckNoTx,
][..],
&[
// FIN gets combined gets with another lost packet
Instruction::EnqueueData(VarInt::from_u32(0), 500, true),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckDataTx(VarInt::from_u32(0), 500, false, false, pn(0)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::EnqueueData(VarInt::from_u32(500), 250, true),
Instruction::CheckInterests(stream_interests(&["ack", "tx"])),
Instruction::CheckDataTx(VarInt::from_u32(500), 250, false, false, pn(1)),
Instruction::Finish(false),
Instruction::CheckDataTx(VarInt::from_u32(750), 0, true, false, pn(2)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(1), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::NackPacket(pn(0)),
Instruction::CheckInterests(stream_interests(&["ack", "lost"])),
Instruction::NackPacket(pn(2)),
Instruction::CheckInterests(stream_interests(&["lost"])),
// The retransmitted chunks should all end up in one packet
Instruction::CheckDataTx(VarInt::from_u32(0), 500, false, false, pn(3)),
Instruction::CheckDataTx(VarInt::from_u32(750), 0, true, false, pn(3)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::CheckNoTx,
Instruction::AckPacket(pn(3), ExpectWakeup(Some(true))),
Instruction::CheckInterests(stream_interests(&[])),
Instruction::Finish(true),
Instruction::CheckInterests(stream_interests(&["fin"])),
Instruction::CheckNoTx,
][..],
];
for test_config in test_configs.iter() {
let mut test_env = setup_send_only_test_env();
test_env.sent_frames.set_max_packet_size(Some(1000));
execute_instructions(&mut test_env, &test_config[..]);
}
}
#[test]
fn transmit_fin_while_outstanding_data_exceeds_stream_flow_control_window() {
let test_configs = &[
&[
Instruction::EnqueueData(VarInt::from_u32(0), 4000, true),
Instruction::Finish(false),
Instruction::CheckInterests(stream_interests(&["tx"])),
// Send 3 initial packets, then the flow control window limits us
Instruction::CheckDataTx(VarInt::from_u32(0), 998, false, true, pn(0)),
Instruction::CheckInterests(stream_interests(&["ack", "tx"])),
Instruction::CheckDataTx(VarInt::from_u32(998), 996, false, true, pn(1)),
Instruction::CheckDataTx(VarInt::from_u32(1994), 6, false, false, pn(2)),
Instruction::CheckStreamDataBlockedTx(VarInt::from_u32(2000), pn(2)),
Instruction::CheckInterests(stream_interests(&["ack", "sf"])),
Instruction::CheckNoTx,
Instruction::CheckInterests(stream_interests(&["ack", "sf"])),
Instruction::AckPacket(pn(0), ExpectWakeup(Some(false))),
Instruction::AckPacket(pn(1), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["ack", "sf"])),
Instruction::AckPacket(pn(2), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["sf"])),
Instruction::SetMaxStreamData(VarInt::from_u32(3000), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckDataTx(VarInt::from_u32(2000), 996, false, true, pn(3)),
Instruction::CheckDataTx(VarInt::from_u32(2996), 4, false, false, pn(4)),
Instruction::CheckStreamDataBlockedTx(VarInt::from_u32(3000), pn(4)),
Instruction::CheckInterests(stream_interests(&["ack", "sf"])),
// Flow control window is now big enough to get all data transmitted
Instruction::SetMaxStreamData(VarInt::from_u32(4000), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["ack", "tx"])),
Instruction::CheckDataTx(VarInt::from_u32(3000), 996, false, true, pn(5)),
Instruction::CheckDataTx(VarInt::from_u32(3996), 4, true, false, pn(6)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(3), ExpectWakeup(Some(false))),
Instruction::AckPacket(pn(5), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::Finish(false),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(6), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::Finish(false),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(4), ExpectWakeup(Some(true))),
Instruction::CheckInterests(stream_interests(&[])),
Instruction::Finish(true),
Instruction::CheckInterests(stream_interests(&["fin"])),
Instruction::CheckNoTx,
][..],
&[
// This is a second iteration of the test, where the window update
// is bigger (not equal) to the queued data.
Instruction::EnqueueData(VarInt::from_u32(0), 3000, true),
Instruction::Finish(false),
Instruction::CheckInterests(stream_interests(&["tx"])),
// Send 3 initial packets, then the flow control window limits us
Instruction::CheckDataTx(VarInt::from_u32(0), 998, false, true, pn(0)),
Instruction::CheckInterests(stream_interests(&["ack", "tx"])),
Instruction::CheckDataTx(VarInt::from_u32(998), 996, false, true, pn(1)),
Instruction::CheckDataTx(VarInt::from_u32(1994), 6, false, false, pn(2)),
Instruction::CheckStreamDataBlockedTx(VarInt::from_u32(2000), pn(2)),
Instruction::CheckNoTx,
Instruction::CheckInterests(stream_interests(&["ack", "sf"])),
Instruction::AckPacket(pn(0), ExpectWakeup(Some(false))),
Instruction::AckPacket(pn(1), ExpectWakeup(Some(false))),
Instruction::AckPacket(pn(2), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["sf"])),
Instruction::SetMaxStreamData(VarInt::from_u32(3001), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckDataTx(VarInt::from_u32(2000), 996, false, true, pn(3)),
Instruction::CheckDataTx(VarInt::from_u32(2996), 4, true, false, pn(4)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(3), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::Finish(false),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(4), ExpectWakeup(Some(true))),
Instruction::CheckInterests(stream_interests(&[])),
Instruction::Finish(true),
Instruction::CheckInterests(stream_interests(&["fin"])),
Instruction::CheckNoTx,
][..],
];
for test_config in test_configs.iter() {
let test_env_config = TestEnvironmentConfig {
initial_send_window: 2000,
stream_id: StreamId::initial(endpoint::Type::Server, StreamType::Unidirectional),
local_endpoint_type: endpoint::Type::Server,
..Default::default()
};
let mut test_env = setup_stream_test_env_with_config(test_env_config);
test_env.sent_frames.set_max_packet_size(Some(1000));
execute_instructions(&mut test_env, &test_config[..]);
}
}
#[test]
fn transmit_fin_while_outstanding_data_exceeds_connection_flow_control_window() {
let test_configs = &[
&[
Instruction::EnqueueData(VarInt::from_u32(0), 4000, true),
Instruction::Finish(false),
Instruction::CheckInterests(stream_interests(&["tx"])),
// Send 3 initial packets, then the flow control window limits us
Instruction::CheckDataTx(VarInt::from_u32(0), 998, false, true, pn(0)),
Instruction::CheckDataTx(VarInt::from_u32(998), 996, false, true, pn(1)),
Instruction::CheckDataTx(VarInt::from_u32(1994), 6, false, false, pn(2)),
Instruction::CheckInterests(stream_interests(&["ack", "cf"])),
Instruction::CheckNoTx,
Instruction::CheckInterests(stream_interests(&["ack", "cf"])),
Instruction::AckPacket(pn(0), ExpectWakeup(Some(false))),
Instruction::AckPacket(pn(1), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["ack", "cf"])),
Instruction::AckPacket(pn(2), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["cf"])),
Instruction::SetMaxData(VarInt::from_u32(3000)),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckDataTx(VarInt::from_u32(2000), 996, false, true, pn(3)),
Instruction::CheckDataTx(VarInt::from_u32(2996), 4, false, false, pn(4)),
Instruction::CheckInterests(stream_interests(&["ack", "cf"])),
// Flow control window is now big enough to get all data transmitted
Instruction::SetMaxData(VarInt::from_u32(4000)),
Instruction::CheckInterests(stream_interests(&["ack", "tx"])),
Instruction::CheckDataTx(VarInt::from_u32(3000), 996, false, true, pn(5)),
Instruction::CheckDataTx(VarInt::from_u32(3996), 4, true, false, pn(6)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(3), ExpectWakeup(Some(false))),
Instruction::AckPacket(pn(5), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::Finish(false),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(6), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::Finish(false),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(4), ExpectWakeup(Some(true))),
Instruction::CheckInterests(stream_interests(&[])),
Instruction::Finish(true),
Instruction::CheckInterests(stream_interests(&["fin"])),
Instruction::CheckNoTx,
][..],
&[
// This is a second iteration of the test, where the window update
// is bigger (not equal) to the queued data.
Instruction::EnqueueData(VarInt::from_u32(0), 3000, true),
Instruction::Finish(false),
Instruction::CheckInterests(stream_interests(&["tx"])),
// Send 3 initial packets, then the flow control window limits us
Instruction::CheckDataTx(VarInt::from_u32(0), 998, false, true, pn(0)),
Instruction::CheckInterests(stream_interests(&["ack", "tx"])),
Instruction::CheckDataTx(VarInt::from_u32(998), 996, false, true, pn(1)),
Instruction::CheckDataTx(VarInt::from_u32(1994), 6, false, false, pn(2)),
Instruction::CheckNoTx,
Instruction::CheckInterests(stream_interests(&["ack", "cf"])),
Instruction::AckPacket(pn(0), ExpectWakeup(Some(false))),
Instruction::AckPacket(pn(1), ExpectWakeup(Some(false))),
Instruction::AckPacket(pn(2), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["cf"])),
Instruction::SetMaxData(VarInt::from_u32(3001)),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckDataTx(VarInt::from_u32(2000), 996, false, true, pn(3)),
Instruction::CheckDataTx(VarInt::from_u32(2996), 4, true, false, pn(4)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(3), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::Finish(false),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(4), ExpectWakeup(Some(true))),
Instruction::CheckInterests(stream_interests(&[])),
Instruction::Finish(true),
Instruction::CheckInterests(stream_interests(&["fin"])),
Instruction::CheckNoTx,
][..],
];
for test_config in test_configs.iter() {
let test_env_config = TestEnvironmentConfig {
initial_send_window: 200 * 1024,
initial_connection_send_window_size: 2000,
stream_id: StreamId::initial(endpoint::Type::Server, StreamType::Unidirectional),
local_endpoint_type: endpoint::Type::Server,
..Default::default()
};
let mut test_env = setup_stream_test_env_with_config(test_env_config);
test_env.sent_frames.set_max_packet_size(Some(1000));
execute_instructions(&mut test_env, &test_config[..]);
}
}
#[test]
fn finish_after_stream_is_reset_locally() {
for acknowledge_reset_early in &[true, false] {
let mut test_env = setup_send_only_test_env();
let error_code = ApplicationErrorCode::new(5).unwrap();
assert!(test_env.reset(error_code).is_ok());
assert_eq!(
stream_interests(&["tx"]),
test_env.stream.get_stream_interests()
);
test_env.assert_write_reset_frame(error_code, pn(0), VarInt::from_u32(0));
assert_eq!(
stream_interests(&["ack"]),
test_env.stream.get_stream_interests()
);
if *acknowledge_reset_early {
test_env.ack_packet(pn(0), ExpectWakeup(Some(false)));
assert_eq!(
stream_interests(&["fin"]),
test_env.stream.get_stream_interests()
);
}
assert_matches!(
test_env.poll_finish(),
Poll::Ready(Err(StreamError::StreamReset { .. })),
);
if !*acknowledge_reset_early {
// We are done after the ack, because the user is already aware
// about the reset
test_env.ack_packet(pn(0), ExpectWakeup(Some(false)));
}
assert_eq!(
stream_interests(&["fin"]),
test_env.stream.get_stream_interests()
);
}
}
#[test]
fn finish_after_stream_is_reset_due_to_stop_sending() {
for acknowledge_reset_early in &[true, false] {
let mut test_env = setup_send_only_test_env();
let error_code = ApplicationErrorCode::new(0x1234_5678).unwrap();
execute_instructions(
&mut test_env,
&[
Instruction::StopSending(error_code, ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckResetTx(error_code, pn(0), VarInt::from_u32(0)),
Instruction::CheckInterests(stream_interests(&["ack"])),
],
);
if *acknowledge_reset_early {
test_env.ack_packet(pn(0), ExpectWakeup(Some(false)));
assert_eq!(
stream_interests(&[]),
test_env.stream.get_stream_interests()
);
}
// Poll two times.
// The first call checks the branch where the user is not yet aware
// about the reset.
assert_matches!(
test_env.poll_finish(),
Poll::Ready(Err(StreamError::StreamReset { .. })),
);
// Now that the user is aware the outstanding acknowledge can finalize
// the stream
if !*acknowledge_reset_early {
assert_eq!(
stream_interests(&["ack"]),
test_env.stream.get_stream_interests()
);
test_env.ack_packet(pn(0), ExpectWakeup(Some(false)));
}
assert_eq!(
stream_interests(&["fin"]),
test_env.stream.get_stream_interests()
);
// The second call checks whether the same result is delivered after the
// user had been notified.
assert_matches!(
test_env.poll_finish(),
Poll::Ready(Err(StreamError::StreamReset { .. })),
);
assert_eq!(
stream_interests(&["fin"]),
test_env.stream.get_stream_interests()
);
}
}
#[test]
fn stop_sending_while_waiting_for_fin_to_get_acknowledged_leads_to_a_reset() {
let mut test_env = setup_send_only_test_env();
let error_code = ApplicationErrorCode::new(0x1234_5678).unwrap();
execute_instructions(
&mut test_env,
&[
// Call finish and expect the frame to get emitted
Instruction::Finish(false),
Instruction::CheckDataTx(VarInt::from_u32(0), 0, true, false, pn(0)),
Instruction::CheckInterests(stream_interests(&["ack"])),
// Inject stop sending. This should lead to a reset frame,
// and to no longer wait for an acknowledge on the FIN.
// It should thereby also wake up the waiter
Instruction::StopSending(error_code, ExpectWakeup(Some(true))),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckResetTx(error_code, pn(1), VarInt::from_u32(0)),
Instruction::CheckInterests(stream_interests(&["ack"])),
// This is the ACK for the FIN, for which we no longer care
Instruction::AckPacket(pn(0), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["ack"])),
],
);
assert_matches!(
test_env.poll_finish(),
Poll::Ready(Err(StreamError::StreamReset { .. })),
);
execute_instructions(
&mut test_env,
&[
// This is the ACK for the Reset
Instruction::AckPacket(pn(1), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["fin"])),
],
);
}
#[test]
fn stop_sending_while_sending_data_leads_to_a_reset() {
for require_wakeup in &[true, false] {
let test_env_config = TestEnvironmentConfig {
max_send_buffer_size: 1000,
stream_id: StreamId::initial(endpoint::Type::Server, StreamType::Unidirectional),
local_endpoint_type: endpoint::Type::Server,
..Default::default()
};
let mut test_env = setup_stream_test_env_with_config(test_env_config);
execute_instructions(
&mut test_env,
&[
Instruction::EnqueueData(VarInt::from_u32(0), 500, true),
Instruction::CheckDataTx(VarInt::from_u32(0), 500, false, false, pn(0)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(0), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&[])),
],
);
let transmitted_data = 500;
if *require_wakeup {
// Enqueue enough data that the stream is blocked
execute_instructions(
&mut test_env,
&[
Instruction::EnqueueData(VarInt::from_u32(500), 2000, true),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::EnqueueData(VarInt::from_u32(2500), 2000, false),
],
);
}
let error_code = ApplicationErrorCode::new(0x1234_5678).unwrap();
execute_instructions(
&mut test_env,
&[
// Inject stop sending. This should lead to a reset frame
Instruction::StopSending(error_code, ExpectWakeup(Some(*require_wakeup))),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckResetTx(error_code, pn(1), VarInt::from_u32(transmitted_data)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(1), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&[])),
],
);
assert_matches!(
test_env.poll_finish(),
Poll::Ready(Err(StreamError::StreamReset { .. })),
);
execute_instructions(
&mut test_env,
&[Instruction::CheckInterests(stream_interests(&["fin"]))],
);
}
}
#[test]
fn stop_sending_does_not_cause_an_action_if_stream_is_already_reset() {
for reset_is_acknowledged in &[true, false] {
let mut test_env = setup_send_only_test_env();
let error_code = ApplicationErrorCode::new(0x1234_5678).unwrap();
let stop_sending_error_code = ApplicationErrorCode::new(0x3333_4444).unwrap();
// Call finish and expect the frame to get emitted
execute_instructions(
&mut test_env,
&[
Instruction::Reset(error_code, true),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckResetTx(error_code, pn(0), VarInt::from_u32(0)),
Instruction::CheckInterests(stream_interests(&["ack"])),
],
);
if *reset_is_acknowledged {
execute_instructions(
&mut test_env,
&[
Instruction::AckPacket(pn(0), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["fin"])),
],
);
}
// Inject stop sending. This should have no impact
execute_instructions(
&mut test_env,
&[Instruction::StopSending(
stop_sending_error_code,
ExpectWakeup(Some(false)),
)],
);
let expected_interest = if *reset_is_acknowledged {
stream_interests(&["fin"])
} else {
stream_interests(&["ack"])
};
execute_instructions(
&mut test_env,
&[
Instruction::CheckInterests(expected_interest),
Instruction::CheckNoTx,
],
);
assert_matches!(
test_env.poll_finish(),
Poll::Ready(Err(StreamError::StreamReset { .. })),
);
execute_instructions(
&mut test_env,
&[
Instruction::AckPacket(pn(0), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["fin"])),
Instruction::CheckNoTx,
],
);
}
}
#[test]
fn stop_sending_does_not_cause_an_action_if_stream_is_finished_and_acknowledged() {
for check_finish_flag_later in &[true, false] {
let mut test_env = setup_send_only_test_env();
let stop_sending_error_code = ApplicationErrorCode::new(0x3333_4444).unwrap();
execute_instructions(
&mut test_env,
&[
Instruction::EnqueueData(VarInt::from_u32(0), 500, true),
Instruction::Finish(false),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckDataTx(VarInt::from_u32(0), 500, true, false, pn(0)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(0), ExpectWakeup(Some(true))),
Instruction::CheckInterests(stream_interests(&[])),
],
);
let expected_interests = if !*check_finish_flag_later {
execute_instructions(&mut test_env, &[Instruction::Finish(true)]);
stream_interests(&["fin"])
} else {
stream_interests(&[])
};
execute_instructions(
&mut test_env,
&[
Instruction::StopSending(stop_sending_error_code, ExpectWakeup(Some(false))),
Instruction::CheckInterests(expected_interests),
Instruction::CheckNoTx,
],
);
if *check_finish_flag_later {
execute_instructions(
&mut test_env,
&[
Instruction::Finish(true),
Instruction::CheckInterests(stream_interests(&["fin"])),
Instruction::CheckNoTx,
],
);
}
}
}
#[test]
fn reset_does_not_cause_an_action_if_stream_is_already_reset() {
for is_internal_reset in &[true, false] {
for acknowledge_reset in &[true, false] {
let mut test_env = setup_send_only_test_env();
let error_code = ApplicationErrorCode::new(0x1234_5678).unwrap();
let reset_error_code = ApplicationErrorCode::new(0x3333_4444).unwrap();
// Perform the initial reset
execute_instructions(
&mut test_env,
&[
Instruction::Reset(error_code, true),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckResetTx(error_code, pn(0), VarInt::from_u32(0)),
Instruction::CheckInterests(stream_interests(&["ack"])),
],
);
if *acknowledge_reset {
execute_instructions(
&mut test_env,
&[
Instruction::AckPacket(pn(0), ExpectWakeup(Some(false))),
Instruction::CheckInterests(stream_interests(&["fin"])),
],
);
}
if *is_internal_reset {
let mut events = StreamEvents::new();
test_env
.stream
.on_internal_reset(connection::Error::unspecified().into(), &mut events);
assert!(events.write_wake.is_none());
} else {
execute_instructions(&mut test_env, &[Instruction::Reset(reset_error_code, true)]);
}
// The reset should not lead to an outgoing packet
assert!(test_env
.stream
.get_stream_interests()
.transmission
.is_none());
execute_instructions(&mut test_env, &[Instruction::CheckNoTx]);
// Accessing the stream should lead to the original reset error
assert_matches!(
test_env.poll_finish(),
Poll::Ready(Err(StreamError::StreamReset { .. })),
);
}
}
}
#[test]
fn reset_does_not_cause_an_action_if_stream_is_finished_and_acknowledged() {
for is_internal_reset in &[true, false] {
for check_finish_flag_early in &[true, false] {
let mut test_env = setup_send_only_test_env();
let reset_error_code = ApplicationErrorCode::new(0x3333_4444).unwrap();
execute_instructions(
&mut test_env,
&[
Instruction::EnqueueData(VarInt::from_u32(0), 500, true),
Instruction::Finish(false),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckDataTx(VarInt::from_u32(0), 500, true, false, pn(0)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(0), ExpectWakeup(Some(true))),
Instruction::CheckInterests(stream_interests(&[])),
],
);
if *check_finish_flag_early {
execute_instructions(
&mut test_env,
&[
Instruction::Finish(true),
Instruction::CheckInterests(stream_interests(&["fin"])),
],
);
}
if *is_internal_reset {
let mut events = StreamEvents::new();
test_env
.stream
.on_internal_reset(connection::Error::unspecified().into(), &mut events);
assert!(events.write_wake.is_none());
} else {
execute_instructions(&mut test_env, &[Instruction::Reset(reset_error_code, true)]);
}
// The reset should not lead to an outgoing packet
assert!(test_env
.stream
.get_stream_interests()
.transmission
.is_none());
execute_instructions(&mut test_env, &[Instruction::CheckNoTx]);
// Accessing the stream should still return the finished state
execute_instructions(
&mut test_env,
&[
Instruction::Finish(true),
Instruction::CheckInterests(stream_interests(&["fin"])),
Instruction::CheckNoTx,
],
);
}
}
}
#[test]
fn resetting_the_stream_does_does_trigger_a_reset_frame_and_reset_errors() {
#[derive(Copy, Clone, Debug, PartialEq)]
enum ResetReason {
StopSending,
ApplicationReset,
InternalReset,
}
for reset_reason in &[
ResetReason::StopSending,
ResetReason::ApplicationReset,
ResetReason::InternalReset,
] {
for is_finishing in &[true, false] {
let test_env_config = TestEnvironmentConfig {
max_send_buffer_size: 1000,
stream_id: StreamId::initial(endpoint::Type::Client, StreamType::Unidirectional),
local_endpoint_type: endpoint::Type::Client,
..Default::default()
};
let mut test_env = setup_stream_test_env_with_config(test_env_config);
let reset_error_code = ApplicationErrorCode::new(0x3333_4444).unwrap();
let transmitted_data = if *is_finishing {
execute_instructions(
&mut test_env,
&[
Instruction::EnqueueData(VarInt::from_u32(0), 500, true),
Instruction::Finish(false),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckDataTx(VarInt::from_u32(0), 500, true, false, pn(0)),
Instruction::CheckInterests(stream_interests(&["ack"])),
],
);
500
} else {
execute_instructions(
&mut test_env,
&[
// Queue enough data to that a write gets blocked
Instruction::EnqueueData(VarInt::from_u32(0), 2000, true),
Instruction::EnqueueData(VarInt::from_u32(2000), 2000, false),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckDataTx(VarInt::from_u32(0), 2000, false, false, pn(0)),
Instruction::CheckInterests(stream_interests(&["ack"])),
],
);
2000
};
match *reset_reason {
ResetReason::InternalReset => {
let mut events = StreamEvents::new();
test_env
.stream
.on_internal_reset(connection::Error::unspecified().into(), &mut events);
assert!(events.write_wake.is_some());
// No RESET frame should be transitted due to an internal reset
execute_instructions(
&mut test_env,
&[
Instruction::CheckNoTx,
Instruction::CheckInterests(stream_interests(&[])),
],
);
assert_matches!(
test_env.poll_finish(),
Poll::Ready(Err(StreamError::ConnectionError {
error: connection::Error::Unspecified { .. },
..
})),
);
}
ResetReason::ApplicationReset => {
// The reset should lead to an outgoing packet
execute_instructions(
&mut test_env,
&[
Instruction::Reset(reset_error_code, true),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckResetTx(
reset_error_code,
pn(1),
VarInt::from_u32(transmitted_data),
),
Instruction::CheckInterests(stream_interests(&["ack"])),
],
);
// Accessing the stream should lead to the reset error
assert_matches!(
test_env.poll_finish(),
Poll::Ready(Err(StreamError::StreamReset { .. })),
);
}
ResetReason::StopSending => {
// Stop sending should lead to an outgoing packet
execute_instructions(
&mut test_env,
&[
Instruction::StopSending(reset_error_code, ExpectWakeup(Some(true))),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckResetTx(
reset_error_code,
pn(1),
VarInt::from_u32(transmitted_data),
),
Instruction::CheckInterests(stream_interests(&["ack"])),
],
);
// Accessing the stream should lead to the reset error
assert_matches!(
test_env.poll_finish(),
Poll::Ready(Err(StreamError::StreamReset { .. })),
);
}
};
if *reset_reason != ResetReason::InternalReset {
// If the Reset was not caused internally, it needs to get
// acknowledged before finalization
execute_instructions(
&mut test_env,
&[
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(1), ExpectWakeup(Some(false))),
],
);
}
execute_instructions(
&mut test_env,
&[Instruction::CheckInterests(stream_interests(&["fin"]))],
);
}
}
}
#[test]
fn stream_does_not_try_to_acquire_connection_flow_control_credits_after_reset() {
#[derive(Copy, Clone, Debug, PartialEq)]
enum ResetReason {
StopSending,
ApplicationReset,
InternalReset,
}
for reset_reason in &[
ResetReason::StopSending,
ResetReason::ApplicationReset,
ResetReason::InternalReset,
] {
for is_finishing in &[true, false] {
let test_env_config = TestEnvironmentConfig {
max_send_buffer_size: 1500,
initial_connection_send_window_size: 1000,
stream_id: StreamId::initial(endpoint::Type::Client, StreamType::Unidirectional),
local_endpoint_type: endpoint::Type::Client,
..Default::default()
};
let mut test_env = setup_stream_test_env_with_config(test_env_config);
let reset_error_code = ApplicationErrorCode::new(0x3333_4444).unwrap();
// Enqueue data and get blocked on the flow control window
execute_instructions(
&mut test_env,
&[
Instruction::EnqueueData(VarInt::from_u32(0), 2000, true),
// Try to check whether we got woken up during reset
Instruction::EnqueueData(VarInt::from_u32(2000), 1, false),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckDataTx(VarInt::from_u32(0), 1000, false, false, pn(0)),
Instruction::CheckInterests(stream_interests(&["ack", "cf"])),
],
);
let transmitted_data = 1000;
if *is_finishing {
execute_instructions(
&mut test_env,
&[
Instruction::Finish(false),
Instruction::CheckInterests(stream_interests(&["ack", "cf"])),
],
);
}
match *reset_reason {
ResetReason::InternalReset => {
let mut events = StreamEvents::new();
test_env
.stream
.on_internal_reset(connection::Error::unspecified().into(), &mut events);
assert!(events.write_wake.is_some());
// No RESET frame should be transitted due to an internal reset
execute_instructions(
&mut test_env,
&[
Instruction::CheckNoTx,
Instruction::CheckInterests(stream_interests(&[])),
],
);
}
ResetReason::ApplicationReset => {
// The reset should lead to an outgoing packet
execute_instructions(
&mut test_env,
&[
Instruction::Reset(reset_error_code, true),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckResetTx(
reset_error_code,
pn(1),
VarInt::from_u32(transmitted_data),
),
Instruction::CheckInterests(stream_interests(&["ack"])),
],
);
}
ResetReason::StopSending => {
// Stop sending should lead to an outgoing packet
execute_instructions(
&mut test_env,
&[
Instruction::StopSending(reset_error_code, ExpectWakeup(Some(true))),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckResetTx(
reset_error_code,
pn(1),
VarInt::from_u32(transmitted_data),
),
Instruction::CheckInterests(stream_interests(&["ack"])),
],
);
}
};
// Increasing the flow control window should not change readiness.
// It should also not lead the Stream to consume from our flow control
// window - even if we query it to do so.
assert_eq!(
VarInt::from_u32(0),
test_env.tx_connection_flow_controller.available_window()
);
let previous_readiness = test_env.stream.get_stream_interests();
test_env.tx_connection_flow_controller.on_max_data(MaxData {
maximum_data: VarInt::from_u32(2500),
});
assert_eq!(
VarInt::from_u32(1500),
test_env.tx_connection_flow_controller.available_window()
);
test_env.stream.on_connection_window_available();
assert_eq!(previous_readiness, test_env.stream.get_stream_interests());
assert!(
!test_env
.stream
.get_stream_interests()
.connection_flow_control_credits
);
assert_eq!(
VarInt::from_u32(1500),
test_env.tx_connection_flow_controller.available_window()
);
// Accessing the stream should lead to the reset error
assert_matches!(test_env.poll_finish(), Poll::Ready(Err(_)));
if *reset_reason != ResetReason::InternalReset {
// If the Reset was not caused internally, it needs to get
// acknowledged before finalization
execute_instructions(
&mut test_env,
&[
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(1), ExpectWakeup(Some(false))),
],
);
}
execute_instructions(
&mut test_env,
&[Instruction::CheckInterests(stream_interests(&["fin"]))],
);
}
}
}
#[test]
fn stream_reports_stream_size_based_on_acquired_connection_window() {
let test_env_config = TestEnvironmentConfig {
max_send_buffer_size: 1500,
initial_send_window: 100 * 1024,
initial_connection_send_window_size: 1000,
stream_id: StreamId::initial(endpoint::Type::Client, StreamType::Unidirectional),
local_endpoint_type: endpoint::Type::Client,
..Default::default()
};
let mut test_env = setup_stream_test_env_with_config(test_env_config);
let reset_error_code = ApplicationErrorCode::new(0x3333_4444).unwrap();
// Enqueue data and get blocked on the flow control window
execute_instructions(
&mut test_env,
&[
Instruction::EnqueueData(VarInt::from_u32(0), 5000, true),
Instruction::EnqueueData(VarInt::from_u32(2000), 1, false),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckDataTx(VarInt::from_u32(0), 1000, false, false, pn(0)),
Instruction::CheckInterests(stream_interests(&["ack", "cf"])),
],
);
// Make more credits available - which the Stream will acquire
assert_eq!(
VarInt::from_u32(0),
test_env.tx_connection_flow_controller.available_window()
);
test_env.tx_connection_flow_controller.on_max_data(MaxData {
maximum_data: VarInt::from_u32(4999),
});
assert_eq!(
VarInt::from_u32(3999),
test_env.tx_connection_flow_controller.available_window()
);
test_env.stream.on_connection_window_available();
assert_eq!(
VarInt::from_u32(0),
test_env.tx_connection_flow_controller.available_window()
);
execute_instructions(
&mut test_env,
&[
Instruction::CheckInterests(stream_interests(&["ack", "tx"])),
Instruction::Reset(reset_error_code, true),
Instruction::CheckInterests(stream_interests(&["tx"])),
// The Stream is expected to report the complete window it acquired
Instruction::CheckResetTx(reset_error_code, pn(1), VarInt::from_u32(4999)),
Instruction::CheckInterests(stream_interests(&["ack"])),
],
);
}
#[test]
fn resetting_a_stream_takes_priority() {
let error_code = ApplicationErrorCode::new(123).unwrap();
for sizes in [None, Some(&[10, 10, 10])] {
for finish in [true, false] {
for flush in [true, false] {
for with_context in [true, false] {
let mut test_env = setup_send_only_test_env();
let will_wake = with_context && flush;
dbg!(sizes);
dbg!(finish);
dbg!(flush);
dbg!(with_context);
let mut request = ops::Request::default();
request.reset(error_code);
let mut chunks =
sizes.map(|sizes| gen_pattern_test_chunks(VarInt::from_u8(0), sizes));
if let Some(chunks) = chunks.as_deref_mut() {
request.send(chunks);
}
if finish {
request.finish();
}
if flush {
request.flush();
}
assert_eq!(
test_env.run_request(&mut request, with_context),
Ok(ops::Response {
tx: Some(ops::tx::Response {
status: ops::Status::Resetting,
will_wake,
..Default::default()
}),
rx: None,
}),
);
execute_instructions(
&mut test_env,
&[
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckResetTx(error_code, pn(0), VarInt::from_u8(0)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(0), ExpectWakeup(Some(will_wake))),
],
);
assert_matches!(
test_env.run_request(&mut request, with_context),
Ok(ops::Response {
tx: Some(ops::tx::Response {
status: ops::Status::Reset(StreamError::StreamReset { .. }),
..
}),
rx: None,
}),
);
execute_instructions(
&mut test_env,
&[Instruction::CheckInterests(stream_interests(&["fin"]))],
);
}
}
}
}
}
#[test]
fn can_send_multiple_chunks() {
let max_send_buffer_size = 3000;
for sizes in [&[10, 10, 10][..], &[500, 500, 500], &[1000, 1000, 1000][..]] {
for finish in [false, true] {
for flush in [false, true] {
for with_context in [false, true] {
let test_env_config = TestEnvironmentConfig {
max_send_buffer_size,
stream_id: StreamId::initial(
endpoint::Type::Client,
StreamType::Unidirectional,
),
local_endpoint_type: endpoint::Type::Client,
..Default::default()
};
let mut test_env = setup_stream_test_env_with_config(test_env_config);
let mut expected_buffer_size = max_send_buffer_size;
dbg!(sizes);
dbg!(finish);
dbg!(flush);
dbg!(with_context);
let mut chunks = gen_pattern_test_chunks(VarInt::from_u8(0), sizes);
let mut request = ops::Request::default();
request.send(&mut chunks);
if finish {
request.finish();
}
if flush {
request.flush();
}
let mut expected_consumed_bytes = 0;
let mut expected_consumed_chunks = 0;
for size in sizes.iter().cloned() {
expected_consumed_bytes += size;
expected_consumed_chunks += 1;
if let Some(size) = expected_buffer_size.checked_sub(size) {
expected_buffer_size = size;
} else {
expected_buffer_size = 0;
break;
}
}
let consumed_all = expected_consumed_chunks == sizes.len();
// finishing the buffer should end the availability
if finish {
expected_buffer_size = 0;
}
let will_wake = with_context && (!consumed_all || flush);
assert_eq!(
test_env.run_request(&mut request, with_context),
Ok(ops::Response {
tx: Some(ops::tx::Response {
bytes: ops::Bytes {
available: expected_buffer_size,
consumed: expected_consumed_bytes,
},
chunks: ops::Chunks {
available: expected_buffer_size,
consumed: expected_consumed_chunks,
},
status: if consumed_all && finish {
ops::Status::Finishing
} else {
ops::Status::Open
},
will_wake,
}),
rx: None,
}),
);
execute_instructions(
&mut test_env,
&[Instruction::CheckInterests(stream_interests(&["tx"]))],
);
let mut offset = 0;
let mut idx = 0;
while let Some(mut frame) = test_env.transmit() {
if let Frame::Stream(stream) = frame.as_frame() {
offset += stream.data.len();
test_env.ack_packet(pn(idx), ExpectWakeup(None));
idx += 1;
} else {
panic!("invalid frame");
}
}
assert_eq!(expected_consumed_bytes, offset);
execute_instructions(
&mut test_env,
&[Instruction::CheckInterests(stream_interests(&[]))],
);
}
}
}
}
}
#[test]
fn detach_modes() {
for detached in [false, true] {
let test_env_config = TestEnvironmentConfig {
stream_id: StreamId::initial(endpoint::Type::Client, StreamType::Unidirectional),
local_endpoint_type: endpoint::Type::Client,
..Default::default()
};
let mut test_env = setup_stream_test_env_with_config(test_env_config);
let mut chunks = gen_pattern_test_chunks(VarInt::from_u8(0), &[16]);
let mut request = ops::Request::default();
request.send(&mut chunks).finish();
if detached {
request.detach_tx();
}
test_env
.run_request(&mut request, false)
.expect("request should succeed");
execute_instructions(
&mut test_env,
&[
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckDataTx(VarInt::from_u32(0), 16, true, false, pn(0)),
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(0), ExpectWakeup(Some(false))),
Instruction::CheckInterests(if detached {
stream_interests(&["fin"])
} else {
stream_interests(&[])
}),
],
);
}
}
#[test]
fn can_query_stream_readiness() {
let max_send_buffer_size = 1500;
for size in [None, Some(1000usize), Some(2000)] {
for with_context in [false, true] {
let test_env_config = TestEnvironmentConfig {
max_send_buffer_size,
stream_id: StreamId::initial(endpoint::Type::Client, StreamType::Unidirectional),
local_endpoint_type: endpoint::Type::Client,
..Default::default()
};
let mut test_env = setup_stream_test_env_with_config(test_env_config);
dbg!(size);
dbg!(with_context);
let mut expected_buffer_size = max_send_buffer_size;
// potentially fill the buffer
if let Some(size) = size {
test_env
.run_request(
ops::Request::default()
.send(&mut gen_pattern_test_chunks(VarInt::from_u8(0), &[size])),
false,
)
.expect("request should succeed");
expected_buffer_size = expected_buffer_size.saturating_sub(size);
};
assert_eq!(
test_env.run_request(ops::Request::default().send(&mut []), with_context),
Ok(ops::Response {
tx: Some(ops::tx::Response {
bytes: ops::Bytes {
available: expected_buffer_size,
consumed: 0
},
chunks: ops::Chunks {
available: expected_buffer_size,
consumed: 0,
},
status: ops::Status::Open,
will_wake: with_context && expected_buffer_size == 0,
}),
rx: None,
}),
);
}
}
}