in quic/s2n-quic-transport/src/connection/connection_container/tests.rs [423:614]
fn container_test() {
use core::time::Duration;
check!().with_type::<Vec<Operation>>().for_each(|ops| {
let mut id_gen = InternalConnectionIdGenerator::new();
let mut connections = vec![];
let (handle, acceptor, connector, _close_handle) = endpoint::handle::Handle::new(100);
let (waker, _wake_count) = futures_test::task::new_count_waker();
let mut now = unsafe { Timestamp::from_duration(Duration::from_secs(0)) };
let mut handle = Some(handle);
let mut container: ConnectionContainer<TestConnection, TestLock> =
ConnectionContainer::new(acceptor, connector);
for op in ops.iter() {
match op {
Operation::Insert => {
let id = id_gen.generate_id();
let connection = TestConnection::default();
container.insert_connection(connection, id);
connections.push(id);
}
Operation::UpdateInterests {
index,
finalization,
closing,
accept,
transmission,
new_connection_id,
timeout,
} => {
if connections.is_empty() {
continue;
}
let index = index % connections.len();
let id = connections[index];
let mut was_called = false;
container.with_connection(id, |conn| {
was_called = true;
let i = &mut conn.interests;
if *finalization {
// in the finalization state, that should be the only interest
*i = ConnectionInterests {
finalization: true,
..Default::default()
};
return;
}
i.closing = *closing;
if conn.accept_state == AcceptState::HandshakeCompleted {
i.accept = *accept;
}
i.transmission = *transmission;
i.new_connection_id = *new_connection_id;
i.timeout = timeout.map(|ms| now + Duration::from_millis(ms as _));
// we need to express at least one interest to ensure progress
if !(i.transmission || i.new_connection_id || i.timeout.is_some()) {
i.transmission = true;
}
});
if *finalization {
connections.remove(index);
}
assert!(was_called);
}
Operation::CloseApp => {
handle = None;
}
Operation::HandshakeCompleted { index, closed } => {
if connections.is_empty() {
continue;
}
let index = index % connections.len();
let id = connections[index];
let mut was_called = false;
container.with_connection(id, |conn| {
if conn.is_handshaking() {
conn.accept_state = AcceptState::HandshakeCompleted;
if *closed {
// The connection was closed immediately following the
// handshake being completed and before it could be accepted
conn.is_closed = true;
conn.interests = ConnectionInterests {
finalization: true,
..Default::default()
};
connections.remove(index);
} else {
conn.interests.accept = true;
}
}
was_called = true;
});
assert!(was_called);
}
Operation::Receive => {
if let Some(handle) = handle.as_mut() {
while let Poll::Ready(Some(_accepted)) = handle
.acceptor
.poll_accept(&mut Context::from_waker(&waker))
{
// TODO assert that the accepted connection expressed accept
// interest
}
}
}
Operation::Timeout(ms) => {
now += Duration::from_millis(*ms as _);
container.iterate_timeout_list(now, |conn, _context| {
let i = &mut conn.interests;
assert!(
i.timeout.take().unwrap() <= now,
"connections should only be present when timeout interest is expressed"
);
// we need to express at least one interest to ensure progress
if !(i.transmission || i.new_connection_id || i.timeout.is_some()) {
i.transmission = true;
}
});
}
Operation::Transmit(count) => {
let mut count = *count;
container.iterate_transmission_list(|conn| {
assert!(conn.interests.transmission);
if count == 0 {
ConnectionContainerIterationResult::BreakAndInsertAtFront
} else {
count -= 1;
ConnectionContainerIterationResult::Continue
}
})
}
Operation::NewConnId(count) => {
let mut count = *count;
container.iterate_new_connection_id_list(|conn| {
assert!(conn.interests.new_connection_id);
if count == 0 {
ConnectionContainerIterationResult::BreakAndInsertAtFront
} else {
count -= 1;
ConnectionContainerIterationResult::Continue
}
})
}
Operation::Finalize => {
container.finalize_done_connections();
}
Operation::Poison(index) => {
if connections.is_empty() {
continue;
}
let index = index % connections.len();
let id = connections[index];
let node = container.connection_map.find(&id).get().unwrap();
node.inner.poison();
let mut was_called = false;
container.with_connection(id, |_conn| {
was_called = true;
});
assert!(!was_called);
connections.remove(index);
}
}
}
container.finalize_done_connections();
let mut connections = connections.drain(..);
let mut cursor = container.connection_map.front();
while let Some(conn) = cursor.get() {
assert_eq!(Some(conn.internal_connection_id), connections.next());
cursor.move_next();
}
assert!(connections.next().is_none());
});
}