in quic/s2n-quic-transport/src/endpoint/mod.rs [419:829]
/// Handles a single received datagram.
///
/// The steps performed, in order:
///
/// 1. The remote address, local address and payload are passed through the
///    configured packet interceptor; the (possibly rewritten) addresses are
///    written back to `header.path`.
/// 2. The first packet in the datagram is decoded. If decoding fails, the
///    payload is checked for a stateless reset token and, when none matches,
///    an `EndpointDatagramDropped` event with reason `DecodingFailed` is
///    published.
/// 3. The version negotiator and the destination connection ID are checked;
///    a failure of either publishes an `EndpointDatagramDropped` event and
///    the datagram is ignored.
/// 4. If the destination connection ID maps to a known connection, the first
///    packet and then the remaining packets are dispatched to it. A
///    processing error closes the connection and stops further processing of
///    this datagram.
/// 5. Otherwise, a server endpoint receiving an Initial packet validates the
///    token (if any) and attempts to start a new connection; any other packet
///    is offered to the dc endpoint, checked for a stateless reset and, if
///    enabled, answered with a stateless reset.
///
/// # Parameters
///
/// * `header` - datagram metadata; its path addresses may be rewritten by the
///   packet interceptor.
/// * `payload` - the raw datagram bytes.
/// * `timestamp` - the time used for all events and connection callbacks
///   triggered by this datagram.
fn receive_datagram(
    &mut self,
    header: &mut datagram::Header<Cfg::PathHandle>,
    payload: &mut [u8],
    timestamp: Timestamp,
) {
    let endpoint_context = self.config.context();
    // Try to decode the first packet in the datagram
    // Capture the length up front: `payload` is reborrowed by the decoder buffer below.
    let payload_len = payload.len();
    let buffer = DecoderBufferMut::new(payload);
    // Give the packet interceptor a chance to rewrite the addresses and the payload
    // before anything is decoded.
    let buffer = {
        let subject = event::builder::Subject::Endpoint {}.into_event();
        let mut remote_address = header.path.remote_address();
        {
            endpoint_context
                .packet_interceptor
                .intercept_rx_remote_address(&subject, &mut remote_address);
            header.path.set_remote_address(remote_address);
        }
        let mut local_address = header.path.local_address();
        {
            endpoint_context
                .packet_interceptor
                .intercept_rx_local_address(&subject, &mut local_address);
            header.path.set_local_address(local_address);
        }
        let datagram = s2n_quic_core::packet::interceptor::Datagram {
            remote_address: remote_address.into_event(),
            local_address: local_address.into_event(),
            timestamp,
        };
        endpoint_context
            .packet_interceptor
            .intercept_rx_datagram(&subject, &datagram, buffer)
    };
    // Re-read the remote address so the value written back after interception is used.
    let remote_address = header.path.remote_address();
    let connection_info = ConnectionInfo::new(&remote_address);
    let (packet, remaining) = if let Ok((packet, remaining)) = ProtectedPacket::decode(
        buffer,
        &connection_info,
        endpoint_context.connection_id_format,
    ) {
        (packet, remaining)
    } else {
        //= https://www.rfc-editor.org/rfc/rfc9000#section-5.2.2
        //# Servers MUST drop incoming packets under all other circumstances.
        //= https://www.rfc-editor.org/rfc/rfc9000#section-10.3
        //# However, endpoints MUST treat any packet ending in a
        //# valid stateless reset token as a Stateless Reset, as other QUIC
        //# versions might allow the use of a long header.
        // The packet may be a stateless reset, check before returning.
        let internal_connection_id = self.close_on_matching_stateless_reset(payload, timestamp);
        if internal_connection_id.is_none() {
            // The packet didn't contain a valid stateless token
            let mut publisher = event::EndpointPublisherSubscriber::new(
                event::builder::EndpointMeta {
                    endpoint_type: Cfg::ENDPOINT_TYPE,
                    timestamp,
                },
                None,
                self.config.context().event_subscriber,
            );
            publisher.on_endpoint_datagram_dropped(event::builder::EndpointDatagramDropped {
                len: payload_len as u16,
                reason: event::builder::DatagramDropReason::DecodingFailed,
            });
        };
        return;
    };
    let mut publisher = event::EndpointPublisherSubscriber::new(
        event::builder::EndpointMeta {
            endpoint_type: Cfg::ENDPOINT_TYPE,
            timestamp,
        },
        packet.version(),
        endpoint_context.event_subscriber,
    );
    // Ensure the version is supported. This check occurs before the destination
    // connection ID is parsed since future versions of QUIC could have different
    // length requirements for connection IDs.
    if self
        .version_negotiator
        .on_packet(&header.path, payload_len, &packet, &mut publisher)
        .is_err()
    {
        publisher.on_endpoint_datagram_dropped(event::builder::EndpointDatagramDropped {
            len: payload_len as u16,
            reason: event::builder::DatagramDropReason::UnsupportedVersion,
        });
        return;
    }
    let destination_connection_id =
        match connection::LocalId::try_from_bytes(packet.destination_connection_id()) {
            Some(connection_id) => connection_id,
            None => {
                // Ignore the datagram
                publisher.on_endpoint_datagram_dropped(
                    event::builder::EndpointDatagramDropped {
                        len: payload_len as u16,
                        reason:
                            event::builder::DatagramDropReason::InvalidDestinationConnectionId,
                    },
                );
                return;
            }
        };
    let source_connection_id = packet
        .source_connection_id()
        .and_then(PeerId::try_from_bytes);
    // The classification starts as `Initial` and is overwritten below when the
    // destination connection ID maps to a known connection.
    let mut datagram = DatagramInfo {
        timestamp,
        payload_len,
        ecn: header.ecn,
        destination_connection_id,
        destination_connection_id_classification: connection::id::Classification::Initial,
        source_connection_id,
    };
    // TODO validate the connection ID before looking up the connection in the map
    let close_packet_buffer = &mut self.close_packet_buffer;
    // Try to lookup the internal connection ID and dispatch the packet
    // to the Connection
    if let Some((internal_id, dcid_classification)) = self
        .connection_id_mapper
        .lookup_internal_connection_id(&destination_connection_id)
    {
        // Set by the connection's packet handlers when the payload should be
        // compared against known stateless reset tokens after dispatch.
        let mut check_for_stateless_reset = false;
        datagram.destination_connection_id_classification = dcid_classification;
        let _ = self.connections.with_connection(internal_id, |conn| {
            // The path `Id` needs to be passed around instead of the path to get around `&mut self` and
            // `&mut self.path_manager` being borrowed at the same time
            let path_id = conn
                .on_datagram_received(
                    &header.path,
                    &datagram,
                    endpoint_context.congestion_controller,
                    endpoint_context.path_migration,
                    endpoint_context.mtu,
                    endpoint_context.event_subscriber,
                )
                .map_err(|datagram_drop_reason| {
                    // An error received at this point was caused by a datagram that has not
                    // been authenticated yet, and thus the connection should not be closed.
                    conn.with_event_publisher(
                        datagram.timestamp,
                        None,
                        endpoint_context.event_subscriber,
                        |publisher, _path| {
                            publisher.on_datagram_dropped(event::builder::DatagramDropped {
                                local_addr: header.path.local_address().into_event(),
                                remote_addr: header.path.remote_address().into_event(),
                                destination_cid: datagram
                                    .destination_connection_id
                                    .into_event(),
                                source_cid: datagram
                                    .source_connection_id
                                    .as_ref()
                                    .map(|cid| cid.into_event()),
                                len: datagram.payload_len as u16,
                                reason: datagram_drop_reason,
                            });
                        },
                    );
                })?;
            if let Err(err) = conn.handle_packet(
                &datagram,
                path_id,
                packet,
                endpoint_context.random_generator,
                endpoint_context.event_subscriber,
                endpoint_context.packet_interceptor,
                endpoint_context.datagram,
                endpoint_context.dc,
                endpoint_context.connection_limits,
                &mut check_for_stateless_reset,
            ) {
                //= https://www.rfc-editor.org/rfc/rfc9000#section-10.2.1
                //# An endpoint
                //# that is closing is not required to process any received frame.
                conn.close(
                    err,
                    endpoint_context.connection_close_formatter,
                    close_packet_buffer,
                    datagram.timestamp,
                    endpoint_context.event_subscriber,
                    endpoint_context.packet_interceptor,
                );
                // The connection was just closed: do not hand it the remaining
                // packets of this datagram.
                return Err(());
            }
            if let Err(err) = conn.handle_remaining_packets(
                &header.path,
                &datagram,
                path_id,
                endpoint_context.connection_id_format,
                remaining,
                endpoint_context.random_generator,
                endpoint_context.event_subscriber,
                endpoint_context.packet_interceptor,
                endpoint_context.datagram,
                endpoint_context.dc,
                endpoint_context.connection_limits,
                &mut check_for_stateless_reset,
            ) {
                //= https://www.rfc-editor.org/rfc/rfc9000#section-10.2.1
                //# An endpoint
                //# that is closing is not required to process any received frame.
                conn.close(
                    err,
                    endpoint_context.connection_close_formatter,
                    close_packet_buffer,
                    datagram.timestamp,
                    endpoint_context.event_subscriber,
                    endpoint_context.packet_interceptor,
                );
                return Err(());
            }
            Ok(())
        });
        // This runs regardless of the closure's outcome, since the flag may have
        // been set before an error was returned.
        if check_for_stateless_reset {
            self.close_on_matching_stateless_reset(payload, timestamp);
        }
        return;
    }
    // No known connection matched the destination connection ID.
    match (Cfg::ENDPOINT_TYPE, packet) {
        (s2n_quic_core::endpoint::Type::Server, ProtectedPacket::Initial(packet)) => {
            let source_connection_id =
                match connection::PeerId::try_from_bytes(packet.source_connection_id()) {
                    Some(connection_id) => connection_id,
                    None => {
                        publisher.on_endpoint_datagram_dropped(
                            event::builder::EndpointDatagramDropped {
                                len: payload_len as u16,
                                reason:
                                    event::builder::DatagramDropReason::InvalidSourceConnectionId,
                            },
                        );
                        return;
                    }
                };
            //= https://www.rfc-editor.org/rfc/rfc9000#section-8.1
            //= type=TODO
            //= tracking-issue=140
            //# Additionally, an endpoint MAY consider the peer address validated if
            //# the peer uses a connection ID chosen by the endpoint and the
            //# connection ID contains at least 64 bits of entropy
            //= https://www.rfc-editor.org/rfc/rfc9000#section-8.1.2
            //# In response to processing an Initial packet containing a token that
            //# was provided in a Retry packet, a server cannot send another Retry
            //# packet; it can only refuse the connection or permit it to proceed.
            let retry_token_dcid = if !packet.token().is_empty() {
                let mut context = token::Context::new(
                    &remote_address,
                    &source_connection_id,
                    endpoint_context.random_generator,
                );
                let outcome = endpoint_context
                    .token
                    .validate_token(&mut context, packet.token());
                if outcome.is_none() {
                    //= https://www.rfc-editor.org/rfc/rfc9000#section-8.1.3
                    //= type=TODO
                    //= tracking-issue=344
                    //# If the token is invalid, then the
                    //# server SHOULD proceed as if the client did not have a validated
                    //# address, including potentially sending a Retry packet.
                    //= https://www.rfc-editor.org/rfc/rfc9000#section-8.1.2
                    //= type=TODO
                    //= tracking-issue=344
                    //# Instead, the
                    //# server SHOULD immediately close (Section 10.2) the connection with an
                    //# INVALID_TOKEN error.
                    publisher.on_endpoint_datagram_dropped(
                        event::builder::EndpointDatagramDropped {
                            len: payload_len as u16,
                            reason: event::builder::DatagramDropReason::InvalidRetryToken,
                        },
                    );
                    //= https://www.rfc-editor.org/rfc/rfc9000#section-8.1.3
                    //# Servers MAY
                    //# discard any Initial packet that does not carry the expected token.
                    return;
                }
                //= https://www.rfc-editor.org/rfc/rfc9000#section-8.1.3
                //# If the validation succeeds, the server SHOULD then allow
                //# the handshake to proceed.
                outcome
            } else {
                //= https://www.rfc-editor.org/rfc/rfc9000#section-8.1.2
                //# Upon receiving the client's Initial packet, the server can request
                //# address validation by sending a Retry packet (Section 17.2.5)
                //# containing a token.
                if self
                    .connection_allowed(header, &packet, payload_len, timestamp)
                    .is_none()
                {
                    //= https://www.rfc-editor.org/rfc/rfc9000#section-17.2.5.1
                    //# A server MUST NOT send more than one Retry
                    //# packet in response to a single UDP datagram.
                    return;
                }
                None
            };
            if let Err(err) = self.handle_initial_packet(
                header,
                &datagram,
                packet,
                remaining,
                retry_token_dcid,
            ) {
                // TODO send a minimal connection close frame
                let mut publisher = event::EndpointPublisherSubscriber::new(
                    event::builder::EndpointMeta {
                        endpoint_type: Cfg::ENDPOINT_TYPE,
                        timestamp,
                    },
                    None,
                    self.config.context().event_subscriber,
                );
                publisher.on_endpoint_connection_attempt_failed(
                    event::builder::EndpointConnectionAttemptFailed { error: err },
                );
            }
        }
        (_, packet) => {
            let is_short_header_packet = matches!(packet, ProtectedPacket::Short(_));
            if Cfg::DcEndpoint::ENABLED
                && is_short_header_packet // dc packets are short header packets
                && endpoint_context.dc.on_possible_secret_control_packet(
                    &dc::DatagramInfo::new(&remote_address),
                    payload,
                )
            {
                // This was a DC secret control packet, so we don't need to proceed
                // with checking for a stateless reset
                return;
            }
            publisher.on_endpoint_datagram_dropped(event::builder::EndpointDatagramDropped {
                len: payload_len as u16,
                reason: event::builder::DatagramDropReason::UnknownDestinationConnectionId,
            });
            //= https://www.rfc-editor.org/rfc/rfc9000#section-10.3.1
            //# Endpoints MAY skip this check if any packet from a datagram is
            //# successfully processed. However, the comparison MUST be performed
            //# when the first packet in an incoming datagram either cannot be
            //# associated with a connection, or cannot be decrypted.
            //= https://www.rfc-editor.org/rfc/rfc9000#section-10.3
            //# However, endpoints MUST treat any packet ending in a
            //# valid stateless reset token as a Stateless Reset, as other QUIC
            //# versions might allow the use of a long header.
            let is_stateless_reset = self
                .close_on_matching_stateless_reset(payload, timestamp)
                .is_some();
            //= https://www.rfc-editor.org/rfc/rfc9000#section-9.3.2
            //# For instance, an endpoint MAY send a Stateless Reset in
            //# response to any further incoming packets.
            //= https://www.rfc-editor.org/rfc/rfc9000#section-10.3
            //# An endpoint MAY send a Stateless Reset in response to receiving a packet
            //# that it cannot associate with an active connection.
            //= https://www.rfc-editor.org/rfc/rfc9000#section-10.3
            //# Because the stateless reset token is not available
            //# until connection establishment is complete or near completion,
            //# ignoring an unknown packet with a long header might be as effective
            //# as sending a Stateless Reset.
            if !is_stateless_reset
                && Cfg::StatelessResetTokenGenerator::ENABLED
                && is_short_header_packet
            {
                self.enqueue_stateless_reset(header, &datagram, &destination_connection_id);
            }
        }
    }
}