fn input_frame()

in neqo-transport/src/connection/mod.rs [2951:3133]


    fn input_frame(
        &mut self,
        path: &PathRef,
        packet_version: Version,
        packet_type: PacketType,
        frame: Frame,
        next_pn: PacketNumber,
        now: Instant,
    ) -> Res<()> {
        if !frame.is_allowed(packet_type) {
            qinfo!("frame not allowed: {frame:?} {packet_type:?}");
            return Err(Error::ProtocolViolation);
        }
        let space = PacketNumberSpace::from(packet_type);
        if frame.is_stream() {
            return self
                .streams
                .input_frame(&frame, &mut self.stats.borrow_mut().frame_rx);
        }
        match frame {
            Frame::Padding(length) => {
                self.stats.borrow_mut().frame_rx.padding += usize::from(length);
            }
            Frame::Ping => {
                // If we get a PING and there are outstanding CRYPTO frames,
                // prepare to resend them.
                self.stats.borrow_mut().frame_rx.ping += 1;
                self.crypto.resend_unacked(space);
                // Send an ACK immediately if we might not otherwise do so.
                self.acks.immediate_ack(space, now);
            }
            Frame::Ack {
                largest_acknowledged,
                ack_delay,
                first_ack_range,
                ack_ranges,
                ecn_count,
            } => {
                // Ensure that the largest acknowledged packet number was actually sent.
                // (If we ever start using non-contiguous packet numbers, we need to check all the
                // packet numbers in the ACKed ranges.)
                if largest_acknowledged >= next_pn {
                    qwarn!("Largest ACKed {largest_acknowledged} was never sent");
                    return Err(Error::AckedUnsentPacket);
                }

                let ranges =
                    Frame::decode_ack_frame(largest_acknowledged, first_ack_range, &ack_ranges)?;
                self.handle_ack(space, ranges, ecn_count, ack_delay, now)?;
            }
            Frame::Crypto { offset, data } => {
                qtrace!(
                    "[{self}] Crypto frame on space={space} offset={offset}, data={:0x?}",
                    &data
                );
                self.stats.borrow_mut().frame_rx.crypto += 1;
                self.crypto.streams.inbound_frame(space, offset, data)?;
                if self.crypto.streams.data_ready(space) {
                    let mut buf = Vec::new();
                    let read = self.crypto.streams.read_to_end(space, &mut buf);
                    qdebug!("Read {read:?} bytes");
                    self.handshake(now, packet_version, space, Some(&buf))?;
                    self.create_resumption_token(now);
                } else {
                    // If we get a useless CRYPTO frame send outstanding CRYPTO frames and 0-RTT
                    // data again.
                    self.crypto.resend_unacked(space);
                    if space == PacketNumberSpace::Initial {
                        self.crypto.resend_unacked(PacketNumberSpace::Handshake);
                        self.resend_0rtt(now);
                    }
                }
            }
            Frame::NewToken { token } => {
                self.stats.borrow_mut().frame_rx.new_token += 1;
                self.new_token.save_token(token.to_vec());
                self.create_resumption_token(now);
            }
            Frame::NewConnectionId {
                sequence_number,
                connection_id,
                stateless_reset_token,
                retire_prior,
            } => {
                self.stats.borrow_mut().frame_rx.new_connection_id += 1;
                self.cids.add_remote(ConnectionIdEntry::new(
                    sequence_number,
                    ConnectionId::from(connection_id),
                    stateless_reset_token.to_owned(),
                ))?;
                self.paths.retire_cids(retire_prior, &mut self.cids);
                if self.cids.len() >= LOCAL_ACTIVE_CID_LIMIT {
                    qinfo!("[{self}] received too many connection IDs");
                    return Err(Error::ConnectionIdLimitExceeded);
                }
            }
            Frame::RetireConnectionId { sequence_number } => {
                self.stats.borrow_mut().frame_rx.retire_connection_id += 1;
                self.cid_manager.retire(sequence_number);
            }
            Frame::PathChallenge { data } => {
                self.stats.borrow_mut().frame_rx.path_challenge += 1;
                // If we were challenged, try to make the path permanent.
                // Report an error if we don't have enough connection IDs.
                self.ensure_permanent(path, now)?;
                path.borrow_mut().challenged(data);
            }
            Frame::PathResponse { data } => {
                self.stats.borrow_mut().frame_rx.path_response += 1;
                if self
                    .paths
                    .path_response(data, now, &mut self.stats.borrow_mut())
                {
                    // This PATH_RESPONSE enabled migration; tell loss recovery.
                    self.loss_recovery.migrate();
                }
            }
            Frame::ConnectionClose {
                error_code,
                frame_type,
                reason_phrase,
            } => {
                self.stats.borrow_mut().frame_rx.connection_close += 1;
                qinfo!(
                    "[{self}] ConnectionClose received. Error code: {error_code:?} frame type {frame_type:x} reason {reason_phrase}"
                );
                let (detail, frame_type) = if let CloseError::Application(_) = error_code {
                    // Use a transport error here because we want to send
                    // NO_ERROR in this case.
                    (
                        Error::PeerApplicationError(error_code.code()),
                        FrameType::ConnectionCloseApplication,
                    )
                } else {
                    (
                        Error::PeerError(error_code.code()),
                        FrameType::ConnectionCloseTransport,
                    )
                };
                let error = CloseReason::Transport(detail);
                self.state_signaling
                    .drain(Rc::clone(path), error.clone(), frame_type, "");
                self.set_state(
                    State::Draining {
                        error,
                        timeout: self.get_closing_period_time(now),
                    },
                    now,
                );
            }
            Frame::HandshakeDone => {
                self.stats.borrow_mut().frame_rx.handshake_done += 1;
                if self.role == Role::Server || !self.state.connected() {
                    return Err(Error::ProtocolViolation);
                }
                self.set_confirmed(now)?;
                self.discard_keys(PacketNumberSpace::Handshake, now);
                self.migrate_to_preferred_address(now)?;
            }
            Frame::AckFrequency {
                seqno,
                tolerance,
                delay,
                ignore_order,
            } => {
                self.stats.borrow_mut().frame_rx.ack_frequency += 1;
                let delay = Duration::from_micros(delay);
                if delay < GRANULARITY {
                    return Err(Error::ProtocolViolation);
                }
                self.acks
                    .ack_freq(seqno, tolerance - 1, delay, ignore_order);
            }
            Frame::Datagram { data, .. } => {
                self.stats.borrow_mut().frame_rx.datagram += 1;
                self.quic_datagrams
                    .handle_datagram(data, &mut self.stats.borrow_mut())?;
            }
            _ => unreachable!("All other frames are for streams"),
        }

        Ok(())
    }