fn on_processed_packet()

in quic/s2n-quic-transport/src/space/mod.rs [823:1123]


    /// Required hook that `handle_cleartext_payload` invokes as its final step, after
    /// every frame in the packet has been handled and the path manager has been told
    /// about the packet.
    ///
    /// What an implementation does with the packet is not visible from this
    /// declaration. The spec citation that precedes the call site concerns
    /// acknowledging fully processed packets, so implementations presumably record
    /// the packet for acknowledgement (TODO confirm against the implementors).
    ///
    /// # Parameters
    ///
    /// * `processed_packet` - summary of the packet that was just processed, taken by
    ///   value.
    /// * `path_id` - identifier of the path the packet was processed on.
    /// * `path` - the path itself; the visible caller passes `&path_manager[path_id]`.
    /// * `publisher` - event publisher for the connection.
    ///
    /// # Errors
    ///
    /// Any `transport::Error` returned here is propagated with `?` by
    /// `handle_cleartext_payload`, which then returns it instead of the processed
    /// packet.
    fn on_processed_packet<Pub: event::ConnectionPublisher>(
        &mut self,
        processed_packet: ProcessedPacket,
        path_id: path::Id,
        path: &Path<Config>,
        publisher: &mut Pub,
    ) -> Result<(), transport::Error>;

    /// Processes the decrypted payload of a single packet, frame by frame.
    ///
    /// The steps, in order:
    ///
    /// 1. The payload is handed to `packet_interceptor.intercept_rx_payload`, and the
    ///    buffer it returns is what gets decoded.
    /// 2. A `ProcessedPacket` is created for `packet_number` and `datagram`.
    /// 3. `packet_interceptor.intercept_rx_ack` is given the chance to inject an ACK
    ///    frame; an error recorded while doing so aborts processing.
    /// 4. Frames are decoded one at a time until the payload is empty. For each frame
    ///    a `FrameReceived` event is published, the frame is recorded on the
    ///    `ProcessedPacket`, and the matching `handle_*_frame` method is called.
    ///    PADDING and PING frames are only recorded.
    /// 5. After the loop, a packet with zero recorded frames is rejected, the path
    ///    manager is notified through `path_manager.on_processed_packet`, and
    ///    `on_amplification_unblocked` is called if that call reports the active path
    ///    as unblocked.
    /// 6. `self.on_processed_packet` is called and the `ProcessedPacket` is returned.
    ///
    /// # Parameters
    ///
    /// * `packet_number` - number of the packet the payload belongs to.
    /// * `payload` - decrypted packet payload containing the encoded frames.
    /// * `datagram` - metadata of the datagram that carried the packet; its
    ///   `timestamp` and `source_connection_id` are read here.
    /// * `path_id` - identifier used to index `path_manager` for this packet's path.
    /// * `path_manager`, `handshake_status`, `local_id_registry`, `random_generator` -
    ///   connection state forwarded to the frame handlers that need it.
    /// * `publisher` - event publisher for the connection.
    /// * `packet_interceptor` - hook that may replace the payload and inject an ACK.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    ///
    /// * a frame fails to decode;
    /// * a frame handler fails - the error is tagged with the frame's type first;
    /// * a CONNECTION_CLOSE frame is received - after its handler succeeds, the
    ///   remaining frames are skipped and an error built from the frame is returned;
    /// * the packet contained no frames (`PROTOCOL_VIOLATION`);
    /// * handling the interceptor-injected ACK recorded an error;
    /// * `path_manager.on_processed_packet` or `self.on_processed_packet` fails.
    fn handle_cleartext_payload<'a, Pub: event::ConnectionPublisher>(
        &mut self,
        packet_number: PacketNumber,
        payload: DecoderBufferMut<'a>,
        datagram: &'a DatagramInfo,
        path_id: path::Id,
        path_manager: &mut path::Manager<Config>,
        handshake_status: &mut HandshakeStatus,
        local_id_registry: &mut connection::LocalIdRegistry,
        random_generator: &mut Config::RandomGenerator,
        publisher: &mut Pub,
        packet_interceptor: &mut Config::PacketInterceptor,
    ) -> Result<ProcessedPacket<'a>, connection::Error> {
        use s2n_quic_core::frame::{Frame, FrameMut};

        let mut payload = {
            use s2n_quic_core::packet::interceptor::{Interceptor, Packet};

            // intercept the payload after it is decrypted, but before we process the frames
            packet_interceptor.intercept_rx_payload(
                &publisher.subject(),
                &Packet {
                    number: packet_number,
                    timestamp: datagram.timestamp,
                },
                payload,
            )
        };

        let mut processed_packet = ProcessedPacket::new(packet_number, datagram);

        // Records the frame on `processed_packet` and evaluates to a closure that
        // attaches the frame's type to a `transport::Error`. The frame is recorded
        // before its handler runs, so it is counted even if the handler then fails.
        macro_rules! on_frame_processed {
            ($frame:ident) => {{
                let frame_type = $frame.tag();
                processed_packet.on_processed_frame(&$frame);
                move |err: transport::Error| err.with_frame_type(frame_type.into())
            }};
        }

        {
            // allow for an ACK frame to be injected by the packet interceptor
            use s2n_quic_core::packet::interceptor::Interceptor;
            let mut ack_context = AckInterceptContext {
                packet_space: self,
                timestamp: datagram.timestamp,
                path_id,
                path_manager,
                packet_number,
                handshake_status,
                local_id_registry,
                random_generator,
                publisher,
                on_processed_frame: |ack_frame| processed_packet.on_processed_frame(ack_frame),
                error: None,
            };
            packet_interceptor.intercept_rx_ack(&ack_context.publisher.subject(), &mut ack_context);
            // the context stores any failure instead of returning it, so surface it here
            if let Some(error) = ack_context.error {
                Err(error)?;
            }
        }

        while !payload.is_empty() {
            // decode exactly one frame; `remaining` is the rest of the payload
            let (frame, remaining) = payload
                .decode::<FrameMut>()
                .map_err(transport::Error::from)?;

            // shared borrow of the path, used for the event below and again by the
            // DATAGRAM arm
            let path = &path_manager[path_id];
            publisher.on_frame_received(event::builder::FrameReceived {
                packet_header: event::builder::PacketHeader::new(
                    packet_number,
                    publisher.quic_version(),
                ),
                path: path_event!(path, path_id),
                frame: frame.into_event(),
            });

            match frame {
                Frame::Padding(frame) => {
                    // no handler is called for this frame, so the error-tagging
                    // closure is discarded; the frame is still recorded
                    //= https://www.rfc-editor.org/rfc/rfc9000#section-19.1
                    //# A PADDING frame (type=0x00) has no semantic value.  PADDING frames
                    //# can be used to increase the size of a packet.  Padding can be used to
                    //# increase an Initial packet to the minimum required size or to provide
                    //# protection against traffic analysis for protected packets.
                    let _ = on_frame_processed!(frame);
                }
                Frame::Ping(frame) => {
                    // as with PADDING: recorded only, no handler to fail
                    //= https://www.rfc-editor.org/rfc/rfc9000#section-19.2
                    //# Endpoints can use PING frames (type=0x01) to verify that their peers
                    //# are still alive or to check reachability to the peer.
                    let _ = on_frame_processed!(frame);
                }
                Frame::Crypto(frame) => {
                    let on_error = on_frame_processed!(frame);

                    //= https://www.rfc-editor.org/rfc/rfc9000#section-7.5
                    //# Packets containing
                    //# discarded CRYPTO frames MUST be acknowledged because the packet has
                    //# been received and processed by the transport even though the CRYPTO
                    //# frame was discarded.

                    self.handle_crypto_frame(
                        frame.into(),
                        datagram,
                        &mut path_manager[path_id],
                        publisher,
                    )
                    .map_err(on_error)?;
                    // only set once the handler has succeeded
                    processed_packet.contains_crypto = true;
                }
                Frame::Ack(frame) => {
                    let on_error = on_frame_processed!(frame);
                    self.handle_ack_frame(
                        frame,
                        datagram.timestamp,
                        path_id,
                        path_manager,
                        packet_number,
                        handshake_status,
                        local_id_registry,
                        random_generator,
                        publisher,
                    )
                    .map_err(on_error)?;
                }
                Frame::ConnectionClose(frame) => {
                    let on_error = on_frame_processed!(frame);
                    // NOTE(review): the handlers in this function where a time value
                    // follows the frame receive `datagram.timestamp` in that position;
                    // here `path_id` is passed second, ahead of the path itself -
                    // confirm this matches the `handle_connection_close_frame`
                    // signature, which is not visible in this excerpt.
                    self.handle_connection_close_frame(
                        frame,
                        path_id,
                        &mut path_manager[path_id],
                        packet_number,
                        publisher,
                    )
                    .map_err(on_error)?;

                    // skip processing any other frames and return an error

                    // use `from` instead of `into` so the location is correctly captured
                    return Err(connection::Error::from(frame));
                }
                Frame::Stream(frame) => {
                    let on_error = on_frame_processed!(frame);
                    self.handle_stream_frame(frame.into(), &mut processed_packet)
                        .map_err(on_error)?;
                }
                Frame::Datagram(frame) => {
                    let on_error = on_frame_processed!(frame);
                    self.handle_datagram_frame(
                        path_event!(path, path_id).into_event(),
                        frame.into(),
                    )
                    .map_err(on_error)?;
                }
                Frame::DataBlocked(frame) => {
                    let on_error = on_frame_processed!(frame);
                    self.handle_data_blocked_frame(frame).map_err(on_error)?;
                }
                Frame::MaxData(frame) => {
                    let on_error = on_frame_processed!(frame);
                    self.handle_max_data_frame(frame).map_err(on_error)?;
                }
                Frame::MaxStreamData(frame) => {
                    let on_error = on_frame_processed!(frame);
                    self.handle_max_stream_data_frame(frame).map_err(on_error)?;
                }
                Frame::MaxStreams(frame) => {
                    let on_error = on_frame_processed!(frame);
                    self.handle_max_streams_frame(frame).map_err(on_error)?;
                }
                Frame::ResetStream(frame) => {
                    let on_error = on_frame_processed!(frame);
                    self.handle_reset_stream_frame(frame).map_err(on_error)?;
                }
                Frame::StopSending(frame) => {
                    let on_error = on_frame_processed!(frame);
                    self.handle_stop_sending_frame(frame).map_err(on_error)?;
                }
                Frame::StreamDataBlocked(frame) => {
                    let on_error = on_frame_processed!(frame);
                    self.handle_stream_data_blocked_frame(frame)
                        .map_err(on_error)?;
                }
                Frame::StreamsBlocked(frame) => {
                    let on_error = on_frame_processed!(frame);
                    self.handle_streams_blocked_frame(frame).map_err(on_error)?;
                }
                Frame::NewToken(frame) => {
                    let on_error = on_frame_processed!(frame);
                    self.handle_new_token_frame(frame).map_err(on_error)?;
                }
                Frame::NewConnectionId(frame) => {
                    let on_error = on_frame_processed!(frame);
                    self.handle_new_connection_id_frame(frame, datagram, path_manager, publisher)
                        .map_err(on_error)?;
                }
                Frame::RetireConnectionId(frame) => {
                    let on_error = on_frame_processed!(frame);
                    self.handle_retire_connection_id_frame(
                        frame,
                        datagram,
                        &mut path_manager[path_id],
                        local_id_registry,
                    )
                    .map_err(on_error)?;
                }
                Frame::PathChallenge(frame) => {
                    let on_error = on_frame_processed!(frame);

                    //= https://www.rfc-editor.org/rfc/rfc9000#section-9.3.3
                    //# An endpoint that receives a PATH_CHALLENGE on an active path SHOULD
                    //# send a non-probing packet in response.
                    if path_manager.active_path_id() == path_id {
                        processed_packet.path_challenge_on_active_path = true;
                    }
                    self.handle_path_challenge_frame(frame, path_id, path_manager)
                        .map_err(on_error)?;
                }
                Frame::PathResponse(frame) => {
                    let on_error = on_frame_processed!(frame);

                    self.handle_path_response_frame(
                        frame,
                        datagram.timestamp,
                        path_manager,
                        handshake_status,
                        publisher,
                    )
                    .map_err(on_error)?;
                }
                Frame::HandshakeDone(frame) => {
                    let on_error = on_frame_processed!(frame);
                    self.handle_handshake_done_frame(
                        frame,
                        datagram.timestamp,
                        &mut path_manager[path_id],
                        local_id_registry,
                        handshake_status,
                        publisher,
                    )
                    .map_err(on_error)?;
                }
                Frame::DcStatelessResetTokens(frame) => {
                    let on_error = on_frame_processed!(frame);
                    self.handle_dc_stateless_reset_tokens_frame(frame, publisher)
                        .map_err(on_error)?;
                }
            }

            // continue with whatever follows the frame that was just handled
            payload = remaining;
        }

        //= https://www.rfc-editor.org/rfc/rfc9000#section-12.4
        //# The payload of a packet that contains frames MUST contain at least
        //# one frame, and MAY contain multiple frames and multiple frame types.
        //# An endpoint MUST treat receipt of a packet containing no frames as a
        //# connection error of type PROTOCOL_VIOLATION.
        if processed_packet.frames == 0 {
            return Err(transport::Error::PROTOCOL_VIOLATION
                .with_reason("packet contained no frames")
                .into());
        }

        // tell the path manager about the packet; the outcome reports whether the
        // active path became unblocked as a result
        let amplification_outcome = path_manager.on_processed_packet(
            path_id,
            datagram.source_connection_id,
            processed_packet.path_validation_probing,
            random_generator,
            publisher,
        )?;

        if amplification_outcome.is_active_path_unblocked() {
            self.on_amplification_unblocked(
                path_manager,
                datagram.timestamp,
                handshake_status.is_confirmed(),
            );
        }

        //= https://www.rfc-editor.org/rfc/rfc9000#section-13.1
        //# A packet MUST NOT be acknowledged until packet protection has been
        //# successfully removed and all frames contained in the packet have been
        //# processed.  For STREAM frames, this means the data has been enqueued
        //# in preparation to be received by the application protocol, but it
        //# does not require that data be delivered and consumed.
        //#
        //# Once the packet has been fully processed, a receiver acknowledges
        //# receipt by sending one or more ACK frames containing the packet
        //# number of the received packet.

        self.on_processed_packet(processed_packet, path_id, &path_manager[path_id], publisher)?;

        // `processed_packet` was passed by value above and is used again here, which
        // relies on `ProcessedPacket` being `Copy`
        Ok(processed_packet)
    }