in dc/s2n-quic-dc/src/stream/send/state.rs [247:413]
fn on_control_packet_impl<C, Clk>(
&mut self,
control_key: &C,
credentials: &Credentials,
_ecn: ExplicitCongestionNotification,
packet: &mut packet::control::decoder::Packet,
random: &mut dyn random::Generator,
clock: &Clk,
transmission_queue: &application::transmission::Queue<buffer::Segment>,
segment_alloc: &buffer::Allocator,
) -> Result<Option<processing::Error>, Error>
where
C: crypto::open::control::Stream,
Clk: Clock,
{
probes::on_control_packet(
credentials.id,
credentials.key_id,
packet.packet_number(),
packet.control_data().len(),
);
// only process the packet after we know it's authentic
let res = control_key.verify(packet.header(), packet.auth_tag());
probes::on_control_packet_decrypted(
credentials.id,
credentials.key_id,
packet.packet_number(),
packet.control_data().len(),
res.is_ok(),
);
// drop the packet if it failed to authenticate
if let Err(err) = res {
return Ok(Some(err.into()));
}
// check if we've already seen the packet
ensure!(
self.control_filter.on_packet(packet).is_ok(),
return {
probes::on_control_packet_duplicate(
credentials.id,
credentials.key_id,
packet.packet_number(),
packet.control_data().len(),
);
// drop the packet if we've already seen it
Ok(Some(processing::Error::Duplicate))
}
);
let packet_number = packet.packet_number();
// raise our next expected control packet
{
let pn = packet_number.saturating_add(VarInt::from_u8(1));
let pn = self.next_expected_control_packet.max(pn);
self.next_expected_control_packet = pn;
}
let recv_time = clock.get_time();
let mut newly_acked = false;
let mut max_acked_stream = None;
let mut max_acked_recovery = None;
let mut loaded_transmit_queue = false;
for frame in packet.control_frames_mut() {
let frame = frame.map_err(|err| error::Kind::FrameError { decoder: err }.err())?;
trace!(?frame);
match frame {
FrameMut::Padding(_) => {
continue;
}
FrameMut::Ping(_) => {
// no need to do anything special here
}
FrameMut::Ack(ack) => {
if !core::mem::replace(&mut loaded_transmit_queue, true) {
// make sure we have a current view of the application transmissions
self.load_transmission_queue(transmission_queue);
}
if ack.ecn_counts.is_some() {
self.on_frame_ack::<_, _, true>(
credentials,
&ack,
random,
&recv_time,
&mut newly_acked,
&mut max_acked_stream,
&mut max_acked_recovery,
segment_alloc,
)?;
} else {
self.on_frame_ack::<_, _, false>(
credentials,
&ack,
random,
&recv_time,
&mut newly_acked,
&mut max_acked_stream,
&mut max_acked_recovery,
segment_alloc,
)?;
}
}
FrameMut::MaxData(frame) => {
if self.max_data < frame.maximum_data {
self.max_data = frame.maximum_data;
}
}
FrameMut::ConnectionClose(close) => {
debug!(connection_close = ?close, state = ?self.state);
probes::on_close(
credentials.id,
credentials.key_id,
packet_number,
close.error_code,
);
// if there was no error and we transmitted everything then just shut the
// stream down
if close.error_code == VarInt::ZERO && close.frame_type.is_some() {
self.unacked_ranges.clear();
self.try_finish();
return Ok(None);
}
// no need to transmit a reset back to the peer - just close it
let _ = self.state.on_send_reset();
let _ = self.state.on_recv_reset_ack();
let error = if close.frame_type.is_some() {
error::Kind::TransportError {
code: close.error_code,
}
} else {
error::Kind::ApplicationError {
error: close.error_code.into(),
}
};
return Err(error.err());
}
_ => continue,
}
}
for (space, pn) in [
(stream::PacketSpace::Stream, max_acked_stream),
(stream::PacketSpace::Recovery, max_acked_recovery),
] {
if let Some(pn) = pn {
self.detect_lost_packets(credentials, random, &recv_time, space, pn)?;
}
}
self.on_peer_activity(newly_acked);
// try to transition to the final state if we've sent all of the data
self.try_finish();
Ok(None)
}