fn update_interests()

in quic/s2n-quic-transport/src/connection/connection_container.rs [402:592]


    /// Reconciles the container's bookkeeping for `node` with the `interests`
    /// the connection just reported.
    ///
    /// The steps are performed in this order:
    ///
    /// 1. `node` is linked into / unlinked from `waiting_for_transmission` and
    ///    `waiting_for_connection_id` so that its membership matches
    ///    `interests.transmission` and `interests.new_connection_id`.
    /// 2. If `interests.timeout` differs from the value cached in `node.timeout`,
    ///    `node` is removed from `waiting_for_timeout` (when linked), the cached
    ///    value is updated and `node` is re-inserted if a timeout is still set.
    /// 3. If `interests.accept` is set, the connection is marked as accepted,
    ///    `handshake_connections` is decremented and a connection handle is
    ///    passed on: through `accept_queue` on a server endpoint, or through the
    ///    senders registered in `waiting_for_open` on a client endpoint.
    /// 4. If `interests.finalization` is set and `node` is not yet linked into
    ///    `done_connections`, pending client senders in `waiting_for_open` are
    ///    sent an error, `handshake_connections` is decremented when the
    ///    connection was never accepted, and `node` is appended to
    ///    `done_connections`.
    ///
    /// `result` only influences step 1: a newly interested node is pushed to the
    /// back of the list when `result` is `Continue` and to the front otherwise.
    ///
    /// # Errors
    ///
    /// Propagates (via `?`) a failure of `node.inner.write` in step 3 or of
    /// `node.inner.read` in step 4. The function returns at that point, so the
    /// remaining steps are skipped while the earlier ones stay applied.
    ///
    /// # Panics
    ///
    /// Hits `unreachable!` if `node` is already linked into `done_connections`
    /// but `interests.finalization` is `false`. Several `debug_assert!`s
    /// additionally verify list membership in debug builds.
    fn update_interests(
        &mut self,
        accept_queue: &mut AcceptorSender,
        node: &ConnectionNode<C, L>,
        interests: ConnectionInterests,
        result: ConnectionContainerIterationResult,
    ) -> Result<(), L::Error> {
        // Key under which client-side open requests are stored in `waiting_for_open`.
        let id = node.internal_connection_id;

        // Note that all comparisons start by checking whether the connection is
        // already part of the given list. This is required in order for the
        // following operation to be safe. Inserting an element in a list while
        // it is already part of a (different) list can panic. Trying to remove
        // an element from a list while it is not actually part of the list
        // is undefined.

        // Links `node` into `self.$list_name` by calling `$call` on that
        // collection (`push_back`, `push_front` or `insert` at the call sites below).
        // The `node` binding created here shadows the outer reference only inside
        // the macro expansion.
        macro_rules! insert_interest {
            ($list_name:ident, $call:ident) => {
                let node = unsafe {
                    // We have to obtain an `Arc<ConnectionNode>` in order to be able to
                    // perform interest updates later on. However the intrusive tree
                    // API only provides us a raw reference.
                    // SAFETY: We know that all of our ConnectionNode's are stored in
                    // reference counted pointers.
                    node.arc_from_ref()
                };

                self.$list_name.$call(node);
            };
        }

        // Unlinks `node` from `self.$list_name`. Callers must have verified that
        // the node is currently linked into exactly that collection (see the
        // note above about removal of unlinked elements).
        macro_rules! remove_interest {
            ($list_name:ident) => {
                let mut cursor = unsafe {
                    // SAFETY: We know that the node is only ever part of this list.
                    // While elements are in temporary lists, they always get unlinked
                    // from those temporary lists while their interest is updated.
                    self.$list_name
                        .cursor_mut_from_ptr(node.deref() as *const ConnectionNode<C, L>)
                };
                cursor.remove();
            };
        }

        // Makes the membership of `node` in `self.$list_name` (observed through
        // `node.$link_name.is_linked()`) equal to the boolean `$interest`.
        // Nothing is touched when both already agree.
        macro_rules! sync_interests_list {
            ($interest:expr, $link_name:ident, $list_name:ident) => {
                if $interest != node.$link_name.is_linked() {
                    if $interest {
                        // Placement depends on how the iteration that triggered this
                        // update ended: `Continue` appends, anything else prepends.
                        // NOTE(review): presumably this keeps a connection that
                        // interrupted the iteration first in line for the next
                        // pass - confirm against the iteration code.
                        if matches!(result, ConnectionContainerIterationResult::Continue) {
                            insert_interest!($list_name, push_back);
                        } else {
                            insert_interest!($list_name, push_front);
                        }
                    } else {
                        remove_interest!($list_name);
                    }
                }
                // After synchronization the link state must mirror the interest.
                debug_assert_eq!($interest, node.$link_name.is_linked());
            };
        }

        sync_interests_list!(
            interests.transmission,
            waiting_for_transmission_link,
            waiting_for_transmission
        );

        sync_interests_list!(
            interests.new_connection_id,
            waiting_for_connection_id_link,
            waiting_for_connection_id
        );

        // Check if the timeout has changed since last time we queried the interests
        if node.timeout.get() != interests.timeout {
            // remove the connection if it's currently linked
            // (this happens before the cached timeout is overwritten below)
            if node.waiting_for_timeout_link.is_linked() {
                remove_interest!(waiting_for_timeout);
            }
            // set the new timeout value
            node.timeout.set(interests.timeout);
            // insert the connection if it still has a value
            if interests.timeout.is_some() {
                insert_interest!(waiting_for_timeout, insert);
            }
        } else {
            // make sure the timeout value reflects the connection's presence in the timeout list
            debug_assert_eq!(
                interests.timeout.is_some(),
                node.waiting_for_timeout_link.is_linked()
            );
        }

        // A connection that reports accept interest is handed over to the
        // application here.
        if interests.accept {
            // NOTE(review): if this write fails the function returns early; the
            // list and timeout updates above have already been applied.
            node.inner.write(|conn| {
                debug_assert!(!conn.is_handshaking());
                conn.mark_as_accepted();
            })?;

            // Decrement the inflight handshakes because this connection completed the
            // handshake and is being passed to the application to be accepted.
            // NOTE(review): assumes the counter is non-zero at this point - confirm
            // it is incremented for every connection that can reach this branch.
            self.handshake_connections -= 1;

            let handle = unsafe {
                // Obtain an owning `Arc<ConnectionNode>` from the raw reference so
                // that it can be wrapped into the handle given to the application.
                // SAFETY: We know that all of our ConnectionNode's are stored in
                // reference counted pointers.
                node.arc_from_ref()
            };
            let handle = crate::connection::api::Connection::new(handle);

            match <C::Config as endpoint::Config>::ENDPOINT_TYPE {
                endpoint::Type::Server => {
                    // If the handle cannot be queued, recover it from the error
                    // and close the connection instead.
                    if let Err(error) = accept_queue.unbounded_send(handle) {
                        error.into_inner().api.close_connection(None);
                    }
                }
                endpoint::Type::Client => {
                    // Removing the entry means each set of senders is notified at most once.
                    if let Some(mut senders) = self.waiting_for_open.remove(&id) {
                        // Tracks whether at least one sender took a handle.
                        let mut any_interest = false;
                        // The final sender is set aside so it can receive the
                        // original handle; all others receive clones.
                        let last = senders.pop();
                        for sender in senders {
                            if let Err(Ok(_handle)) = sender.send(Ok(handle.clone())) {
                                // This particular handle is not interested anymore, but maybe one
                                // of the others will be.
                            } else {
                                any_interest = true;
                            }
                        }
                        if let Some(sender) = last {
                            // A failed send gives the handle back, which allows
                            // closing the connection when nobody took one.
                            if let Err(Ok(handle)) = sender.send(Ok(handle)) {
                                if !any_interest {
                                    // close the connection if the application is no longer waiting for the handshake
                                    handle.api.close_connection(None);
                                }
                            }
                        }
                    } else {
                        debug_assert!(false, "client connection tried to open more than once");
                    }
                }
            }
        }

        // Finalization is handled only when the node is not yet linked into `done_connections`.
        if interests.finalization != node.done_connections_link.is_linked() {
            if interests.finalization {
                if <C::Config as endpoint::Config>::ENDPOINT_TYPE.is_client() {
                    // Senders still registered at this point were not served by the
                    // accept branch above; they are sent an error instead of a handle.
                    if let Some(senders) = self.waiting_for_open.remove(&id) {
                        let err = node.inner.read(|conn| conn.error());
                        let err = match err {
                            Ok(Some(err)) => {
                                // error from connection
                                err
                            }
                            Ok(None) => {
                                // connection expressed finalization without error
                                transport::Error::NO_ERROR.into()
                            }
                            Err(_err) => {
                                // error acquiring a lock
                                transport::Error::INTERNAL_ERROR
                                    .with_reason("failed to acquire connection lock")
                                    .into()
                            }
                        };
                        // Every pending sender gets the same error; send failures are ignored.
                        for sender in senders {
                            let _ = sender.send(Err(err));
                        }
                    }
                }

                // NOTE(review): if this read fails, the function returns before the
                // node is linked into `done_connections` - confirm this is intended.
                if node.inner.read(|conn| !conn.is_accepted())? {
                    // Decrement the inflight handshakes because the connection has
                    // been finalized before it was handed back to the application
                    // and thus this count was not decremented previously
                    self.handshake_connections -= 1;
                }

                insert_interest!(done_connections, push_back);
            } else {
                // The node is linked into `done_connections` but no longer reports
                // finalization interest; this is treated as an invariant violation.
                unreachable!("Done connections should never report not done later");
            }
        }

        // Final check of the node's own invariants (defined on `ConnectionNode`).
        node.ensure_consistency();

        Ok(())
    }