in quic/s2n-quic-transport/src/connection/connection_container.rs [402:592]
fn update_interests(
&mut self,
accept_queue: &mut AcceptorSender,
node: &ConnectionNode<C, L>,
interests: ConnectionInterests,
result: ConnectionContainerIterationResult,
) -> Result<(), L::Error> {
let id = node.internal_connection_id;
// Note that all comparisons start by checking whether the connection is
// already part of the given list. This is required in order for the
// following operation to be safe. Inserting an element in a list while
// it is already part of a (different) list can panic. Trying to remove
// an element from a list while it is not actually part of the list
// is undefined.
macro_rules! insert_interest {
($list_name:ident, $call:ident) => {
let node = unsafe {
// We have to obtain an `Arc<ConnectionNode>` in order to be able to
// perform interest updates later on. However the intrusive tree
// API only provides us a raw reference.
// Safety: We know that all of our ConnectionNode's are stored in
// reference counted pointers.
node.arc_from_ref()
};
self.$list_name.$call(node);
};
}
macro_rules! remove_interest {
($list_name:ident) => {
let mut cursor = unsafe {
// Safety: We know that the node is only ever part of this list.
// While elements are in temporary lists, they always get unlinked
// from those temporary lists while their interest is updated.
self.$list_name
.cursor_mut_from_ptr(node.deref() as *const ConnectionNode<C, L>)
};
cursor.remove();
};
}
macro_rules! sync_interests_list {
($interest:expr, $link_name:ident, $list_name:ident) => {
if $interest != node.$link_name.is_linked() {
if $interest {
if matches!(result, ConnectionContainerIterationResult::Continue) {
insert_interest!($list_name, push_back);
} else {
insert_interest!($list_name, push_front);
}
} else {
remove_interest!($list_name);
}
}
debug_assert_eq!($interest, node.$link_name.is_linked());
};
}
sync_interests_list!(
interests.transmission,
waiting_for_transmission_link,
waiting_for_transmission
);
sync_interests_list!(
interests.new_connection_id,
waiting_for_connection_id_link,
waiting_for_connection_id
);
// Check if the timeout has changed since last time we queried the interests
if node.timeout.get() != interests.timeout {
// remove the connection if it's currently linked
if node.waiting_for_timeout_link.is_linked() {
remove_interest!(waiting_for_timeout);
}
// set the new timeout value
node.timeout.set(interests.timeout);
// insert the connection if it still has a value
if interests.timeout.is_some() {
insert_interest!(waiting_for_timeout, insert);
}
} else {
// make sure the timeout value reflects the connection's presence in the timeout list
debug_assert_eq!(
interests.timeout.is_some(),
node.waiting_for_timeout_link.is_linked()
);
}
// Accepted connections are only automatically pushed into the accepted connections queue.
if interests.accept {
node.inner.write(|conn| {
debug_assert!(!conn.is_handshaking());
conn.mark_as_accepted();
})?;
// Decrement the inflight handshakes because this connection completed the
// handshake and is being passed to the application to be accepted.
self.handshake_connections -= 1;
let handle = unsafe {
// We have to obtain an `Arc<ConnectionNode>` in order to be able to
// perform interest updates later on. However the intrusive tree
// API only provides us a raw reference.
// Safety: We know that all of our ConnectionNode's are stored in
// reference counted pointers.
node.arc_from_ref()
};
let handle = crate::connection::api::Connection::new(handle);
match <C::Config as endpoint::Config>::ENDPOINT_TYPE {
endpoint::Type::Server => {
if let Err(error) = accept_queue.unbounded_send(handle) {
error.into_inner().api.close_connection(None);
}
}
endpoint::Type::Client => {
if let Some(mut senders) = self.waiting_for_open.remove(&id) {
let mut any_interest = false;
let last = senders.pop();
for sender in senders {
if let Err(Ok(_handle)) = sender.send(Ok(handle.clone())) {
// This particular handle is not interested anymore, but maybe one
// of the others will be.
} else {
any_interest = true;
}
}
if let Some(sender) = last {
if let Err(Ok(handle)) = sender.send(Ok(handle)) {
if !any_interest {
// close the connection if the application is no longer waiting for the handshake
handle.api.close_connection(None);
}
}
}
} else {
debug_assert!(false, "client connection tried to open more than once");
}
}
}
}
if interests.finalization != node.done_connections_link.is_linked() {
if interests.finalization {
if <C::Config as endpoint::Config>::ENDPOINT_TYPE.is_client() {
if let Some(senders) = self.waiting_for_open.remove(&id) {
let err = node.inner.read(|conn| conn.error());
let err = match err {
Ok(Some(err)) => {
// error from connection
err
}
Ok(None) => {
// connection expressed finalization without error
transport::Error::NO_ERROR.into()
}
Err(_err) => {
// error acquiring a lock
transport::Error::INTERNAL_ERROR
.with_reason("failed to acquire connection lock")
.into()
}
};
for sender in senders {
let _ = sender.send(Err(err));
}
}
}
if node.inner.read(|conn| !conn.is_accepted())? {
// Decrement the inflight handshakes because the connection has
// been finalized before it was handed back to the application
// and thus this count was not decremented previously
self.handshake_connections -= 1;
}
insert_interest!(done_connections, push_back);
} else {
unreachable!("Done connections should never report not done later");
}
}
node.ensure_consistency();
Ok(())
}