in quic/s2n-quic-transport/src/stream/send_stream/tests.rs [2226:2366]
/// Walks a sending stream through every reset cause (peer `StopSending`,
/// application-initiated reset, internal reset), once after the stream was
/// finished and once while more data is still queued.
///
/// For the application and `StopSending` cases the test expects a reset
/// transmission carrying the amount of data sent so far, followed by a
/// `StreamReset` error from `poll_finish`. For the internal case it expects
/// no transmission at all and a connection error from `poll_finish`.
fn resetting_the_stream_does_does_trigger_a_reset_frame_and_reset_errors() {
    #[derive(Copy, Clone, Debug, PartialEq)]
    enum ResetReason {
        StopSending,
        ApplicationReset,
        InternalReset,
    }

    for &reason in &[
        ResetReason::StopSending,
        ResetReason::ApplicationReset,
        ResetReason::InternalReset,
    ] {
        for &finishing in &[true, false] {
            let mut env = setup_stream_test_env_with_config(TestEnvironmentConfig {
                max_send_buffer_size: 1000,
                stream_id: StreamId::initial(endpoint::Type::Client, StreamType::Unidirectional),
                local_endpoint_type: endpoint::Type::Client,
                ..Default::default()
            });
            let error_code = ApplicationErrorCode::new(0x3333_4444).unwrap();

            // Bring the stream into its pre-reset state and remember how many
            // bytes were put on the wire in packet 0.
            let sent_bytes = if finishing {
                execute_instructions(
                    &mut env,
                    &[
                        Instruction::EnqueueData(VarInt::from_u32(0), 500, true),
                        Instruction::Finish(false),
                        Instruction::CheckInterests(stream_interests(&["tx"])),
                        Instruction::CheckDataTx(VarInt::from_u32(0), 500, true, false, pn(0)),
                        Instruction::CheckInterests(stream_interests(&["ack"])),
                    ],
                );
                500
            } else {
                execute_instructions(
                    &mut env,
                    &[
                        // Queue enough data so that a write gets blocked
                        Instruction::EnqueueData(VarInt::from_u32(0), 2000, true),
                        Instruction::EnqueueData(VarInt::from_u32(2000), 2000, false),
                        Instruction::CheckInterests(stream_interests(&["tx"])),
                        Instruction::CheckDataTx(VarInt::from_u32(0), 2000, false, false, pn(0)),
                        Instruction::CheckInterests(stream_interests(&["ack"])),
                    ],
                );
                2000
            };

            // Application resets and `StopSending` share the same expectations
            // and differ only in the instruction that triggers the reset. An
            // internal reset has no such instruction and is driven directly on
            // the stream below.
            let trigger = match reason {
                ResetReason::InternalReset => None,
                ResetReason::ApplicationReset => Some(Instruction::Reset(error_code, true)),
                ResetReason::StopSending => Some(Instruction::StopSending(
                    error_code,
                    ExpectWakeup(Some(true)),
                )),
            };

            if let Some(trigger) = trigger {
                // The reset has to show up as an outgoing transmission in packet 1
                execute_instructions(
                    &mut env,
                    &[
                        trigger,
                        Instruction::CheckInterests(stream_interests(&["tx"])),
                        Instruction::CheckResetTx(error_code, pn(1), VarInt::from_u32(sent_bytes)),
                        Instruction::CheckInterests(stream_interests(&["ack"])),
                    ],
                );

                // From now on the stream reports the reset error
                assert_matches!(
                    env.poll_finish(),
                    Poll::Ready(Err(StreamError::StreamReset { .. })),
                );

                // A reset that was not caused internally must be acknowledged
                // before the stream can be finalized
                execute_instructions(
                    &mut env,
                    &[
                        Instruction::CheckInterests(stream_interests(&["ack"])),
                        Instruction::AckPacket(pn(1), ExpectWakeup(Some(false))),
                    ],
                );
            } else {
                let mut events = StreamEvents::new();
                env.stream
                    .on_internal_reset(connection::Error::unspecified().into(), &mut events);
                assert!(events.write_wake.is_some());

                // An internal reset must not transmit anything
                execute_instructions(
                    &mut env,
                    &[
                        Instruction::CheckNoTx,
                        Instruction::CheckInterests(stream_interests(&[])),
                    ],
                );

                assert_matches!(
                    env.poll_finish(),
                    Poll::Ready(Err(StreamError::ConnectionError {
                        error: connection::Error::Unspecified { .. },
                        ..
                    })),
                );
            }

            // Whatever the cause, the stream must end up with only the "fin"
            // interest set
            execute_instructions(
                &mut env,
                &[Instruction::CheckInterests(stream_interests(&["fin"]))],
            );
        }
    }
}