in quic/s2n-quic-transport/src/connection/connection_impl.rs [862:1021]
fn on_transmit<Tx: tx::Queue<Handle = Config::PathHandle>>(
&mut self,
queue: &mut Tx,
timestamp: Timestamp,
subscriber: &mut Config::EventSubscriber,
packet_interceptor: &mut Config::PacketInterceptor,
) -> Result<(), ConnectionOnTransmitError> {
let mut count = 0;
debug_assert!(
!self.path_manager.is_amplification_limited(),
"connection should not express transmission interest if amplification limited"
);
match self.state {
ConnectionState::Handshaking | ConnectionState::Active | ConnectionState::Flushing => {
let mut outcome = transmission::Outcome::default();
let path_id = self.path_manager.active_path_id();
// Send an MTU probe if necessary and the handshake has been confirmed
// MTU probes are prioritized over other data so they are not blocked by the
// congestion controller, as they are critical to achieving maximum throughput.
if self.state == ConnectionState::Active
&& self.path_manager.active_path().can_transmit(timestamp)
&& self.space_manager.is_handshake_confirmed()
&& self
.path_manager
.active_path()
.mtu_controller
.can_transmit(self.path_manager.active_path().transmission_constraint())
&& queue
.push(ConnectionTransmission {
context: transmission_context!(
self,
&mut outcome,
path_id,
timestamp,
transmission::Mode::MtuProbing,
subscriber,
packet_interceptor,
),
space_manager: &mut self.space_manager,
})
.is_ok()
{
count += 1;
}
// Send all other data for the active path
while self.path_manager.active_path().can_transmit(timestamp)
&& queue
.push(ConnectionTransmission {
context: transmission_context!(
self,
&mut outcome,
path_id,
timestamp,
transmission::Mode::Normal,
subscriber,
packet_interceptor,
),
space_manager: &mut self.space_manager,
})
.is_ok()
{
count += 1;
}
if outcome.ack_elicitation.is_ack_eliciting() {
self.on_ack_eliciting_packet_sent(timestamp);
}
if let Some(edt) = self
.path_manager
.active_path()
.congestion_controller
.earliest_departure_time()
{
if !edt.has_elapsed(timestamp) {
// We can't transmit more until a future time, so arm the pacing
// timer to pause transmission until the earliest departure time.
//= https://www.rfc-editor.org/rfc/rfc9002#section-7.7
//# A sender SHOULD pace sending of all in-flight packets based on input
//# from the congestion controller.
//= https://www.rfc-editor.org/rfc/rfc9002#section-7.7
//# Senders MUST either use pacing or limit such bursts.
self.timers.pacing_timer.set(edt);
}
}
let meta = event::builder::ConnectionMeta {
endpoint_type: Config::ENDPOINT_TYPE,
id: self.internal_connection_id().into(),
timestamp,
};
let path_id = self.path_manager.active_path_id().as_u8();
let path = self.path_manager.active_path();
subscriber.on_recovery_metrics(
&mut self.event_context.context,
&meta.into_event(),
&recovery_event!(path_id, path).into_event(),
);
// PathValidationOnly handles transmission on non-active paths. Transmission
// on the active path should be handled prior to this.
count += self.path_validation_only_transmission(
queue,
timestamp,
&mut outcome,
subscriber,
packet_interceptor,
);
// If anything was transmitted, notify the space manager
// that a burst of packets has completed transmission
if count > 0 {
self.space_manager
.on_transmit_burst_complete(self.path_manager.active_path(), timestamp);
}
let mut publisher = self.event_context.publisher(timestamp, subscriber);
if outcome.bytes_progressed > 0 {
publisher.on_tx_stream_progress(TxStreamProgress {
bytes: outcome.bytes_progressed,
})
}
// check to see if we are flushing and should close
if self.poll_flush().is_ready() {
// trigger a wake up so we can close
self.wakeup_handle.wakeup();
}
}
ConnectionState::Closing => {
let mut publisher = self.event_context.publisher(timestamp, subscriber);
let path = self.path_manager.active_path_mut();
if queue
.push(
self.close_sender
.transmission(path, timestamp, &mut publisher),
)
.is_ok()
{
count += 1;
}
}
ConnectionState::Draining | ConnectionState::Finished => {
// We are not allowed to send any data in this states
}
}
if count == 0 {
Err(ConnectionOnTransmitError::NoDatagram)
} else {
Ok(())
}
}