in neqo-transport/src/connection/mod.rs [2951:3133]
fn input_frame(
&mut self,
path: &PathRef,
packet_version: Version,
packet_type: PacketType,
frame: Frame,
next_pn: PacketNumber,
now: Instant,
) -> Res<()> {
if !frame.is_allowed(packet_type) {
qinfo!("frame not allowed: {frame:?} {packet_type:?}");
return Err(Error::ProtocolViolation);
}
let space = PacketNumberSpace::from(packet_type);
if frame.is_stream() {
return self
.streams
.input_frame(&frame, &mut self.stats.borrow_mut().frame_rx);
}
match frame {
Frame::Padding(length) => {
self.stats.borrow_mut().frame_rx.padding += usize::from(length);
}
Frame::Ping => {
// If we get a PING and there are outstanding CRYPTO frames,
// prepare to resend them.
self.stats.borrow_mut().frame_rx.ping += 1;
self.crypto.resend_unacked(space);
// Send an ACK immediately if we might not otherwise do so.
self.acks.immediate_ack(space, now);
}
Frame::Ack {
largest_acknowledged,
ack_delay,
first_ack_range,
ack_ranges,
ecn_count,
} => {
// Ensure that the largest acknowledged packet number was actually sent.
// (If we ever start using non-contiguous packet numbers, we need to check all the
// packet numbers in the ACKed ranges.)
if largest_acknowledged >= next_pn {
qwarn!("Largest ACKed {largest_acknowledged} was never sent");
return Err(Error::AckedUnsentPacket);
}
let ranges =
Frame::decode_ack_frame(largest_acknowledged, first_ack_range, &ack_ranges)?;
self.handle_ack(space, ranges, ecn_count, ack_delay, now)?;
}
Frame::Crypto { offset, data } => {
qtrace!(
"[{self}] Crypto frame on space={space} offset={offset}, data={:0x?}",
&data
);
self.stats.borrow_mut().frame_rx.crypto += 1;
self.crypto.streams.inbound_frame(space, offset, data)?;
if self.crypto.streams.data_ready(space) {
let mut buf = Vec::new();
let read = self.crypto.streams.read_to_end(space, &mut buf);
qdebug!("Read {read:?} bytes");
self.handshake(now, packet_version, space, Some(&buf))?;
self.create_resumption_token(now);
} else {
// If we get a useless CRYPTO frame send outstanding CRYPTO frames and 0-RTT
// data again.
self.crypto.resend_unacked(space);
if space == PacketNumberSpace::Initial {
self.crypto.resend_unacked(PacketNumberSpace::Handshake);
self.resend_0rtt(now);
}
}
}
Frame::NewToken { token } => {
self.stats.borrow_mut().frame_rx.new_token += 1;
self.new_token.save_token(token.to_vec());
self.create_resumption_token(now);
}
Frame::NewConnectionId {
sequence_number,
connection_id,
stateless_reset_token,
retire_prior,
} => {
self.stats.borrow_mut().frame_rx.new_connection_id += 1;
self.cids.add_remote(ConnectionIdEntry::new(
sequence_number,
ConnectionId::from(connection_id),
stateless_reset_token.to_owned(),
))?;
self.paths.retire_cids(retire_prior, &mut self.cids);
if self.cids.len() >= LOCAL_ACTIVE_CID_LIMIT {
qinfo!("[{self}] received too many connection IDs");
return Err(Error::ConnectionIdLimitExceeded);
}
}
Frame::RetireConnectionId { sequence_number } => {
self.stats.borrow_mut().frame_rx.retire_connection_id += 1;
self.cid_manager.retire(sequence_number);
}
Frame::PathChallenge { data } => {
self.stats.borrow_mut().frame_rx.path_challenge += 1;
// If we were challenged, try to make the path permanent.
// Report an error if we don't have enough connection IDs.
self.ensure_permanent(path, now)?;
path.borrow_mut().challenged(data);
}
Frame::PathResponse { data } => {
self.stats.borrow_mut().frame_rx.path_response += 1;
if self
.paths
.path_response(data, now, &mut self.stats.borrow_mut())
{
// This PATH_RESPONSE enabled migration; tell loss recovery.
self.loss_recovery.migrate();
}
}
Frame::ConnectionClose {
error_code,
frame_type,
reason_phrase,
} => {
self.stats.borrow_mut().frame_rx.connection_close += 1;
qinfo!(
"[{self}] ConnectionClose received. Error code: {error_code:?} frame type {frame_type:x} reason {reason_phrase}"
);
let (detail, frame_type) = if let CloseError::Application(_) = error_code {
// Use a transport error here because we want to send
// NO_ERROR in this case.
(
Error::PeerApplicationError(error_code.code()),
FrameType::ConnectionCloseApplication,
)
} else {
(
Error::PeerError(error_code.code()),
FrameType::ConnectionCloseTransport,
)
};
let error = CloseReason::Transport(detail);
self.state_signaling
.drain(Rc::clone(path), error.clone(), frame_type, "");
self.set_state(
State::Draining {
error,
timeout: self.get_closing_period_time(now),
},
now,
);
}
Frame::HandshakeDone => {
self.stats.borrow_mut().frame_rx.handshake_done += 1;
if self.role == Role::Server || !self.state.connected() {
return Err(Error::ProtocolViolation);
}
self.set_confirmed(now)?;
self.discard_keys(PacketNumberSpace::Handshake, now);
self.migrate_to_preferred_address(now)?;
}
Frame::AckFrequency {
seqno,
tolerance,
delay,
ignore_order,
} => {
self.stats.borrow_mut().frame_rx.ack_frequency += 1;
let delay = Duration::from_micros(delay);
if delay < GRANULARITY {
return Err(Error::ProtocolViolation);
}
self.acks
.ack_freq(seqno, tolerance - 1, delay, ignore_order);
}
Frame::Datagram { data, .. } => {
self.stats.borrow_mut().frame_rx.datagram += 1;
self.quic_datagrams
.handle_datagram(data, &mut self.stats.borrow_mut())?;
}
_ => unreachable!("All other frames are for streams"),
}
Ok(())
}