fn container_test()

in quic/s2n-quic-transport/src/connection/connection_container/tests.rs [423:614]


/// Drives a `ConnectionContainer` with generated sequences of `Operation`s and
/// verifies that the container and the test's own bookkeeping stay in agreement.
///
/// For every generated `Vec<Operation>` the test:
///
/// 1. builds a fresh container together with an endpoint handle,
/// 2. applies each operation in order, asserting the per-operation invariants
///    (e.g. a connection visited by the transmission list must have expressed
///    transmission interest),
/// 3. finalizes the container and checks that walking `connection_map` from its
///    front yields exactly the connection IDs the test still tracks, in order.
fn container_test() {
    use core::time::Duration;

    check!().with_type::<Vec<Operation>>().for_each(|ops| {
        let mut id_source = InternalConnectionIdGenerator::new();
        // IDs of the connections the test expects the container to still hold,
        // kept in insertion order
        let mut tracked = vec![];
        let (handle, acceptor, connector, _close_handle) = endpoint::handle::Handle::new(100);
        let (waker, _wake_count) = futures_test::task::new_count_waker();
        // NOTE(review): `Timestamp::from_duration` is `unsafe`; presumably the
        // caller must guarantee all timestamps come from one clock, which holds
        // here because `now` is the only time source in this test — confirm
        let mut now = unsafe { Timestamp::from_duration(Duration::from_secs(0)) };

        // application side of the endpoint; becomes `None` after `CloseApp`
        let mut app_handle = Some(handle);
        let mut container: ConnectionContainer<TestConnection, TestLock> =
            ConnectionContainer::new(acceptor, connector);

        for op in ops.iter() {
            match op {
                Operation::Insert => {
                    let id = id_source.generate_id();
                    container.insert_connection(TestConnection::default(), id);
                    tracked.push(id);
                }
                Operation::UpdateInterests {
                    index,
                    finalization,
                    closing,
                    accept,
                    transmission,
                    new_connection_id,
                    timeout,
                } => {
                    if tracked.is_empty() {
                        continue;
                    }
                    // map the generated index onto one of the tracked connections
                    let slot = index % tracked.len();
                    let id = tracked[slot];

                    let mut was_called = false;
                    container.with_connection(id, |conn| {
                        was_called = true;

                        if *finalization {
                            // a finalizing connection expresses no other interest
                            conn.interests = ConnectionInterests {
                                finalization: true,
                                ..Default::default()
                            };
                        } else {
                            conn.interests.closing = *closing;
                            // accept interest is only applied once the handshake is done
                            if conn.accept_state == AcceptState::HandshakeCompleted {
                                conn.interests.accept = *accept;
                            }
                            conn.interests.transmission = *transmission;
                            conn.interests.new_connection_id = *new_connection_id;
                            conn.interests.timeout =
                                timeout.map(|ms| now + Duration::from_millis(ms as _));

                            // at least one interest has to be expressed so the
                            // connection keeps making progress
                            let interests = &mut conn.interests;
                            if !interests.transmission
                                && !interests.new_connection_id
                                && interests.timeout.is_none()
                            {
                                interests.transmission = true;
                            }
                        }
                    });

                    // a finalizing connection is no longer expected in the container
                    if *finalization {
                        tracked.remove(slot);
                    }

                    assert!(was_called);
                }
                Operation::CloseApp => {
                    // dropping the handle models the application going away
                    drop(app_handle.take());
                }
                Operation::HandshakeCompleted { index, closed } => {
                    if tracked.is_empty() {
                        continue;
                    }
                    let slot = index % tracked.len();
                    let id = tracked[slot];

                    let mut was_called = false;
                    container.with_connection(id, |conn| {
                        was_called = true;

                        // only connections that are still handshaking can complete
                        if !conn.is_handshaking() {
                            return;
                        }
                        conn.accept_state = AcceptState::HandshakeCompleted;

                        if *closed {
                            // closed right after the handshake finished, before the
                            // application had a chance to accept the connection
                            conn.is_closed = true;
                            conn.interests = ConnectionInterests {
                                finalization: true,
                                ..Default::default()
                            };
                            tracked.remove(slot);
                        } else {
                            conn.interests.accept = true;
                        }
                    });
                    assert!(was_called);
                }
                Operation::Receive => {
                    if let Some(app) = app_handle.as_mut() {
                        let mut cx = Context::from_waker(&waker);
                        // drain every connection that is ready to be accepted
                        while let Poll::Ready(Some(_accepted)) = app.acceptor.poll_accept(&mut cx) {
                            // TODO assert that the accepted connection expressed accept
                            // interest
                        }
                    }
                }
                Operation::Timeout(ms) => {
                    now += Duration::from_millis(*ms as _);
                    container.iterate_timeout_list(now, |conn, _context| {
                        // consume the timeout; a visited connection must have one
                        let expired_at = conn.interests.timeout.take().unwrap();
                        assert!(
                            expired_at <= now,
                            "connections should only be present when timeout interest is expressed"
                        );

                        // at least one interest has to be expressed so the
                        // connection keeps making progress
                        let interests = &mut conn.interests;
                        if !interests.transmission
                            && !interests.new_connection_id
                            && interests.timeout.is_none()
                        {
                            interests.transmission = true;
                        }
                    });
                }
                Operation::Transmit(count) => {
                    // number of connections to visit before breaking out
                    let mut remaining = *count;
                    container.iterate_transmission_list(|conn| {
                        assert!(conn.interests.transmission);

                        if remaining == 0 {
                            return ConnectionContainerIterationResult::BreakAndInsertAtFront;
                        }
                        remaining -= 1;
                        ConnectionContainerIterationResult::Continue
                    })
                }
                Operation::NewConnId(count) => {
                    // number of connections to visit before breaking out
                    let mut remaining = *count;
                    container.iterate_new_connection_id_list(|conn| {
                        assert!(conn.interests.new_connection_id);

                        if remaining == 0 {
                            return ConnectionContainerIterationResult::BreakAndInsertAtFront;
                        }
                        remaining -= 1;
                        ConnectionContainerIterationResult::Continue
                    })
                }
                Operation::Finalize => {
                    container.finalize_done_connections();
                }
                Operation::Poison(index) => {
                    if tracked.is_empty() {
                        continue;
                    }
                    let slot = index % tracked.len();
                    let id = tracked[slot];

                    let entry = container.connection_map.find(&id).get().unwrap();
                    entry.inner.poison();

                    // the callback must not run for a poisoned connection
                    let mut was_called = false;
                    container.with_connection(id, |_conn| {
                        was_called = true;
                    });
                    assert!(!was_called);
                    tracked.remove(slot);
                }
            }
        }

        container.finalize_done_connections();

        // the container's map, walked front to back, must line up one-to-one
        // with the IDs the test still tracks
        let mut expected_ids = tracked.into_iter();
        let mut cursor = container.connection_map.front();

        while let Some(conn) = cursor.get() {
            assert_eq!(Some(conn.internal_connection_id), expected_ids.next());
            cursor.move_next();
        }

        assert!(expected_ids.next().is_none());
    });
}