in quic/s2n-quic-transport/src/space/mod.rs [823:1123]
fn on_processed_packet<Pub: event::ConnectionPublisher>(
&mut self,
processed_packet: ProcessedPacket,
path_id: path::Id,
path: &Path<Config>,
publisher: &mut Pub,
) -> Result<(), transport::Error>;
fn handle_cleartext_payload<'a, Pub: event::ConnectionPublisher>(
&mut self,
packet_number: PacketNumber,
payload: DecoderBufferMut<'a>,
datagram: &'a DatagramInfo,
path_id: path::Id,
path_manager: &mut path::Manager<Config>,
handshake_status: &mut HandshakeStatus,
local_id_registry: &mut connection::LocalIdRegistry,
random_generator: &mut Config::RandomGenerator,
publisher: &mut Pub,
packet_interceptor: &mut Config::PacketInterceptor,
) -> Result<ProcessedPacket<'a>, connection::Error> {
use s2n_quic_core::frame::{Frame, FrameMut};
let mut payload = {
use s2n_quic_core::packet::interceptor::{Interceptor, Packet};
// intercept the payload after it is decrypted, but before we process the frames
packet_interceptor.intercept_rx_payload(
&publisher.subject(),
&Packet {
number: packet_number,
timestamp: datagram.timestamp,
},
payload,
)
};
let mut processed_packet = ProcessedPacket::new(packet_number, datagram);
macro_rules! on_frame_processed {
($frame:ident) => {{
let frame_type = $frame.tag();
processed_packet.on_processed_frame(&$frame);
move |err: transport::Error| err.with_frame_type(frame_type.into())
}};
}
{
// allow for an ACK frame to be injected by the packet interceptor
use s2n_quic_core::packet::interceptor::Interceptor;
let mut ack_context = AckInterceptContext {
packet_space: self,
timestamp: datagram.timestamp,
path_id,
path_manager,
packet_number,
handshake_status,
local_id_registry,
random_generator,
publisher,
on_processed_frame: |ack_frame| processed_packet.on_processed_frame(ack_frame),
error: None,
};
packet_interceptor.intercept_rx_ack(&ack_context.publisher.subject(), &mut ack_context);
if let Some(error) = ack_context.error {
Err(error)?;
}
}
while !payload.is_empty() {
let (frame, remaining) = payload
.decode::<FrameMut>()
.map_err(transport::Error::from)?;
let path = &path_manager[path_id];
publisher.on_frame_received(event::builder::FrameReceived {
packet_header: event::builder::PacketHeader::new(
packet_number,
publisher.quic_version(),
),
path: path_event!(path, path_id),
frame: frame.into_event(),
});
match frame {
Frame::Padding(frame) => {
//= https://www.rfc-editor.org/rfc/rfc9000#section-19.1
//# A PADDING frame (type=0x00) has no semantic value. PADDING frames
//# can be used to increase the size of a packet. Padding can be used to
//# increase an Initial packet to the minimum required size or to provide
//# protection against traffic analysis for protected packets.
let _ = on_frame_processed!(frame);
}
Frame::Ping(frame) => {
//= https://www.rfc-editor.org/rfc/rfc9000#section-19.2
//# Endpoints can use PING frames (type=0x01) to verify that their peers
//# are still alive or to check reachability to the peer.
let _ = on_frame_processed!(frame);
}
Frame::Crypto(frame) => {
let on_error = on_frame_processed!(frame);
//= https://www.rfc-editor.org/rfc/rfc9000#section-7.5
//# Packets containing
//# discarded CRYPTO frames MUST be acknowledged because the packet has
//# been received and processed by the transport even though the CRYPTO
//# frame was discarded.
self.handle_crypto_frame(
frame.into(),
datagram,
&mut path_manager[path_id],
publisher,
)
.map_err(on_error)?;
processed_packet.contains_crypto = true;
}
Frame::Ack(frame) => {
let on_error = on_frame_processed!(frame);
self.handle_ack_frame(
frame,
datagram.timestamp,
path_id,
path_manager,
packet_number,
handshake_status,
local_id_registry,
random_generator,
publisher,
)
.map_err(on_error)?;
}
Frame::ConnectionClose(frame) => {
let on_error = on_frame_processed!(frame);
self.handle_connection_close_frame(
frame,
path_id,
&mut path_manager[path_id],
packet_number,
publisher,
)
.map_err(on_error)?;
// skip processing any other frames and return an error
// use `from` instead of `into` so the location is correctly captured
return Err(connection::Error::from(frame));
}
Frame::Stream(frame) => {
let on_error = on_frame_processed!(frame);
self.handle_stream_frame(frame.into(), &mut processed_packet)
.map_err(on_error)?;
}
Frame::Datagram(frame) => {
let on_error = on_frame_processed!(frame);
self.handle_datagram_frame(
path_event!(path, path_id).into_event(),
frame.into(),
)
.map_err(on_error)?;
}
Frame::DataBlocked(frame) => {
let on_error = on_frame_processed!(frame);
self.handle_data_blocked_frame(frame).map_err(on_error)?;
}
Frame::MaxData(frame) => {
let on_error = on_frame_processed!(frame);
self.handle_max_data_frame(frame).map_err(on_error)?;
}
Frame::MaxStreamData(frame) => {
let on_error = on_frame_processed!(frame);
self.handle_max_stream_data_frame(frame).map_err(on_error)?;
}
Frame::MaxStreams(frame) => {
let on_error = on_frame_processed!(frame);
self.handle_max_streams_frame(frame).map_err(on_error)?;
}
Frame::ResetStream(frame) => {
let on_error = on_frame_processed!(frame);
self.handle_reset_stream_frame(frame).map_err(on_error)?;
}
Frame::StopSending(frame) => {
let on_error = on_frame_processed!(frame);
self.handle_stop_sending_frame(frame).map_err(on_error)?;
}
Frame::StreamDataBlocked(frame) => {
let on_error = on_frame_processed!(frame);
self.handle_stream_data_blocked_frame(frame)
.map_err(on_error)?;
}
Frame::StreamsBlocked(frame) => {
let on_error = on_frame_processed!(frame);
self.handle_streams_blocked_frame(frame).map_err(on_error)?;
}
Frame::NewToken(frame) => {
let on_error = on_frame_processed!(frame);
self.handle_new_token_frame(frame).map_err(on_error)?;
}
Frame::NewConnectionId(frame) => {
let on_error = on_frame_processed!(frame);
self.handle_new_connection_id_frame(frame, datagram, path_manager, publisher)
.map_err(on_error)?;
}
Frame::RetireConnectionId(frame) => {
let on_error = on_frame_processed!(frame);
self.handle_retire_connection_id_frame(
frame,
datagram,
&mut path_manager[path_id],
local_id_registry,
)
.map_err(on_error)?;
}
Frame::PathChallenge(frame) => {
let on_error = on_frame_processed!(frame);
//= https://www.rfc-editor.org/rfc/rfc9000#section-9.3.3
//# An endpoint that receives a PATH_CHALLENGE on an active path SHOULD
//# send a non-probing packet in response.
if path_manager.active_path_id() == path_id {
processed_packet.path_challenge_on_active_path = true;
}
self.handle_path_challenge_frame(frame, path_id, path_manager)
.map_err(on_error)?;
}
Frame::PathResponse(frame) => {
let on_error = on_frame_processed!(frame);
self.handle_path_response_frame(
frame,
datagram.timestamp,
path_manager,
handshake_status,
publisher,
)
.map_err(on_error)?;
}
Frame::HandshakeDone(frame) => {
let on_error = on_frame_processed!(frame);
self.handle_handshake_done_frame(
frame,
datagram.timestamp,
&mut path_manager[path_id],
local_id_registry,
handshake_status,
publisher,
)
.map_err(on_error)?;
}
Frame::DcStatelessResetTokens(frame) => {
let on_error = on_frame_processed!(frame);
self.handle_dc_stateless_reset_tokens_frame(frame, publisher)
.map_err(on_error)?;
}
}
payload = remaining;
}
//= https://www.rfc-editor.org/rfc/rfc9000#section-12.4
//# The payload of a packet that contains frames MUST contain at least
//# one frame, and MAY contain multiple frames and multiple frame types.
//# An endpoint MUST treat receipt of a packet containing no frames as a
//# connection error of type PROTOCOL_VIOLATION.
if processed_packet.frames == 0 {
return Err(transport::Error::PROTOCOL_VIOLATION
.with_reason("packet contained no frames")
.into());
}
let amplification_outcome = path_manager.on_processed_packet(
path_id,
datagram.source_connection_id,
processed_packet.path_validation_probing,
random_generator,
publisher,
)?;
if amplification_outcome.is_active_path_unblocked() {
self.on_amplification_unblocked(
path_manager,
datagram.timestamp,
handshake_status.is_confirmed(),
);
}
//= https://www.rfc-editor.org/rfc/rfc9000#section-13.1
//# A packet MUST NOT be acknowledged until packet protection has been
//# successfully removed and all frames contained in the packet have been
//# processed. For STREAM frames, this means the data has been enqueued
//# in preparation to be received by the application protocol, but it
//# does not require that data be delivered and consumed.
//#
//# Once the packet has been fully processed, a receiver acknowledges
//# receipt by sending one or more ACK frames containing the packet
//# number of the received packet.
self.on_processed_packet(processed_packet, path_id, &path_manager[path_id], publisher)?;
Ok(processed_packet)
}