fn on_control_packet_impl()

in dc/s2n-quic-dc/src/stream/send/state.rs [247:413]


    fn on_control_packet_impl<C, Clk>(
        &mut self,
        control_key: &C,
        credentials: &Credentials,
        _ecn: ExplicitCongestionNotification,
        packet: &mut packet::control::decoder::Packet,
        random: &mut dyn random::Generator,
        clock: &Clk,
        transmission_queue: &application::transmission::Queue<buffer::Segment>,
        segment_alloc: &buffer::Allocator,
    ) -> Result<Option<processing::Error>, Error>
    where
        C: crypto::open::control::Stream,
        Clk: Clock,
    {
        probes::on_control_packet(
            credentials.id,
            credentials.key_id,
            packet.packet_number(),
            packet.control_data().len(),
        );

        // only process the packet after we know it's authentic
        let res = control_key.verify(packet.header(), packet.auth_tag());

        probes::on_control_packet_decrypted(
            credentials.id,
            credentials.key_id,
            packet.packet_number(),
            packet.control_data().len(),
            res.is_ok(),
        );

        // drop the packet if it failed to authenticate
        if let Err(err) = res {
            return Ok(Some(err.into()));
        }

        // check if we've already seen the packet
        ensure!(
            self.control_filter.on_packet(packet).is_ok(),
            return {
                probes::on_control_packet_duplicate(
                    credentials.id,
                    credentials.key_id,
                    packet.packet_number(),
                    packet.control_data().len(),
                );
                // drop the packet if we've already seen it
                Ok(Some(processing::Error::Duplicate))
            }
        );

        let packet_number = packet.packet_number();

        // raise our next expected control packet
        {
            let pn = packet_number.saturating_add(VarInt::from_u8(1));
            let pn = self.next_expected_control_packet.max(pn);
            self.next_expected_control_packet = pn;
        }

        let recv_time = clock.get_time();
        let mut newly_acked = false;
        let mut max_acked_stream = None;
        let mut max_acked_recovery = None;
        let mut loaded_transmit_queue = false;

        for frame in packet.control_frames_mut() {
            let frame = frame.map_err(|err| error::Kind::FrameError { decoder: err }.err())?;

            trace!(?frame);

            match frame {
                FrameMut::Padding(_) => {
                    continue;
                }
                FrameMut::Ping(_) => {
                    // no need to do anything special here
                }
                FrameMut::Ack(ack) => {
                    if !core::mem::replace(&mut loaded_transmit_queue, true) {
                        // make sure we have a current view of the application transmissions
                        self.load_transmission_queue(transmission_queue);
                    }

                    if ack.ecn_counts.is_some() {
                        self.on_frame_ack::<_, _, true>(
                            credentials,
                            &ack,
                            random,
                            &recv_time,
                            &mut newly_acked,
                            &mut max_acked_stream,
                            &mut max_acked_recovery,
                            segment_alloc,
                        )?;
                    } else {
                        self.on_frame_ack::<_, _, false>(
                            credentials,
                            &ack,
                            random,
                            &recv_time,
                            &mut newly_acked,
                            &mut max_acked_stream,
                            &mut max_acked_recovery,
                            segment_alloc,
                        )?;
                    }
                }
                FrameMut::MaxData(frame) => {
                    if self.max_data < frame.maximum_data {
                        self.max_data = frame.maximum_data;
                    }
                }
                FrameMut::ConnectionClose(close) => {
                    debug!(connection_close = ?close, state = ?self.state);

                    probes::on_close(
                        credentials.id,
                        credentials.key_id,
                        packet_number,
                        close.error_code,
                    );

                    // if there was no error and we transmitted everything then just shut the
                    // stream down
                    if close.error_code == VarInt::ZERO && close.frame_type.is_some() {
                        self.unacked_ranges.clear();
                        self.try_finish();
                        return Ok(None);
                    }

                    // no need to transmit a reset back to the peer - just close it
                    let _ = self.state.on_send_reset();
                    let _ = self.state.on_recv_reset_ack();
                    let error = if close.frame_type.is_some() {
                        error::Kind::TransportError {
                            code: close.error_code,
                        }
                    } else {
                        error::Kind::ApplicationError {
                            error: close.error_code.into(),
                        }
                    };
                    return Err(error.err());
                }
                _ => continue,
            }
        }

        for (space, pn) in [
            (stream::PacketSpace::Stream, max_acked_stream),
            (stream::PacketSpace::Recovery, max_acked_recovery),
        ] {
            if let Some(pn) = pn {
                self.detect_lost_packets(credentials, random, &recv_time, space, pn)?;
            }
        }

        self.on_peer_activity(newly_acked);

        // try to transition to the final state if we've sent all of the data
        self.try_finish();

        Ok(None)
    }