fn test_connection()

in src/dumbo/src/tcp/connection.rs [1250:1779]


    fn test_connection() {
        // These are used to support some segments we play around with.
        let mut buf1 = [0u8; 100];
        let mut buf2 = [0u8; 100];
        let mut buf3 = [0u8; 1500];
        // Buffer containing the payload of the incoming data segment.
        let data_buf = [2u8; 1000];
        // Buffer containing the data which the connection sends on outgoing segments.
        let send_buf = [11u8; 20000];

        let mut t = ConnectionTester::new();

        let mut syn = t.write_syn(buf1.as_mut());
        let mut ctrl = t.write_ctrl(buf2.as_mut());
        let mut data = t.write_data(buf3.as_mut(), data_buf.as_ref());

        // Test creating a new connection based on invalid SYN segments.

        // Invalid flags.
        syn.set_flags_after_ns(TcpFlags::SYN | TcpFlags::ACK);
        assert_eq!(
            t.passive_open(&syn).unwrap_err(),
            PassiveOpenError::InvalidSyn
        );

        // SYN segment with payload.
        data.set_flags_after_ns(TcpFlags::SYN);
        assert_eq!(
            t.passive_open(&data).unwrap_err(),
            PassiveOpenError::InvalidSyn
        );

        // Ok, now let's test with connections created using valid SYN segments.

        // Set valid flags.
        syn.set_flags_after_ns(TcpFlags::SYN);

        let mut c = t.passive_open(&syn).unwrap();

        assert_eq!(c.ack_to_send.0, t.remote_isn.wrapping_add(1));
        assert_eq!(c.first_not_sent, c.highest_ack_received + Wrapping(1));
        assert_eq!(
            c.local_rwnd_edge.0,
            t.remote_isn.wrapping_add(1 + t.local_rwnd_size)
        );
        assert_eq!(
            c.remote_rwnd_edge,
            c.first_not_sent + Wrapping(u32::from(t.remote_window_size))
        );
        check_syn_received(&c);

        // There's a SYNACK to send.
        assert_eq!(
            c.control_segment_or_timeout_status(),
            NextSegmentStatus::Available
        );

        let mut c_clone = c.clone();

        // While the connection is in this state, we send another SYN, with a different ISN.
        syn.set_sequence_number(t.remote_isn.wrapping_add(1));
        t.should_reset_after(
            &mut c,
            &syn,
            RecvStatusFlags::INVALID_SEGMENT | RecvStatusFlags::CONN_RESETTING,
            TcpFlags::ACK,
        );

        // Let's restore the connection.
        c = c_clone;
        let mut payload_src = Some((send_buf.as_ref(), c.highest_ack_received));

        // Sending the exact same SYN again should be fine.
        syn.set_sequence_number(t.remote_isn);
        assert_eq!(
            t.receive_segment(&mut c, &syn).unwrap(),
            (None, RecvStatusFlags::empty())
        );

        // The connection should send a SYNACK at the next opportunity.
        t.check_synack_is_next(&mut c);

        // Calling write_next_segment again should not send anything else.
        assert!(t.write_next_segment(&mut c, payload_src).unwrap().is_none());

        // Also, we now have a RTO pending.
        assert_eq!(
            c.control_segment_or_timeout_status(),
            NextSegmentStatus::Timeout(t.rto_period)
        );

        // However, if we advance the time until just after the RTO, a SYNACK is retransmitted.
        t.now += t.rto_period;
        t.check_synack_is_next(&mut c);

        assert_eq!(
            c.control_segment_or_timeout_status(),
            NextSegmentStatus::Timeout(2 * t.rto_period)
        );

        // Re-receiving a valid SYN moves the connection back to SYN_RECEIVED.
        assert_eq!(
            t.receive_segment(&mut c, &syn).unwrap(),
            (None, RecvStatusFlags::empty())
        );
        check_syn_received(&c);

        // And thus, a SYNACK will be the next segment to be transmitted once again.
        t.check_synack_is_next(&mut c);

        // Now is a time as good as any to see what happens if we receive a RST. First, let's try
        // with an invalid RST (its sequence number is out of window).
        ctrl.set_sequence_number(c.ack_to_send.0.wrapping_sub(1))
            .set_flags_after_ns(TcpFlags::RST);
        assert_eq!(
            t.receive_segment(&mut c, &ctrl).unwrap(),
            (None, RecvStatusFlags::INVALID_RST)
        );

        // Let's back up c, because the next segment will be a valid RST.
        c_clone = c.clone();
        ctrl.set_sequence_number(c.ack_to_send.0);
        t.should_reset_after(
            &mut c,
            &ctrl,
            RecvStatusFlags::RESET_RECEIVED,
            TcpFlags::ACK,
        );

        // Cool, let's restore c and continue.
        c = c_clone.clone();
        let conn_isn = c.first_not_sent.0.wrapping_sub(1);

        // Ok so we're waiting for the SYNACK to be acked. Any incoming segment which is not a
        // retransmitted SYN, but has the SYN flag set will cause a reset.
        data.set_flags_after_ns(TcpFlags::ACK | TcpFlags::SYN)
            .set_ack_number(conn_isn.wrapping_add(1))
            .set_sequence_number(t.remote_isn.wrapping_add(1));

        t.should_reset_after(
            &mut c,
            &data,
            RecvStatusFlags::INVALID_SEGMENT | RecvStatusFlags::CONN_RESETTING,
            // The RST emitted in response won't have the ACK flag set because we can infer a
            // sequence number from the ACK carried by the data segment.
            TcpFlags::empty(),
        );

        c = c_clone.clone();
        // A valid ACK should move the connection into ESTABLISHED. Also, since we allow more than
        // just pure ACKs at this point, any valid data should be received as well.
        data.set_flags_after_ns(TcpFlags::ACK);
        assert_eq!(
            t.receive_segment(&mut c, &data).unwrap(),
            (
                Some(NonZeroUsize::new(data_buf.len()).unwrap()),
                RecvStatusFlags::empty()
            )
        );
        assert!(c.is_established());

        c = c_clone.clone();
        // In fact, since we're so like whatever about the segments we receive here, let's see what
        // happens if data also carries the FIN flag (spoiler: it works).
        data.set_flags_after_ns(TcpFlags::ACK | TcpFlags::FIN);

        assert_eq!(
            t.receive_segment(&mut c, &data).unwrap(),
            (
                Some(NonZeroUsize::new(data_buf.len()).unwrap()),
                RecvStatusFlags::empty()
            )
        );
        assert!(c.is_established());
        assert!(c.fin_received());

        c = c_clone.clone();
        // That being said, let's move into established via a pure ACK.
        ctrl.set_flags_after_ns(TcpFlags::ACK)
            .set_ack_number(conn_isn.wrapping_add(1));

        assert_eq!(
            t.receive_segment(&mut c, &ctrl).unwrap(),
            (None, RecvStatusFlags::empty())
        );
        check_established(&c);

        // Cool, let's back c up.
        c_clone = c.clone();

        // We still get spooked if we get something with a SYN.
        data.set_flags_after_ns(TcpFlags::SYN | TcpFlags::ACK);
        t.should_reset_after(
            &mut c,
            &data,
            RecvStatusFlags::INVALID_SEGMENT | RecvStatusFlags::CONN_RESETTING,
            // The RST emitted in response won't have the ACK flag set because we can infer a
            // sequence number from the ACK carried by the data segment.
            TcpFlags::empty(),
        );

        c = c_clone.clone();

        // Ok so back to ESTABLISHED, let's make sure we only accept the exact sequence
        // number we expect (which is t.remote_isn + 1 at this point). The following segment should
        // not be accepted.
        data.set_flags_after_ns(TcpFlags::ACK)
            .set_sequence_number(t.remote_isn);
        assert_eq!(
            t.receive_segment(&mut c, &data).unwrap(),
            (None, RecvStatusFlags::UNEXPECTED_SEQ)
        );

        // However, if we set the expected sequence everything should be ok.
        data.set_sequence_number(t.remote_isn + 1);
        assert_eq!(
            t.receive_segment(&mut c, &data).unwrap(),
            (
                Some(NonZeroUsize::new(data.payload_len()).unwrap()),
                RecvStatusFlags::empty()
            )
        );

        // This is the ack number that should be set/sent.
        let expected_ack = t.remote_isn.wrapping_add(data.payload_len() as u32 + 1);

        // Check that internal state gets updated properly.
        assert_eq!(c.ack_to_send.0, expected_ack);

        {
            // We should get a pure ACK here, because we don't provide a payload source.
            let s = t.write_next_segment(&mut c, None).unwrap().unwrap();
            check_acks(&s, expected_ack, TcpFlags::empty());
        }

        // Calling write_next_segment (without a payload source) again should not send
        // anything else.
        assert!(t.write_next_segment(&mut c, None).unwrap().is_none());

        {
            let payload_len = data.payload_len() as u32;

            // Assuming no one changed the code, the local window size of the connection was 10000,
            // so we should be able to successfully receive 9 more segments with 1000 byte payloads.
            let max = 9;
            for i in 1u32..=max {
                // The 1 we add is because the SYN consumes a sequence number.
                data.set_sequence_number(t.remote_isn.wrapping_add(1 + i * payload_len));
                assert_eq!(
                    t.receive_segment(&mut c, &data).unwrap(),
                    (
                        Some(NonZeroUsize::new(data.payload_len()).unwrap()),
                        RecvStatusFlags::empty()
                    )
                );
            }

            let expected_ack = t.remote_isn.wrapping_add(1 + (max + 1) * payload_len);
            // The connection should send a single cumulative ACK, and no other segment afterward
            // (if we don't also provide a payload source, which we don't).
            {
                {
                    let s = t.write_next_segment(&mut c, None).unwrap().unwrap();
                    check_acks(&s, expected_ack, TcpFlags::empty());
                }

                assert!(t.write_next_segment(&mut c, None).unwrap().is_none());
            }

            // Sending any more new data should be outside of the receive window of the connection.
            data.set_sequence_number(expected_ack);
            assert_eq!(
                t.receive_segment(&mut c, &data).unwrap(),
                (None, RecvStatusFlags::SEGMENT_BEYOND_RWND)
            );
        }

        // Restore connection state to just after ESTABLISHED, and make it send some data.
        c = c_clone.clone();

        // This should send anything, as the payload source does not contain the next sequence
        // number to be sent.

        // Should contain conn_isn + 1 to be fine, but we make it start just after.
        payload_src.as_mut().unwrap().1 = Wrapping(conn_isn) + Wrapping(2);

        assert_eq!(
            t.write_next_segment(&mut c, payload_src).unwrap_err(),
            WriteNextError::PayloadMissingSeq
        );

        // Let's fix it.
        payload_src.as_mut().unwrap().1 = Wrapping(conn_isn) + Wrapping(1);

        // The mss is 1100, and the remote window is 11000, so we can send 10 data packets.
        let max = 10;
        let remote_isn = t.remote_isn;
        let mss = u32::from(t.mss);

        let (payload_buf, mut response_seq) = payload_src.unwrap();
        let mut payload_offset = 0;
        for i in 0..max {
            // Using the expects to get the value of i if there's an error.
            let s = t
                .write_next_segment(&mut c, Some((&payload_buf[payload_offset..], response_seq)))
                .unwrap_or_else(|_| panic!("{}", i))
                .unwrap_or_else(|| panic!("{}", i));

            payload_offset += s.payload_len();
            response_seq += Wrapping(s.payload_len() as u32);

            // Again, the 1 accounts for the sequence number taken up by the SYN.
            assert_eq!(s.sequence_number(), conn_isn.wrapping_add(1 + i * mss));
            assert_eq!(s.ack_number(), remote_isn.wrapping_add(1));
            assert_eq!(s.flags_after_ns(), TcpFlags::ACK);
            assert_eq!(s.payload_len() as u32, mss);
        }

        // No more new data can be sent until the window advances, even though data_buf
        // contains 20_000 bytes.
        assert!(t.write_next_segment(&mut c, payload_src).unwrap().is_none());

        // Let's ACK the first segment previously sent.
        ctrl.set_ack_number(conn_isn.wrapping_add(1 + mss))
            .set_flags_after_ns(TcpFlags::ACK);
        assert_eq!(
            t.receive_segment(&mut c, &ctrl).unwrap(),
            (None, RecvStatusFlags::empty())
        );

        // We should be able to send one more segment now.
        {
            let s = t.write_next_segment(&mut c, payload_src).unwrap().unwrap();
            assert_eq!(s.sequence_number(), conn_isn.wrapping_add(1 + max * mss));
            assert_eq!(s.payload_len(), mss as usize);
        }
        assert!(t.write_next_segment(&mut c, payload_src).unwrap().is_none());

        // We have to wait for the window to open again in order to send new data, but we can
        // have retransmissions. For example, receiving the previous ACK again will cause a
        // DUPACK, which will trigger a retransmission.
        assert_eq!(
            t.receive_segment(&mut c, &ctrl).unwrap(),
            (None, RecvStatusFlags::DUP_ACK)
        );
        assert!(c.dup_ack_pending());

        // Let's check that we indeed get a single retransmitted segment.
        {
            let s = t.write_next_segment(&mut c, payload_src).unwrap().unwrap();
            assert_eq!(s.sequence_number(), ctrl.ack_number());
            assert_eq!(s.payload_len(), mss as usize);
        }
        assert!(t.write_next_segment(&mut c, payload_src).unwrap().is_none());

        // Retransmissions also trigger after time-out.
        t.now += t.rto_period;
        {
            let s = t.write_next_segment(&mut c, payload_src).unwrap().unwrap();
            assert_eq!(s.sequence_number(), ctrl.ack_number());
            assert_eq!(s.payload_len(), mss as usize);
        }
        assert!(t.write_next_segment(&mut c, payload_src).unwrap().is_none());

        // Btw, let's also make sure another retransmission will happen after another time-out,
        // but not earlier.
        t.now += t.rto_period - 1;
        assert!(t.write_next_segment(&mut c, payload_src).unwrap().is_none());

        t.now += 1;
        {
            let s = t.write_next_segment(&mut c, payload_src).unwrap().unwrap();
            assert_eq!(s.sequence_number(), ctrl.ack_number());
            assert_eq!(s.payload_len(), mss as usize);
        }
        assert!(t.write_next_segment(&mut c, payload_src).unwrap().is_none());

        c_clone = c.clone();

        // Triggering another timeout should reset the connection, because t.rto_count_max == 3.
        t.now += t.rto_period;
        {
            let s = t.write_next_segment(&mut c, payload_src).unwrap().unwrap();
            assert!(s.flags_after_ns().intersects(TcpFlags::RST));
            assert!(c.is_reset());
        }

        // Let's undo the reset.
        t.now -= t.rto_period;
        c = c_clone;

        // Also, time-outs should stop happening if we got ACKs for all outgoing segments. This
        // ACK also closes the remote receive window so we can't send any new data.
        ctrl.set_ack_number(c.first_not_sent.0).set_window_size(0);
        assert_eq!(
            t.receive_segment(&mut c, &ctrl).unwrap(),
            (None, RecvStatusFlags::empty())
        );
        assert!(t.write_next_segment(&mut c, payload_src).unwrap().is_none());
        t.now += t.rto_period;
        assert!(t.write_next_segment(&mut c, payload_src).unwrap().is_none());

        // Let's open the window a bit, to see that the next transmitted segment fits that
        // exact size.
        ctrl.set_window_size(123);
        assert_eq!(
            t.receive_segment(&mut c, &ctrl).unwrap(),
            (None, RecvStatusFlags::empty())
        );
        {
            let s = t.write_next_segment(&mut c, payload_src).unwrap().unwrap();
            assert_eq!(s.sequence_number(), ctrl.ack_number());
            assert_eq!(s.payload_len(), 123);
        }
        // And let's do one more retransmission timing check.
        assert!(t.write_next_segment(&mut c, payload_src).unwrap().is_none());
        t.now += t.rto_period - 1;
        assert!(t.write_next_segment(&mut c, payload_src).unwrap().is_none());
        t.now += 1;
        {
            let s = t.write_next_segment(&mut c, payload_src).unwrap().unwrap();
            assert_eq!(s.sequence_number(), ctrl.ack_number());
            assert_eq!(s.payload_len(), 123);
        }

        // This looks like a good time to check what happens for some invalid ACKs. First, let's
        // make sure we properly detect an invalid window_size advertisement (where the remote rwnd
        // edge decreases compared to previously received info).
        ctrl.set_window_size(100);
        assert_eq!(
            t.receive_segment(&mut c, &ctrl).unwrap(),
            (
                None,
                RecvStatusFlags::DUP_ACK | RecvStatusFlags::REMOTE_RWND_EDGE
            )
        );
        // Let's clear the DUP_ACK related state.
        c.dup_ack = false;

        // Now let try some invalid ACKs. This one is an older ACK.
        ctrl.set_ack_number(c.highest_ack_received.0.wrapping_sub(1));
        assert_eq!(
            t.receive_segment(&mut c, &ctrl).unwrap(),
            (None, RecvStatusFlags::INVALID_ACK)
        );

        // Another example of invalid ACK is one that tries to acknowledge a sequence number yet
        // to be sent.
        ctrl.set_ack_number(c.first_not_sent.0.wrapping_add(1));
        assert_eq!(
            t.receive_segment(&mut c, &ctrl).unwrap(),
            (None, RecvStatusFlags::INVALID_ACK)
        );

        // FIN time! As usual let's begin with receiving an invalid FIN, one that does not match
        // the sequence number we expect.
        ctrl.set_flags_after_ns(TcpFlags::FIN)
            .set_sequence_number(c.ack_to_send.0.wrapping_sub(1));
        assert_eq!(
            t.receive_segment(&mut c, &ctrl).unwrap(),
            (None, RecvStatusFlags::INVALID_FIN)
        );

        // Ok now let's use a valid FIN.
        ctrl.set_sequence_number(c.ack_to_send.0);
        assert_eq!(
            t.receive_segment(&mut c, &ctrl).unwrap(),
            (None, RecvStatusFlags::empty())
        );
        check_fin_received_but_not_sent(&c);

        // The next segment right now should be a pure ACK for the FIN.
        {
            let s = t.write_next_segment(&mut c, payload_src).unwrap().unwrap();
            check_control_segment(&s, 0, TcpFlags::ACK);
            assert_eq!(s.ack_number(), ctrl.sequence_number().wrapping_add(1),);
        }

        // Receiving data after the FIN is an error. We increase the rwnd edge for c, because the
        // window was full after the earlier reception tests.
        c.advance_local_rwnd_edge(10_000);
        // We'll also get the INVALID_ACK RecvStausFlag here because the ACK number is old.
        data.set_sequence_number(c.ack_to_send.0);
        assert_eq!(
            t.receive_segment(&mut c, &data).unwrap(),
            (
                None,
                RecvStatusFlags::DATA_BEYOND_FIN | RecvStatusFlags::INVALID_ACK
            )
        );

        assert!(t.write_next_segment(&mut c, payload_src).unwrap().is_none());

        //c = c_clone.clone();

        // We change payload_src to only include those parts of send_buf that were already sent,
        // so it makes sense to close the connection as if we're done transmitting data.
        let bytes_sent_by_c = c.first_not_sent.0.wrapping_sub(conn_isn + 1) as usize;
        payload_src.as_mut().unwrap().0 = &send_buf[..bytes_sent_by_c];

        // We artifically increase the remote rwnd for c, so we can verify we sent everything, and
        // we're not just rwnd bound. We also make it so everything is ACKed, so we can sent a FIN
        // right after calling close() below (this is needed because we didn't ACK the last
        // segment sent by c).
        c.remote_rwnd_edge += Wrapping(50_000);
        c.highest_ack_received = c.first_not_sent;

        // Save the state.
        // c_clone = c.clone();

        // Close the connection.
        c.close();

        // We shouldn't be done yet. Even though we got a FIN, we didn't send our own yet.
        assert!(!c.is_done());

        // If we call write_next at this point, the next outgoing segment should be a pure FIN/ACK.
        {
            let s = t.write_next_segment(&mut c, payload_src).unwrap().unwrap();
            check_control_segment(&s, 0, TcpFlags::FIN | TcpFlags::ACK);
            assert_eq!(
                s.sequence_number(),
                conn_isn.wrapping_add(1 + bytes_sent_by_c as u32)
            );
        }

        // At this point, the connection should be done, because we both sent and received a FIN,
        // and we don't wait for our FIN to be ACKed.
        assert!(c.is_done());
    }