in neqo-transport/src/connection/mod.rs [1616:1732]
/// Processes one datagram attributed to `path`, handling each packet it contains in turn.
///
/// Each packet is decoded with `PublicPacket::decode`, screened by `preprocess_packet`,
/// decrypted, checked against the ACK tracker for duplicates, and then handed to
/// `process_packet` followed by `postprocess_packet`.
///
/// * `path` - the path this datagram is being processed on.
/// * `d` - the datagram. Its buffer is borrowed mutably and fed to the decoder; the whole
///   datagram is passed by value to `save_datagram` if decryption reports `KeysPending`.
/// * `now` - the current time as supplied by the caller.
///
/// # Errors
///
/// * Any error returned by `preprocess_packet` or `check_stateless_reset`.
/// * Any error returned by `process_packet` (after `ensure_error_path` has been called).
/// * `Error::ProtocolViolation` if `self.acks` has no tracker for the packet's
///   packet number space.
/// * `Error::KeysExhausted` if decryption reports it.
///
/// Decode failures and all other decryption failures are not propagated: the packet is
/// counted as dropped and `Ok(())` can still be returned.
fn input_path(
&mut self,
path: &PathRef,
mut d: Datagram<impl AsRef<[u8]> + AsMut<[u8]>>,
now: Instant,
) -> Res<()> {
qtrace!("[{self}] {} input {}", path.borrow(), hex(&d));
// Read the datagram metadata up front: `slc` below keeps a mutable borrow of `d`
// alive for the duration of the loop, and both values are needed inside it.
let tos = d.tos();
let remote = d.source();
// The not-yet-consumed part of the datagram; shrinks as packets are decoded.
let mut slc = d.as_mut();
// DCID of the previous packet in this datagram; `None` until one loop iteration
// has run to completion.
let mut dcid = None;
// Computed once per datagram and added to `now` for every `decrypt` call below.
let pto = path.borrow().rtt().pto(self.confirmed());
// Handle each packet in the datagram.
while !slc.is_empty() {
// Counted before decoding, so packets that fail to decode are included.
self.stats.borrow_mut().packets_rx += 1;
// Number of bytes from the start of this packet to the end of the datagram;
// only used if the datagram has to be saved for later (see `KeysPending`).
let slc_len = slc.len();
let (mut packet, remainder) =
match PublicPacket::decode(slc, self.cid_manager.decoder().as_ref()) {
Ok((packet, remainder)) => (packet, remainder),
Err(e) => {
qinfo!("[{self}] Garbage packet: {e}");
self.stats.borrow_mut().pkt_dropped("Garbage packet");
// Abandon the remainder of the datagram; the stateless reset check
// after the loop still runs.
break;
}
};
self.stats.borrow_mut().dscp_rx[tos.into()] += 1;
// `Continue` carries on with this packet, `Next` leaves the loop (skipping any
// remaining packets but still running the final stateless reset check), and
// `End` returns immediately without that check.
match self.preprocess_packet(&packet, path, dcid.as_ref(), now)? {
PreprocessResult::Continue => (),
PreprocessResult::Next => break,
PreprocessResult::End => return Ok(()),
}
qtrace!("[{self}] Received unverified packet {packet:?}");
// Captured before `decrypt` is called on `packet`; the value is logged on success.
let packet_len = packet.len();
match packet.decrypt(&mut self.crypto.states, now + pto) {
Ok(payload) => {
// OK, we have a valid packet.
let pn = payload.pn();
self.idle_timeout.on_packet_received(now);
self.log_packet(
packet::MetaData::new_in(path, tos, packet_len, &payload),
now,
);
// Only with the `build-fuzzing-corpus` feature: record decrypted Initial
// payloads, labelled by the role of the peer that sent them.
#[cfg(feature = "build-fuzzing-corpus")]
if payload.packet_type() == PacketType::Initial {
let target = if self.role == Role::Client {
"server_initial"
} else {
"client_initial"
};
neqo_common::write_item_to_fuzzing_corpus(target, &payload[..]);
}
let space = PacketNumberSpace::from(payload.packet_type());
// Note the shadowing: inside the `if let`, `space` is the tracker returned by
// `self.acks`; in the `else` branch it is still the `PacketNumberSpace`.
if let Some(space) = self.acks.get_mut(space) {
if space.is_duplicate(pn) {
// Duplicates are counted but neither processed nor post-processed.
qdebug!("Duplicate packet {space}-{}", pn);
self.stats.borrow_mut().dups_rx += 1;
} else {
match self.process_packet(path, &payload, now) {
Ok(migrate) => {
self.postprocess_packet(
path, tos, remote, &packet, pn, migrate, now,
);
}
Err(e) => {
// NOTE(review): presumably this makes sure a path exists on which
// the resulting error can be reported to the peer -- confirm
// against `ensure_error_path`.
self.ensure_error_path(path, &packet, now);
return Err(e);
}
}
}
} else {
// No tracker exists for this packet number space.
qdebug!(
"[{self}] Received packet {space} for untracked space {}",
payload.pn()
);
return Err(Error::ProtocolViolation);
}
}
Err(e) => {
match e {
Error::KeysPending(epoch) => {
// This packet can't be decrypted because we don't have the keys yet.
// Don't check this packet for a stateless reset, just return.
// `d` is moved into `save_datagram`, so nothing derived from it may be
// used afterwards; hence the immediate return.
let remaining = slc_len;
self.save_datagram(epoch, d, remaining, now);
return Ok(());
}
Error::KeysExhausted => {
// Exhausting read keys is fatal.
return Err(e);
}
Error::KeysDiscarded(epoch) => {
// This was a valid-appearing Initial packet: maybe probe with
// a Handshake packet to keep the handshake moving.
// The flag is only ever set here (`|=`), and only for a client that
// receives a packet for the discarded Initial epoch.
self.received_untracked |=
self.role == Role::Client && epoch == Epoch::Initial;
}
_ => (),
}
// Decryption failure, or not having keys is not fatal.
// If the state isn't available, or we can't decrypt the packet, drop
// this packet, but don't generate an error. Unless the stateless reset
// check below returns an error, the loop then moves on to `remainder`.
// `dcid.is_none()` is true only while no earlier packet of this datagram
// has completed a loop iteration.
self.check_stateless_reset(path, packet.data(), dcid.is_none(), now)?;
self.stats.borrow_mut().pkt_dropped("Decryption failure");
qlog::packet_dropped(&self.qlog, &packet, now);
}
}
// Advance to the next packet and remember this packet's DCID; it is passed to
// `preprocess_packet` for the following packet.
slc = remainder;
dcid = Some(ConnectionId::from(packet.dcid()));
}
// Final check against the whole datagram. `dcid` is still `None` here only if the
// first packet failed to decode, `preprocess_packet` returned `Next` for it, or the
// datagram was empty.
self.check_stateless_reset(path, &d, dcid.is_none(), now)?;
Ok(())
}