fn on_transmit()

in quic/s2n-quic-transport/src/connection/connection_impl.rs [862:1021]


    /// Pushes this connection's pending transmissions onto `queue`, based on the
    /// current connection state.
    ///
    /// * `Handshaking` / `Active` / `Flushing`: optionally pushes a single MTU probe
    ///   (only in the `Active` state), then pushes normal transmissions for the active
    ///   path for as long as that path can transmit and the queue accepts them. It then
    ///   updates the pacing timer, reports recovery metrics to `subscriber`, hands off
    ///   to `path_validation_only_transmission`, and checks whether a pending flush
    ///   has completed.
    /// * `Closing`: pushes the transmission produced by `close_sender`.
    /// * `Draining` / `Finished`: pushes nothing.
    ///
    /// # Parameters
    ///
    /// * `queue` - the transmission queue the transmissions are pushed onto
    /// * `timestamp` - the time used for the transmit checks, timers and events
    /// * `subscriber` - the event subscriber that receives the emitted events
    /// * `packet_interceptor` - forwarded into each transmission context
    ///
    /// # Errors
    ///
    /// Returns `ConnectionOnTransmitError::NoDatagram` if no transmission was
    /// successfully pushed during this call.
    fn on_transmit<Tx: tx::Queue<Handle = Config::PathHandle>>(
        &mut self,
        queue: &mut Tx,
        timestamp: Timestamp,
        subscriber: &mut Config::EventSubscriber,
        packet_interceptor: &mut Config::PacketInterceptor,
    ) -> Result<(), ConnectionOnTransmitError> {
        // Number of transmissions successfully pushed onto `queue` during this call;
        // it decides the return value at the end of the function.
        let mut count = 0;

        // Sanity check that is only evaluated in builds with debug assertions enabled
        debug_assert!(
            !self.path_manager.is_amplification_limited(),
            "connection should not express transmission interest if amplification limited"
        );

        match self.state {
            ConnectionState::Handshaking | ConnectionState::Active | ConnectionState::Flushing => {
                // A single `outcome` is shared by every transmission context created in
                // this arm, so it accumulates the results of all of them.
                let mut outcome = transmission::Outcome::default();
                let path_id = self.path_manager.active_path_id();

                // Send an MTU probe if necessary and the handshake has been confirmed
                // MTU probes are prioritized over other data so they are not blocked by the
                // congestion controller, as they are critical to achieving maximum throughput.
                //
                // `queue.push` is the last operand of the `&&` chain, so the probe is only
                // pushed when every preceding check has passed. At most one probe is
                // pushed per call.
                if self.state == ConnectionState::Active
                    && self.path_manager.active_path().can_transmit(timestamp)
                    && self.space_manager.is_handshake_confirmed()
                    && self
                        .path_manager
                        .active_path()
                        .mtu_controller
                        .can_transmit(self.path_manager.active_path().transmission_constraint())
                    && queue
                        .push(ConnectionTransmission {
                            context: transmission_context!(
                                self,
                                &mut outcome,
                                path_id,
                                timestamp,
                                transmission::Mode::MtuProbing,
                                subscriber,
                                packet_interceptor,
                            ),
                            space_manager: &mut self.space_manager,
                        })
                        .is_ok()
                {
                    count += 1;
                }

                // Send all other data for the active path
                //
                // The loop ends as soon as the active path can no longer transmit or the
                // queue rejects a push.
                while self.path_manager.active_path().can_transmit(timestamp)
                    && queue
                        .push(ConnectionTransmission {
                            context: transmission_context!(
                                self,
                                &mut outcome,
                                path_id,
                                timestamp,
                                transmission::Mode::Normal,
                                subscriber,
                                packet_interceptor,
                            ),
                            space_manager: &mut self.space_manager,
                        })
                        .is_ok()
                {
                    count += 1;
                }

                // Notify the connection if the accumulated outcome is marked ack-eliciting
                if outcome.ack_elicitation.is_ack_eliciting() {
                    self.on_ack_eliciting_packet_sent(timestamp);
                }

                // Consult the active path's congestion controller for an earliest
                // departure time; the pacing timer is only armed when that time has not
                // yet elapsed at `timestamp`.
                if let Some(edt) = self
                    .path_manager
                    .active_path()
                    .congestion_controller
                    .earliest_departure_time()
                {
                    if !edt.has_elapsed(timestamp) {
                        // We can't transmit more until a future time, so arm the pacing
                        // timer to pause transmission until the earliest departure time.

                        //= https://www.rfc-editor.org/rfc/rfc9002#section-7.7
                        //# A sender SHOULD pace sending of all in-flight packets based on input
                        //# from the congestion controller.

                        //= https://www.rfc-editor.org/rfc/rfc9002#section-7.7
                        //# Senders MUST either use pacing or limit such bursts.
                        self.timers.pacing_timer.set(edt);
                    }
                }

                // Report recovery metrics for the active path to the event subscriber.
                // This happens on every call that reaches this arm, whether or not
                // anything was pushed above.
                let meta = event::builder::ConnectionMeta {
                    endpoint_type: Config::ENDPOINT_TYPE,
                    id: self.internal_connection_id().into(),
                    timestamp,
                };
                // Shadows the earlier `path_id` with its `u8` form for the event
                let path_id = self.path_manager.active_path_id().as_u8();
                let path = self.path_manager.active_path();
                subscriber.on_recovery_metrics(
                    &mut self.event_context.context,
                    &meta.into_event(),
                    &recovery_event!(path_id, path).into_event(),
                );

                // PathValidationOnly handles transmission on non-active paths. Transmission
                // on the active path should be handled prior to this.
                //
                // Its return value is added to `count`, and it shares the same `outcome`.
                count += self.path_validation_only_transmission(
                    queue,
                    timestamp,
                    &mut outcome,
                    subscriber,
                    packet_interceptor,
                );

                // If anything was transmitted, notify the space manager
                // that a burst of packets has completed transmission
                if count > 0 {
                    self.space_manager
                        .on_transmit_burst_complete(self.path_manager.active_path(), timestamp);
                }

                // Publish a stream progress event, but only if the outcome recorded
                // a non-zero number of progressed bytes
                let mut publisher = self.event_context.publisher(timestamp, subscriber);
                if outcome.bytes_progressed > 0 {
                    publisher.on_tx_stream_progress(TxStreamProgress {
                        bytes: outcome.bytes_progressed,
                    })
                }

                // check to see if we are flushing and should close
                if self.poll_flush().is_ready() {
                    // trigger a wake up so we can close
                    self.wakeup_handle.wakeup();
                }
            }
            ConnectionState::Closing => {
                // While closing, the only transmission pushed is the one produced by
                // `close_sender` for the active path, at most once per call.
                let mut publisher = self.event_context.publisher(timestamp, subscriber);
                let path = self.path_manager.active_path_mut();

                if queue
                    .push(
                        self.close_sender
                            .transmission(path, timestamp, &mut publisher),
                    )
                    .is_ok()
                {
                    count += 1;
                }
            }
            ConnectionState::Draining | ConnectionState::Finished => {
                // We are not allowed to send any data in these states
            }
        }

        // Report `NoDatagram` when nothing was pushed during this call
        if count == 0 {
            Err(ConnectionOnTransmitError::NoDatagram)
        } else {
            Ok(())
        }
    }