fn receive_datagram()

in quic/s2n-quic-transport/src/endpoint/mod.rs [419:829]


    /// Processes a single datagram received by the endpoint.
    ///
    /// The datagram's remote/local addresses and payload are first handed to the
    /// configured packet interceptor, after which the first packet in the payload is
    /// decoded. Depending on the outcome the datagram is then:
    ///
    /// * checked against known stateless reset tokens and otherwise dropped, if the
    ///   first packet cannot be decoded,
    /// * dropped, if the version negotiator rejects the packet or the destination
    ///   connection ID cannot be parsed,
    /// * dispatched to the connection that the destination connection ID maps to,
    ///   closing that connection if packet handling returns an error,
    /// * treated as a new connection attempt, if this is a server endpoint and the
    ///   packet is an Initial packet that does not map to a connection,
    /// * otherwise offered to the dc endpoint (short header packets only), checked
    ///   for a stateless reset and possibly answered with a stateless reset.
    ///
    /// # Parameters
    ///
    /// * `header` - datagram header; its path addresses may be rewritten by the
    ///   packet interceptor.
    /// * `payload` - the datagram payload.
    /// * `timestamp` - the timestamp attached to the datagram and to emitted events.
    fn receive_datagram(
        &mut self,
        header: &mut datagram::Header<Cfg::PathHandle>,
        payload: &mut [u8],
        timestamp: Timestamp,
    ) {
        let endpoint_context = self.config.context();

        // Try to decode the first packet in the datagram
        let payload_len = payload.len();
        let buffer = DecoderBufferMut::new(payload);

        // Give the packet interceptor a chance to rewrite the addresses and the payload
        // before anything is decoded.
        let buffer = {
            let subject = event::builder::Subject::Endpoint {}.into_event();

            let mut remote_address = header.path.remote_address();
            {
                endpoint_context
                    .packet_interceptor
                    .intercept_rx_remote_address(&subject, &mut remote_address);
                header.path.set_remote_address(remote_address);
            }

            let mut local_address = header.path.local_address();
            {
                endpoint_context
                    .packet_interceptor
                    .intercept_rx_local_address(&subject, &mut local_address);
                header.path.set_local_address(local_address);
            }

            let datagram = s2n_quic_core::packet::interceptor::Datagram {
                remote_address: remote_address.into_event(),
                local_address: local_address.into_event(),
                timestamp,
            };

            endpoint_context
                .packet_interceptor
                .intercept_rx_datagram(&subject, &datagram, buffer)
        };

        // Read the remote address back from the path, since the interceptor may have
        // replaced it above.
        let remote_address = header.path.remote_address();
        let connection_info = ConnectionInfo::new(&remote_address);
        let (packet, remaining) = if let Ok((packet, remaining)) = ProtectedPacket::decode(
            buffer,
            &connection_info,
            endpoint_context.connection_id_format,
        ) {
            (packet, remaining)
        } else {
            //= https://www.rfc-editor.org/rfc/rfc9000#section-5.2.2
            //# Servers MUST drop incoming packets under all other circumstances.

            //= https://www.rfc-editor.org/rfc/rfc9000#section-10.3
            //# However, endpoints MUST treat any packet ending in a
            //# valid stateless reset token as a Stateless Reset, as other QUIC
            //# versions might allow the use of a long header.

            // The packet may be a stateless reset, check before returning.
            let internal_connection_id = self.close_on_matching_stateless_reset(payload, timestamp);

            if internal_connection_id.is_none() {
                // The packet didn't contain a valid stateless token
                let mut publisher = event::EndpointPublisherSubscriber::new(
                    event::builder::EndpointMeta {
                        endpoint_type: Cfg::ENDPOINT_TYPE,
                        timestamp,
                    },
                    None,
                    self.config.context().event_subscriber,
                );
                publisher.on_endpoint_datagram_dropped(event::builder::EndpointDatagramDropped {
                    len: payload_len as u16,
                    reason: event::builder::DatagramDropReason::DecodingFailed,
                });
            };

            return;
        };

        let mut publisher = event::EndpointPublisherSubscriber::new(
            event::builder::EndpointMeta {
                endpoint_type: Cfg::ENDPOINT_TYPE,
                timestamp,
            },
            packet.version(),
            endpoint_context.event_subscriber,
        );

        // Ensure the version is supported. This check occurs before the destination
        // connection ID is parsed since future versions of QUIC could have different
        // length requirements for connection IDs.
        if self
            .version_negotiator
            .on_packet(&header.path, payload_len, &packet, &mut publisher)
            .is_err()
        {
            publisher.on_endpoint_datagram_dropped(event::builder::EndpointDatagramDropped {
                len: payload_len as u16,
                reason: event::builder::DatagramDropReason::UnsupportedVersion,
            });
            return;
        }

        let destination_connection_id =
            match connection::LocalId::try_from_bytes(packet.destination_connection_id()) {
                Some(connection_id) => connection_id,
                None => {
                    // Ignore the datagram
                    publisher.on_endpoint_datagram_dropped(
                        event::builder::EndpointDatagramDropped {
                            len: payload_len as u16,
                            reason:
                                event::builder::DatagramDropReason::InvalidDestinationConnectionId,
                        },
                    );
                    return;
                }
            };

        let source_connection_id = packet
            .source_connection_id()
            .and_then(PeerId::try_from_bytes);

        // The classification starts out as `Initial` and is overwritten below if the
        // destination connection ID maps to a known connection.
        let mut datagram = DatagramInfo {
            timestamp,
            payload_len,
            ecn: header.ecn,
            destination_connection_id,
            destination_connection_id_classification: connection::id::Classification::Initial,
            source_connection_id,
        };

        // TODO validate the connection ID before looking up the connection in the map
        let close_packet_buffer = &mut self.close_packet_buffer;

        // Try to lookup the internal connection ID and dispatch the packet
        // to the Connection
        if let Some((internal_id, dcid_classification)) = self
            .connection_id_mapper
            .lookup_internal_connection_id(&destination_connection_id)
        {
            let mut check_for_stateless_reset = false;
            datagram.destination_connection_id_classification = dcid_classification;

            let _ = self.connections.with_connection(internal_id, |conn| {
                // The path `Id` needs to be passed around instead of the path to get around `&mut self` and
                // `&mut self.path_manager` being borrowed at the same time
                let path_id = conn
                    .on_datagram_received(
                        &header.path,
                        &datagram,
                        endpoint_context.congestion_controller,
                        endpoint_context.path_migration,
                        endpoint_context.mtu,
                        endpoint_context.event_subscriber,
                    )
                    .map_err(|datagram_drop_reason| {
                        // An error received at this point was caused by a datagram that has not
                        // been authenticated yet, and thus the connection should not be closed.
                        conn.with_event_publisher(
                            datagram.timestamp,
                            None,
                            endpoint_context.event_subscriber,
                            |publisher, _path| {
                                publisher.on_datagram_dropped(event::builder::DatagramDropped {
                                    local_addr: header.path.local_address().into_event(),
                                    remote_addr: header.path.remote_address().into_event(),
                                    destination_cid: datagram
                                        .destination_connection_id
                                        .into_event(),
                                    source_cid: datagram
                                        .source_connection_id
                                        .as_ref()
                                        .map(|cid| cid.into_event()),
                                    len: datagram.payload_len as u16,
                                    reason: datagram_drop_reason,
                                });
                            },
                        );
                    })?;

                if let Err(err) = conn.handle_packet(
                    &datagram,
                    path_id,
                    packet,
                    endpoint_context.random_generator,
                    endpoint_context.event_subscriber,
                    endpoint_context.packet_interceptor,
                    endpoint_context.datagram,
                    endpoint_context.dc,
                    endpoint_context.connection_limits,
                    &mut check_for_stateless_reset,
                ) {
                    //= https://www.rfc-editor.org/rfc/rfc9000#section-10.2.1
                    //# An endpoint
                    //# that is closing is not required to process any received frame.
                    conn.close(
                        err,
                        endpoint_context.connection_close_formatter,
                        close_packet_buffer,
                        datagram.timestamp,
                        endpoint_context.event_subscriber,
                        endpoint_context.packet_interceptor,
                    );
                    // The connection has just been closed: stop here so the remaining
                    // packets of the datagram are not handed to it and `close` is not
                    // invoked a second time.
                    return Err(());
                }

                if let Err(err) = conn.handle_remaining_packets(
                    &header.path,
                    &datagram,
                    path_id,
                    endpoint_context.connection_id_format,
                    remaining,
                    endpoint_context.random_generator,
                    endpoint_context.event_subscriber,
                    endpoint_context.packet_interceptor,
                    endpoint_context.datagram,
                    endpoint_context.dc,
                    endpoint_context.connection_limits,
                    &mut check_for_stateless_reset,
                ) {
                    //= https://www.rfc-editor.org/rfc/rfc9000#section-10.2.1
                    //# An endpoint
                    //# that is closing is not required to process any received frame.
                    conn.close(
                        err,
                        endpoint_context.connection_close_formatter,
                        close_packet_buffer,
                        datagram.timestamp,
                        endpoint_context.event_subscriber,
                        endpoint_context.packet_interceptor,
                    );
                    return Err(());
                }

                Ok(())
            });

            // Packet handling sets this flag when the datagram should still be compared
            // against the known stateless reset tokens.
            if check_for_stateless_reset {
                self.close_on_matching_stateless_reset(payload, timestamp);
            }

            return;
        }

        // The destination connection ID does not map to a connection.
        match (Cfg::ENDPOINT_TYPE, packet) {
            (s2n_quic_core::endpoint::Type::Server, ProtectedPacket::Initial(packet)) => {
                let source_connection_id =
                    match connection::PeerId::try_from_bytes(packet.source_connection_id()) {
                        Some(connection_id) => connection_id,
                        None => {
                            publisher.on_endpoint_datagram_dropped(
                           event::builder::EndpointDatagramDropped {
                               len: payload_len as u16,
                               reason:
                                   event::builder::DatagramDropReason::InvalidSourceConnectionId,
                           },
                       );
                            return;
                        }
                    };

                //= https://www.rfc-editor.org/rfc/rfc9000#section-8.1
                //= type=TODO
                //= tracking-issue=140
                //# Additionally, an endpoint MAY consider the peer address validated if
                //# the peer uses a connection ID chosen by the endpoint and the
                //# connection ID contains at least 64 bits of entropy

                //= https://www.rfc-editor.org/rfc/rfc9000#section-8.1.2
                //# In response to processing an Initial packet containing a token that
                //# was provided in a Retry packet, a server cannot send another Retry
                //# packet; it can only refuse the connection or permit it to proceed.
                let retry_token_dcid = if !packet.token().is_empty() {
                    let mut context = token::Context::new(
                        &remote_address,
                        &source_connection_id,
                        endpoint_context.random_generator,
                    );

                    let outcome = endpoint_context
                        .token
                        .validate_token(&mut context, packet.token());

                    if outcome.is_none() {
                        //= https://www.rfc-editor.org/rfc/rfc9000#section-8.1.3
                        //= type=TODO
                        //= tracking-issue=344
                        //# If the token is invalid, then the
                        //# server SHOULD proceed as if the client did not have a validated
                        //# address, including potentially sending a Retry packet.

                        //= https://www.rfc-editor.org/rfc/rfc9000#section-8.1.2
                        //= type=TODO
                        //= tracking-issue=344
                        //# Instead, the
                        //# server SHOULD immediately close (Section 10.2) the connection with an
                        //# INVALID_TOKEN error.
                        publisher.on_endpoint_datagram_dropped(
                            event::builder::EndpointDatagramDropped {
                                len: payload_len as u16,
                                reason: event::builder::DatagramDropReason::InvalidRetryToken,
                            },
                        );

                        //= https://www.rfc-editor.org/rfc/rfc9000#section-8.1.3
                        //# Servers MAY
                        //# discard any Initial packet that does not carry the expected token.
                        return;
                    }

                    //= https://www.rfc-editor.org/rfc/rfc9000#section-8.1.3
                    //# If the validation succeeds, the server SHOULD then allow
                    //# the handshake to proceed.
                    outcome
                } else {
                    //= https://www.rfc-editor.org/rfc/rfc9000#section-8.1.2
                    //# Upon receiving the client's Initial packet, the server can request
                    //# address validation by sending a Retry packet (Section 17.2.5)
                    //# containing a token.
                    if self
                        .connection_allowed(header, &packet, payload_len, timestamp)
                        .is_none()
                    {
                        //= https://www.rfc-editor.org/rfc/rfc9000#section-17.2.5.1
                        //# A server MUST NOT send more than one Retry
                        //# packet in response to a single UDP datagram.
                        return;
                    }

                    None
                };

                if let Err(err) = self.handle_initial_packet(
                    header,
                    &datagram,
                    packet,
                    remaining,
                    retry_token_dcid,
                ) {
                    // TODO send a minimal connection close frame
                    let mut publisher = event::EndpointPublisherSubscriber::new(
                        event::builder::EndpointMeta {
                            endpoint_type: Cfg::ENDPOINT_TYPE,
                            timestamp,
                        },
                        None,
                        self.config.context().event_subscriber,
                    );
                    publisher.on_endpoint_connection_attempt_failed(
                        event::builder::EndpointConnectionAttemptFailed { error: err },
                    );
                }
            }
            (_, packet) => {
                let is_short_header_packet = matches!(packet, ProtectedPacket::Short(_));

                if Cfg::DcEndpoint::ENABLED
                    && is_short_header_packet // dc packets are short header packets
                    && endpoint_context.dc.on_possible_secret_control_packet(
                        &dc::DatagramInfo::new(&remote_address),
                        payload,
                    )
                {
                    // This was a DC secret control packet, so we don't need to proceed
                    // with checking for a stateless reset
                    return;
                }

                publisher.on_endpoint_datagram_dropped(event::builder::EndpointDatagramDropped {
                    len: payload_len as u16,
                    reason: event::builder::DatagramDropReason::UnknownDestinationConnectionId,
                });

                //= https://www.rfc-editor.org/rfc/rfc9000#section-10.3.1
                //# Endpoints MAY skip this check if any packet from a datagram is
                //# successfully processed.  However, the comparison MUST be performed
                //# when the first packet in an incoming datagram either cannot be
                //# associated with a connection, or cannot be decrypted.

                //= https://www.rfc-editor.org/rfc/rfc9000#section-10.3
                //# However, endpoints MUST treat any packet ending in a
                //# valid stateless reset token as a Stateless Reset, as other QUIC
                //# versions might allow the use of a long header.
                let is_stateless_reset = self
                    .close_on_matching_stateless_reset(payload, timestamp)
                    .is_some();

                //= https://www.rfc-editor.org/rfc/rfc9000#section-9.3.2
                //# For instance, an endpoint MAY send a Stateless Reset in
                //# response to any further incoming packets.

                //= https://www.rfc-editor.org/rfc/rfc9000#section-10.3
                //# An endpoint MAY send a Stateless Reset in response to receiving a packet
                //# that it cannot associate with an active connection.

                //= https://www.rfc-editor.org/rfc/rfc9000#section-10.3
                //# Because the stateless reset token is not available
                //# until connection establishment is complete or near completion,
                //# ignoring an unknown packet with a long header might be as effective
                //# as sending a Stateless Reset.
                if !is_stateless_reset
                    && Cfg::StatelessResetTokenGenerator::ENABLED
                    && is_short_header_packet
                {
                    self.enqueue_stateless_reset(header, &datagram, &destination_connection_id);
                }
            }
        }
    }