quic/s2n-quic-transport/src/space/initial.rs (639 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use crate::{ ack::AckManager, connection::{self, ConnectionTransmissionContext, ProcessingError}, endpoint, path, path::{path_event, Path}, processed_packet::ProcessedPacket, recovery, space::{CryptoStream, HandshakeStatus, PacketSpace, TxPacketNumbers}, transmission, }; use core::{fmt, marker::PhantomData}; use s2n_codec::EncoderBuffer; use s2n_quic_core::{ connection::PeerId, crypto::{tls, CryptoSuite, InitialKey}, event::{self, ConnectionPublisher as _, IntoEvent}, frame::{ack::AckRanges, crypto::CryptoRef, Ack, ConnectionClose}, inet::DatagramInfo, packet::{ encoding::{PacketEncoder, PacketEncodingError}, initial::{CleartextInitial, Initial, ProtectedInitial}, number::{PacketNumber, PacketNumberRange, PacketNumberSpace, SlidingWindow}, }, time::{timer, Timestamp}, transport, }; use smallvec::SmallVec; pub struct InitialSpace<Config: endpoint::Config> { pub ack_manager: AckManager, //= https://www.rfc-editor.org/rfc/rfc9001#section-4 //# If QUIC needs to retransmit that data, it MUST use //# the same keys even if TLS has already updated to newer keys. pub key: <<Config::TLSEndpoint as tls::Endpoint>::Session as CryptoSuite>::InitialKey, pub header_key: <<Config::TLSEndpoint as tls::Endpoint>::Session as CryptoSuite>::InitialHeaderKey, //= https://www.rfc-editor.org/rfc/rfc9001#section-4.9 //# If packets from a lower encryption level contain //# CRYPTO frames, frames that retransmit that data MUST be sent at the //# same encryption level. pub crypto_stream: CryptoStream, pub tx_packet_numbers: TxPacketNumbers, pub received_hello_message: bool, //= https://www.rfc-editor.org/rfc/rfc9000#section-17.2.5.3 //# Subsequent Initial packets from the client include the connection ID //# and token values from the Retry packet. retry_token: Vec<u8>, processed_packet_numbers: SlidingWindow, recovery_manager: recovery::Manager<Config>, } impl<Config: endpoint::Config> fmt::Debug for InitialSpace<Config> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("InitialSpace") .field("ack_manager", &self.ack_manager) .field("tx_packet_numbers", &self.tx_packet_numbers) .field("processed_packet_numbers", &self.processed_packet_numbers) .field("recovery_manager", &self.recovery_manager) .finish() } } impl<Config: endpoint::Config> InitialSpace<Config> { pub fn new( key: <<Config::TLSEndpoint as tls::Endpoint>::Session as CryptoSuite>::InitialKey, header_key: <<Config::TLSEndpoint as tls::Endpoint>::Session as CryptoSuite>::InitialHeaderKey, now: Timestamp, ack_manager: AckManager, ) -> Self { Self { ack_manager, key, header_key, crypto_stream: CryptoStream::new(), tx_packet_numbers: TxPacketNumbers::new(PacketNumberSpace::Initial, now), received_hello_message: false, retry_token: Vec::new(), processed_packet_numbers: SlidingWindow::default(), recovery_manager: recovery::Manager::new(PacketNumberSpace::Initial), } } /// This method gets called when a Retry packet is processed. /// /// Reset the TLS stack and recover state when the first Retry packet is processed. /// Also regenerate the Initial keys based on the new retry_source_connection_id. pub fn on_retry_packet<Pub: event::ConnectionPublisher>( &mut self, path: &mut path::Path<Config>, path_id: path::Id, retry_source_connection_id: &PeerId, retry_token: &[u8], publisher: &mut Pub, ) { debug_assert!(Config::ENDPOINT_TYPE.is_client()); self.retry_token = retry_token.to_vec(); //= https://www.rfc-editor.org/rfc/rfc9000#section-17.2.5.2 //# Changing the Destination Connection ID field also results in //# a change to the keys used to protect the Initial packet. let (initial_key, initial_header_key) = <<Config::TLSEndpoint as tls::Endpoint>::Session as CryptoSuite>::InitialKey::new_client( retry_source_connection_id.as_bytes(), ); self.key = initial_key; self.header_key = initial_header_key; //= https://www.rfc-editor.org/rfc/rfc9000#section-17.2.5.3 //# Other than updating the Destination Connection ID and Token fields, //# the Initial packet sent by the client is subject to the same //# restrictions as the first Initial packet. A client MUST use the same //# cryptographic handshake message it included in this packet. self.crypto_stream.on_retry_packet(); // Reset the recovery state; discarding any previous Initial packets that // might have been sent/lost. self.recovery_manager .on_retry_packet(path, path_id, publisher); } /// Returns true if the packet number has already been processed pub fn is_duplicate<Pub: event::ConnectionPublisher>( &self, packet_number: PacketNumber, path_id: path::Id, path: &path::Path<Config>, publisher: &mut Pub, ) -> bool { let packet_check = self.processed_packet_numbers.check(packet_number); if let Err(error) = packet_check { publisher.on_duplicate_packet(event::builder::DuplicatePacket { packet_header: event::builder::PacketHeader::new( packet_number, publisher.quic_version(), ), path: path_event!(path, path_id), error: error.into_event(), }); } match packet_check { Ok(()) => false, Err(_) => true, } } pub fn on_transmit<'a>( &mut self, context: &mut ConnectionTransmissionContext<Config>, transmission_constraint: transmission::Constraint, handshake_status: &HandshakeStatus, buffer: EncoderBuffer<'a>, ) -> Result<(transmission::Outcome, EncoderBuffer<'a>), PacketEncodingError<'a>> { let mut packet_number = self.tx_packet_numbers.next(); if self.recovery_manager.requires_probe() && packet_number.as_u64() != 0 { context .publisher .on_packet_skipped(event::builder::PacketSkipped { number: packet_number.into_event(), space: event::builder::KeySpace::Initial, reason: event::builder::PacketSkipReason::PtoProbe, }); //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.4 //# If the sender wants to elicit a faster acknowledgement on PTO, it can //# skip a packet number to eliminate the acknowledgment delay. // TODO Does this interact negatively with persistent congestion detection, which // relies on consecutive packet numbers? packet_number = packet_number.next().unwrap(); } let packet_number_encoder = self.packet_number_encoder(); let mut outcome = transmission::Outcome::default(); let destination_connection_id = context.path().peer_connection_id; let payload = transmission::Transmission { config: PhantomData::<Config>, outcome: &mut outcome, packet_number, payload: transmission::early::Payload { ack_manager: &mut self.ack_manager, crypto_stream: &mut self.crypto_stream, recovery_manager: &mut self.recovery_manager, }, timestamp: context.timestamp, transmission_constraint, transmission_mode: context.transmission_mode, tx_packet_numbers: &mut self.tx_packet_numbers, path_id: context.path_id, publisher: context.publisher, packet_interceptor: context.packet_interceptor, }; let packet = Initial { version: context.quic_version, destination_connection_id, source_connection_id: context.path_manager[context.path_id].local_connection_id, token: self.retry_token.as_slice(), packet_number, payload, }; let (_protected_packet, buffer) = packet.encode_packet( &mut self.key, &self.header_key, packet_number_encoder, context.min_packet_len, buffer, )?; let time_sent = context.timestamp; let path_id = context.path_id; let (recovery_manager, mut recovery_context) = self.recovery(handshake_status, path_id, context.path_manager); recovery_manager.on_packet_sent( packet_number, outcome, time_sent, context.ecn, context.transmission_mode, None, &mut recovery_context, context.publisher, ); context .publisher .on_packet_sent(event::builder::PacketSent { packet_header: event::builder::PacketHeader::new( packet_number, context.publisher.quic_version(), ), packet_len: outcome.bytes_sent, }); Ok((outcome, buffer)) } pub(super) fn on_transmit_burst_complete( &mut self, active_path: &Path<Config>, timestamp: Timestamp, is_handshake_confirmed: bool, ) { self.recovery_manager.on_transmit_burst_complete( active_path, timestamp, is_handshake_confirmed, ); } pub(super) fn on_transmit_close<'a>( &mut self, context: &mut ConnectionTransmissionContext<Config>, connection_close: &ConnectionClose, buffer: EncoderBuffer<'a>, ) -> Result<(transmission::Outcome, EncoderBuffer<'a>), PacketEncodingError<'a>> { let packet_number = self.tx_packet_numbers.next(); let packet_number_encoder = self.packet_number_encoder(); let mut outcome = transmission::Outcome::default(); let destination_connection_id = context.path().peer_connection_id; let payload = transmission::Transmission { config: PhantomData::<Config>, outcome: &mut outcome, packet_number, payload: transmission::connection_close::Payload { connection_close }, timestamp: context.timestamp, transmission_constraint: transmission::Constraint::None, transmission_mode: transmission::Mode::Normal, tx_packet_numbers: &mut self.tx_packet_numbers, path_id: context.path_id, publisher: context.publisher, packet_interceptor: context.packet_interceptor, }; let packet = Initial { version: context.quic_version, destination_connection_id, source_connection_id: context.path_manager[context.path_id].local_connection_id, token: &[0u8; 0][..], packet_number, payload, }; let (_protected_packet, buffer) = packet.encode_packet( &mut self.key, &self.header_key, packet_number_encoder, context.min_packet_len, buffer, )?; context .publisher .on_packet_sent(event::builder::PacketSent { packet_header: event::builder::PacketHeader::new( packet_number, context.publisher.quic_version(), ), packet_len: outcome.bytes_sent, }); Ok((outcome, buffer)) } /// Called when the connection timer expired pub fn on_timeout<Pub: event::ConnectionPublisher>( &mut self, handshake_status: &HandshakeStatus, path_id: path::Id, path_manager: &mut path::Manager<Config>, random_generator: &mut Config::RandomGenerator, timestamp: Timestamp, max_pto_backoff: u32, publisher: &mut Pub, ) { self.ack_manager.on_timeout(timestamp); let (recovery_manager, mut context) = self.recovery(handshake_status, path_id, path_manager); recovery_manager.on_timeout( timestamp, random_generator, max_pto_backoff, &mut context, publisher, ); } /// Called before the Initial packet space is discarded pub fn on_discard<Pub: event::ConnectionPublisher>( &mut self, path: &mut Path<Config>, path_id: path::Id, publisher: &mut Pub, ) { publisher.on_key_space_discarded(event::builder::KeySpaceDiscarded { space: event::builder::KeySpace::Initial, }); self.recovery_manager .on_packet_number_space_discarded(path, path_id, publisher); } pub fn requires_probe(&self) -> bool { self.recovery_manager.requires_probe() } /// Returns the Packet Number to be used when decoding incoming packets pub fn packet_number_decoder(&self) -> PacketNumber { self.ack_manager.largest_received_packet_number_acked() } /// Returns the Packet Number to be used when encoding outgoing packets fn packet_number_encoder(&self) -> PacketNumber { self.tx_packet_numbers.largest_sent_packet_number_acked() } fn recovery<'a>( &'a mut self, handshake_status: &'a HandshakeStatus, path_id: path::Id, path_manager: &'a mut path::Manager<Config>, ) -> ( &'a mut recovery::Manager<Config>, RecoveryContext<'a, Config>, ) { ( &mut self.recovery_manager, RecoveryContext { ack_manager: &mut self.ack_manager, crypto_stream: &mut self.crypto_stream, tx_packet_numbers: &mut self.tx_packet_numbers, handshake_status, config: PhantomData, path_id, path_manager, }, ) } /// Validate packets in the Initial packet space pub fn validate_and_decrypt_packet<'a, Pub: event::ConnectionPublisher>( &self, protected: ProtectedInitial<'a>, path_id: path::Id, path: &path::Path<Config>, publisher: &mut Pub, ) -> Result<CleartextInitial<'a>, ProcessingError> { let packet_number_decoder = self.packet_number_decoder(); let packet = protected .unprotect(&self.header_key, packet_number_decoder) .map_err(|err| { publisher.on_packet_dropped(event::builder::PacketDropped { reason: event::builder::PacketDropReason::UnprotectFailed { space: event::builder::KeySpace::Initial, path: path_event!(path, path_id), }, }); err })?; if self.is_duplicate(packet.packet_number, path_id, path, publisher) { return Err(ProcessingError::Other); } let packet_header = event::builder::PacketHeader::new(packet.packet_number, publisher.quic_version()); let decrypted = packet.decrypt(&self.key).map_err(|err| { publisher.on_packet_dropped(event::builder::PacketDropped { reason: event::builder::PacketDropReason::DecryptionFailed { packet_header, path: path_event!(path, path_id), }, }); err })?; if Config::ENDPOINT_TYPE.is_client() && !decrypted.token.is_empty() { //= https://www.rfc-editor.org/rfc/rfc9000#section-17.2.2 //# Initial packets sent by the server MUST set the Token Length field //# to 0; clients that receive an Initial packet with a non-zero Token //# Length field MUST either discard the packet or generate a //# connection error of type PROTOCOL_VIOLATION. publisher.on_packet_dropped(event::builder::PacketDropped { reason: event::builder::PacketDropReason::NonEmptyRetryToken { path: path_event!(path, path_id), }, }); return Err(ProcessingError::Other); } Ok(decrypted) } #[inline] fn parse_client_hello<Pub: event::ConnectionPublisher>( &mut self, publisher: &mut Pub, ) -> Result<(), transport::Error> { debug_assert!(Config::ENDPOINT_TYPE.is_server()); if let Some(payload) = self.parse_hello(tls::HandshakeType::ClientHello)? { publisher.on_tls_client_hello(event::builder::TlsClientHello { payload: &payload }); } Ok(()) } #[inline] fn parse_server_hello<Pub: event::ConnectionPublisher>( &mut self, publisher: &mut Pub, ) -> Result<(), transport::Error> { debug_assert!(Config::ENDPOINT_TYPE.is_client()); if let Some(payload) = self.parse_hello(tls::HandshakeType::ServerHello)? { publisher.on_tls_server_hello(event::builder::TlsServerHello { payload: &payload }); } Ok(()) } #[inline] fn parse_hello( &mut self, msg_type: tls::HandshakeType, ) -> Result<Option<SmallVec<[&[u8]; 16]>>, transport::Error> { debug_assert!( !self.received_hello_message, "should only be called before the hello is parsed" ); debug_assert_eq!( self.crypto_stream.rx.consumed_len(), 0, "should not consume any crypto data" ); let chunks = self.crypto_stream.rx.iter(); let total_received_len = self.crypto_stream.rx.total_received_len(); let mut chunks = chunks.peekable(); let empty_chunk = &[][..]; let header_chunk = chunks.peek().unwrap_or(&empty_chunk); // TODO make this configurable: // https://github.com/aws/s2n-quic/issues/1001 const MAX_HELLO_SIZE: u64 = 2 << 16; let outcome = <<Config::TLSEndpoint as tls::Endpoint>::Session as tls::Session>::parse_hello( msg_type, header_chunk, total_received_len, MAX_HELLO_SIZE, )?; if let Some(offsets) = outcome { // record that we've received the hello message self.received_hello_message = true; let payload = offsets.trim_chunks(chunks).collect(); Ok(Some(payload)) } else { // we are still waiting on more data Ok(None) } } } impl<Config: endpoint::Config> timer::Provider for InitialSpace<Config> { #[inline] fn timers<Q: timer::Query>(&self, query: &mut Q) -> timer::Result { self.ack_manager.timers(query)?; self.recovery_manager.timers(query)?; Ok(()) } } impl<Config: endpoint::Config> transmission::interest::Provider for InitialSpace<Config> { #[inline] fn transmission_interest<Q: transmission::interest::Query>( &self, query: &mut Q, ) -> transmission::interest::Result { self.ack_manager.transmission_interest(query)?; self.crypto_stream.transmission_interest(query)?; self.recovery_manager.transmission_interest(query)?; Ok(()) } } impl<Config: endpoint::Config> connection::finalization::Provider for InitialSpace<Config> { fn finalization_status(&self) -> connection::finalization::Status { // there's nothing in here that hold up finalizing a connection connection::finalization::Status::Idle } } struct RecoveryContext<'a, Config: endpoint::Config> { ack_manager: &'a mut AckManager, crypto_stream: &'a mut CryptoStream, tx_packet_numbers: &'a mut TxPacketNumbers, handshake_status: &'a HandshakeStatus, config: PhantomData<Config>, path_id: path::Id, path_manager: &'a mut path::Manager<Config>, } impl<Config: endpoint::Config> recovery::Context<Config> for RecoveryContext<'_, Config> { const ENDPOINT_TYPE: endpoint::Type = Config::ENDPOINT_TYPE; fn is_handshake_confirmed(&self) -> bool { self.handshake_status.is_confirmed() } fn active_path(&self) -> &Path<Config> { self.path_manager.active_path() } fn active_path_mut(&mut self) -> &mut Path<Config> { self.path_manager.active_path_mut() } fn path(&self) -> &Path<Config> { &self.path_manager[self.path_id] } fn path_mut(&mut self) -> &mut Path<Config> { &mut self.path_manager[self.path_id] } fn path_by_id(&self, path_id: path::Id) -> &path::Path<Config> { &self.path_manager[path_id] } fn path_mut_by_id(&mut self, path_id: path::Id) -> &mut path::Path<Config> { &mut self.path_manager[path_id] } fn path_id(&self) -> path::Id { self.path_id } fn validate_packet_ack( &mut self, timestamp: Timestamp, packet_number_range: &PacketNumberRange, lowest_tracking_packet_number: PacketNumber, ) -> Result<(), transport::Error> { self.tx_packet_numbers.on_packet_ack( timestamp, packet_number_range, lowest_tracking_packet_number, ) } fn on_new_packet_ack<Pub: event::ConnectionPublisher>( &mut self, packet_number_range: &PacketNumberRange, _publisher: &mut Pub, ) { self.crypto_stream.on_packet_ack(packet_number_range); } fn on_packet_ack(&mut self, timestamp: Timestamp, packet_number_range: &PacketNumberRange) { self.ack_manager .on_packet_ack(timestamp, packet_number_range); } fn on_packet_loss<Pub: event::ConnectionPublisher>( &mut self, packet_number_range: &PacketNumberRange, _publisher: &mut Pub, ) { self.crypto_stream.on_packet_loss(packet_number_range); self.ack_manager.on_packet_loss(packet_number_range); } fn on_rtt_update(&mut self, _now: Timestamp) {} fn on_mtu_update(&mut self, _mtu: u16) {} } //= https://www.rfc-editor.org/rfc/rfc9000#section-17.2.2 //# The payload of an Initial packet includes a CRYPTO frame (or frames) //# containing a cryptographic handshake message, ACK frames, or both. //# PING, PADDING, and CONNECTION_CLOSE frames of type 0x1c are also //# permitted. An endpoint that receives an Initial packet containing //# other frames can either discard the packet as spurious or treat it as //# a connection error. impl<Config: endpoint::Config> PacketSpace<Config> for InitialSpace<Config> { const INVALID_FRAME_ERROR: &'static str = "invalid frame in initial space"; /// Signals the connection was previously blocked by anti-amplification limits /// but is now no longer limited. fn on_amplification_unblocked( &mut self, path_manager: &path::Manager<Config>, timestamp: Timestamp, is_handshake_confirmed: bool, ) { debug_assert!( Config::ENDPOINT_TYPE.is_server(), "Clients are never in an anti-amplification state" ); //= https://www.rfc-editor.org/rfc/rfc9002#appendix-A.6 //# When a server is blocked by anti-amplification limits, receiving a //# datagram unblocks it, even if none of the packets in the datagram are //# successfully processed. In such a case, the PTO timer will need to //# be re-armed. self.recovery_manager.update_pto_timer( path_manager.active_path(), timestamp, is_handshake_confirmed, ); } fn handle_crypto_frame<Pub: event::ConnectionPublisher>( &mut self, frame: CryptoRef, _datagram: &DatagramInfo, _path: &mut Path<Config>, publisher: &mut Pub, ) -> Result<(), transport::Error> { self.crypto_stream.on_crypto_frame(frame)?; // try to parse out the hello message if we haven't yet if !self.received_hello_message { match Config::ENDPOINT_TYPE { endpoint::Type::Server => self.parse_client_hello(publisher)?, endpoint::Type::Client => self.parse_server_hello(publisher)?, } } Ok(()) } fn handle_ack_frame<A: AckRanges, Pub: event::ConnectionPublisher>( &mut self, frame: Ack<A>, timestamp: Timestamp, path_id: path::Id, path_manager: &mut path::Manager<Config>, packet_number: PacketNumber, handshake_status: &mut HandshakeStatus, _local_id_registry: &mut connection::LocalIdRegistry, random_generator: &mut Config::RandomGenerator, publisher: &mut Pub, ) -> Result<(), transport::Error> { let (recovery_manager, mut context) = self.recovery(handshake_status, path_id, path_manager); recovery_manager.on_ack_frame( timestamp, frame, packet_number, random_generator, &mut context, publisher, ) } fn handle_connection_close_frame<Pub: event::ConnectionPublisher>( &mut self, frame: ConnectionClose, path_id: path::Id, path: &mut Path<Config>, packet_number: PacketNumber, publisher: &mut Pub, ) -> Result<(), transport::Error> { publisher.on_connection_close_frame_received( event::builder::ConnectionCloseFrameReceived { packet_header: event::builder::PacketHeader::new( packet_number, publisher.quic_version(), ), path: path_event!(path, path_id), frame: frame.into_event(), }, ); //= https://www.rfc-editor.org/rfc/rfc9000#section-17.2.2 //# CONNECTION_CLOSE frames of type 0x1c are also //# permitted. if frame.tag() != 0x1c { return Err(transport::Error::PROTOCOL_VIOLATION); } Ok(()) } fn on_processed_packet<Pub: event::ConnectionPublisher>( &mut self, processed_packet: ProcessedPacket, path_id: path::Id, path: &Path<Config>, publisher: &mut Pub, ) -> Result<(), transport::Error> { self.ack_manager.on_processed_packet( &processed_packet, path_event!(path, path_id), publisher, ); self.processed_packet_numbers .insert(processed_packet.packet_number) .expect("packet number was already checked"); Ok(()) } }