in quic/s2n-quic-transport/src/stream/send_stream/tests.rs [2369:2519]
fn stream_does_not_try_to_acquire_connection_flow_control_credits_after_reset() {
#[derive(Copy, Clone, Debug, PartialEq)]
enum ResetReason {
StopSending,
ApplicationReset,
InternalReset,
}
for reset_reason in &[
ResetReason::StopSending,
ResetReason::ApplicationReset,
ResetReason::InternalReset,
] {
for is_finishing in &[true, false] {
let test_env_config = TestEnvironmentConfig {
max_send_buffer_size: 1500,
initial_connection_send_window_size: 1000,
stream_id: StreamId::initial(endpoint::Type::Client, StreamType::Unidirectional),
local_endpoint_type: endpoint::Type::Client,
..Default::default()
};
let mut test_env = setup_stream_test_env_with_config(test_env_config);
let reset_error_code = ApplicationErrorCode::new(0x3333_4444).unwrap();
// Enqueue data and get blocked on the flow control window
execute_instructions(
&mut test_env,
&[
Instruction::EnqueueData(VarInt::from_u32(0), 2000, true),
// Try to check whether we got woken up during reset
Instruction::EnqueueData(VarInt::from_u32(2000), 1, false),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckDataTx(VarInt::from_u32(0), 1000, false, false, pn(0)),
Instruction::CheckInterests(stream_interests(&["ack", "cf"])),
],
);
let transmitted_data = 1000;
if *is_finishing {
execute_instructions(
&mut test_env,
&[
Instruction::Finish(false),
Instruction::CheckInterests(stream_interests(&["ack", "cf"])),
],
);
}
match *reset_reason {
ResetReason::InternalReset => {
let mut events = StreamEvents::new();
test_env
.stream
.on_internal_reset(connection::Error::unspecified().into(), &mut events);
assert!(events.write_wake.is_some());
// No RESET frame should be transitted due to an internal reset
execute_instructions(
&mut test_env,
&[
Instruction::CheckNoTx,
Instruction::CheckInterests(stream_interests(&[])),
],
);
}
ResetReason::ApplicationReset => {
// The reset should lead to an outgoing packet
execute_instructions(
&mut test_env,
&[
Instruction::Reset(reset_error_code, true),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckResetTx(
reset_error_code,
pn(1),
VarInt::from_u32(transmitted_data),
),
Instruction::CheckInterests(stream_interests(&["ack"])),
],
);
}
ResetReason::StopSending => {
// Stop sending should lead to an outgoing packet
execute_instructions(
&mut test_env,
&[
Instruction::StopSending(reset_error_code, ExpectWakeup(Some(true))),
Instruction::CheckInterests(stream_interests(&["tx"])),
Instruction::CheckResetTx(
reset_error_code,
pn(1),
VarInt::from_u32(transmitted_data),
),
Instruction::CheckInterests(stream_interests(&["ack"])),
],
);
}
};
// Increasing the flow control window should not change readiness.
// It should also not lead the Stream to consume from our flow control
// window - even if we query it to do so.
assert_eq!(
VarInt::from_u32(0),
test_env.tx_connection_flow_controller.available_window()
);
let previous_readiness = test_env.stream.get_stream_interests();
test_env.tx_connection_flow_controller.on_max_data(MaxData {
maximum_data: VarInt::from_u32(2500),
});
assert_eq!(
VarInt::from_u32(1500),
test_env.tx_connection_flow_controller.available_window()
);
test_env.stream.on_connection_window_available();
assert_eq!(previous_readiness, test_env.stream.get_stream_interests());
assert!(
!test_env
.stream
.get_stream_interests()
.connection_flow_control_credits
);
assert_eq!(
VarInt::from_u32(1500),
test_env.tx_connection_flow_controller.available_window()
);
// Accessing the stream should lead to the reset error
assert_matches!(test_env.poll_finish(), Poll::Ready(Err(_)));
if *reset_reason != ResetReason::InternalReset {
// If the Reset was not caused internally, it needs to get
// acknowledged before finalization
execute_instructions(
&mut test_env,
&[
Instruction::CheckInterests(stream_interests(&["ack"])),
Instruction::AckPacket(pn(1), ExpectWakeup(Some(false))),
],
);
}
execute_instructions(
&mut test_env,
&[Instruction::CheckInterests(stream_interests(&["fin"]))],
);
}
}
}