in src/dumbo/src/tcp/handler.rs [574:825]
// Scenario test for `TcpIPv4Handler`, driven by a single hand-crafted packet `p` whose TCP
// header fields are mutated between steps. In order, it checks:
//   1. a segment addressed to the wrong destination port is rejected with `InvalidPort`;
//   2. a non-SYN segment for an unknown connection enqueues a RST, and the RST queue never
//      grows past `max_pending_resets`;
//   3. a SYN creates a connection and a subsequent RST tears it down;
//   4. a SYNACK is sent and a timeout is tracked for it, and a repeated SYN refreshes it;
//   5. the `max_connections` limit, the RST sent for a dropped SYN, and eviction;
//   6. a SYN sent on an already ACKed connection causes that connection to be removed.
// The steps are strictly order-dependent: every assertion relies on the handler state and on
// the header fields of `p` left behind by the previous steps.
fn test_handler() {
// `buf` backs the incoming packet `p`; `buf2` is the scratch buffer handed to
// next_written_segment() when we inspect what the handler wants to send.
let mut buf = [0u8; 100];
let mut buf2 = [0u8; 2000];
// Only used to build the initial IPv4 header; it is replaced by `local_addr` before the packet
// is first passed to the handler.
let wrong_local_addr = Ipv4Addr::new(123, 123, 123, 123);
let local_addr = Ipv4Addr::new(169, 254, 169, 254);
let local_port = 80;
let remote_addr = Ipv4Addr::new(10, 0, 0, 1);
let remote_port = 1012;
// Both limits are kept small so that the test can reach them with a handful of packets.
let max_connections = 2;
let max_pending_resets = 2;
let mut h = TcpIPv4Handler::new(
local_addr,
local_port,
NonZeroUsize::new(max_connections).unwrap(),
NonZeroUsize::new(max_pending_resets).unwrap(),
);
// We start with a wrong destination address and a wrong destination port. Note that the
// address is corrected below, before the first receive_packet() call, so only the wrong-port
// error condition is actually asserted by this test.
let mut p =
IPv4Packet::write_header(buf.as_mut(), PROTOCOL_TCP, remote_addr, wrong_local_addr)
.unwrap();
let seq_number = 123;
// Write the TCP segment into the IPv4 payload area and keep only its length; the segment
// itself is later accessed through inner_tcp_mut(&mut p).
let s_len = {
// We're going to use this simple segment to test stuff.
let s = TcpSegment::write_segment::<[u8]>(
p.inner_mut().payload_mut(),
remote_port,
// We use the wrong port here initially, to trigger an error.
local_port + 1,
seq_number,
456,
TcpFlags::empty(),
10000,
None,
100,
None,
None,
)
.unwrap();
s.len()
};
// The handler should have nothing to send at this point.
assert_eq!(h.next_segment_status(), NextSegmentStatus::Nothing);
assert_eq!(drain_packets(&mut h, local_addr, remote_addr), Ok(0));
// Shadow `p` with a packet whose payload length is `s_len` (presumably finalizing the IPv4
// header for the segment written above -- the meaning of the `false` flag is not visible
// here), then fix the destination address.
let mut p = p.with_payload_len_unchecked(s_len, false);
p.set_destination_address(local_addr);
// The destination port is still `local_port + 1`, so the packet must be rejected.
assert_eq!(
h.receive_packet(&p, mock_callback).unwrap_err(),
RecvError::InvalidPort
);
// Let's fix the port. However, the segment is not a valid SYN, so we should get an
// UnexpectedSegment status, and the handler should write a RST.
assert_eq!(h.rst_queue.len(), 0);
inner_tcp_mut(&mut p).set_destination_port(local_port);
assert_eq!(
h.receive_packet(&p, mock_callback),
Ok(RecvEvent::UnexpectedSegment)
);
assert_eq!(h.rst_queue.len(), 1);
assert_eq!(h.next_segment_status(), NextSegmentStatus::Available);
{
// The RST must travel in the opposite direction of the offending segment.
let s = next_written_segment(&mut h, buf2.as_mut(), WriteEvent::Nothing);
assert!(s.flags_after_ns().intersects(TcpFlags::RST));
assert_eq!(s.source_port(), local_port);
assert_eq!(s.destination_port(), remote_port);
}
// Writing the RST removed it from the queue.
assert_eq!(h.rst_queue.len(), 0);
assert_eq!(h.next_segment_status(), NextSegmentStatus::Nothing);
// Let's check we can only enqueue max_pending_resets resets.
assert_eq!(
h.receive_packet(&p, mock_callback),
Ok(RecvEvent::UnexpectedSegment)
);
assert_eq!(h.rst_queue.len(), 1);
assert_eq!(
h.receive_packet(&p, mock_callback),
Ok(RecvEvent::UnexpectedSegment)
);
assert_eq!(h.rst_queue.len(), 2);
// The third unexpected segment is still reported, but the queue stays at its limit.
assert_eq!(
h.receive_packet(&p, mock_callback),
Ok(RecvEvent::UnexpectedSegment)
);
assert_eq!(h.rst_queue.len(), 2);
// Drain the resets.
assert_eq!(h.next_segment_status(), NextSegmentStatus::Available);
assert_eq!(drain_packets(&mut h, local_addr, remote_addr), Ok(2));
assert_eq!(h.next_segment_status(), NextSegmentStatus::Nothing);
// Ok now let's send a valid SYN.
assert_eq!(h.connections.len(), 0);
inner_tcp_mut(&mut p).set_flags_after_ns(TcpFlags::SYN);
assert_eq!(
h.receive_packet(&p, mock_callback),
Ok(RecvEvent::NewConnectionSuccessful)
);
assert_eq!(h.connections.len(), 1);
assert_eq!(h.active_connections.len(), 1);
// Let's immediately send a RST to the newly initiated connection. This should
// terminate it.
// NOTE(review): the sequence number is bumped by one, presumably because the SYN consumed
// one sequence number -- confirm against the connection's RST validation.
inner_tcp_mut(&mut p)
.set_flags_after_ns(TcpFlags::RST)
.set_sequence_number(seq_number.wrapping_add(1));
assert_eq!(
h.receive_packet(&p, mock_callback),
Ok(RecvEvent::EndpointDone)
);
assert_eq!(h.connections.len(), 0);
assert_eq!(h.active_connections.len(), 0);
// Now, let's restore the previous SYN, and resend it to initiate a connection.
inner_tcp_mut(&mut p)
.set_flags_after_ns(TcpFlags::SYN)
.set_sequence_number(seq_number);
assert_eq!(
h.receive_packet(&p, mock_callback),
Ok(RecvEvent::NewConnectionSuccessful)
);
assert_eq!(h.connections.len(), 1);
assert_eq!(h.active_connections.len(), 1);
// There will be a SYNACK in response.
assert_eq!(h.next_segment_status(), NextSegmentStatus::Available);
assert_eq!(drain_packets(&mut h, local_addr, remote_addr), Ok(1));
// Keys of the first connection (remote_port) and of the second one created further below
// (remote_port + 1).
let remote_tuple = ConnectionTuple::new(remote_addr, remote_port);
let remote_tuple2 = ConnectionTuple::new(remote_addr, remote_port + 1);
// Also, there should be a retransmission timer associated with the previous SYNACK now.
// After the drain, the connection is expected to no longer be listed as active.
assert_eq!(h.active_connections.len(), 0);
let old_timeout_value = if let Some((t, tuple)) = h.next_timeout {
assert_eq!(tuple, remote_tuple);
t
} else {
panic!("missing first expected timeout");
};
// Using the same SYN again will route the packet to the previous connection, and not
// create a new one.
assert_eq!(h.receive_packet(&p, mock_callback), Ok(RecvEvent::Nothing));
assert_eq!(h.connections.len(), 1);
// SYNACK retransmission.
assert_eq!(drain_packets(&mut h, local_addr, remote_addr), Ok(1));
// The timeout value should've gotten updated.
assert_eq!(h.active_connections.len(), 0);
if let Some((t, tuple)) = h.next_timeout {
assert_eq!(tuple, remote_tuple);
// The current Endpoint implementation gets timestamps using timestamp_cycles(), which
// increases VERY fast so the following inequality is guaranteed to be true. If the
// timestamp source gets coarser at some point, we might need an explicit wait before
// the previous h.receive_packet() :-s
assert!(t > old_timeout_value);
} else {
panic!("missing second expected timeout");
};
// Let's ACK the SYNACK.
{
// The ACK number is taken from the connection's own first_not_sent() value.
let seq = h.connections[&remote_tuple].connection().first_not_sent().0;
inner_tcp_mut(&mut p)
.set_flags_after_ns(TcpFlags::ACK)
.set_ack_number(seq);
assert_eq!(h.receive_packet(&p, mock_callback), Ok(RecvEvent::Nothing));
}
// There should be no more active connections now, and also no pending timeout.
assert_eq!(h.active_connections.len(), 0);
assert_eq!(h.next_timeout, None);
// Make p a SYN packet again.
inner_tcp_mut(&mut p).set_flags_after_ns(TcpFlags::SYN);
// Create a new connection, from a different remote_port.
inner_tcp_mut(&mut p).set_source_port(remote_port + 1);
assert_eq!(
h.receive_packet(&p, mock_callback),
Ok(RecvEvent::NewConnectionSuccessful)
);
assert_eq!(h.connections.len(), 2);
assert_eq!(h.active_connections.len(), 1);
// SYNACK
assert_eq!(drain_packets(&mut h, local_addr, remote_addr), Ok(1));
// The timeout associated with the SYNACK of the second connection should be next.
assert_eq!(h.active_connections.len(), 0);
// NOTE(review): this only checks that the timeout does not belong to the first connection.
// `assert_eq!(tuple, remote_tuple2)` would pin the expected owner directly -- confirm and
// consider tightening.
if let Some((_, tuple)) = h.next_timeout {
assert_ne!(tuple, ConnectionTuple::new(remote_addr, remote_port));
} else {
panic!("missing third expected timeout");
}
// No more room for another one.
{
// A third remote port, while both connection slots (max_connections == 2) are in use.
let port = remote_port + 2;
inner_tcp_mut(&mut p).set_source_port(port);
assert_eq!(
h.receive_packet(&p, mock_callback),
Ok(RecvEvent::NewConnectionDropped)
);
assert_eq!(h.connections.len(), 2);
// We should get a RST.
assert_eq!(h.rst_queue.len(), 1);
let s = next_written_segment(&mut h, buf2.as_mut(), WriteEvent::Nothing);
assert!(s.flags_after_ns().intersects(TcpFlags::RST));
assert_eq!(s.destination_port(), port);
}
// Let's make the second endpoint evictable.
h.connections
.get_mut(&remote_tuple2)
.unwrap()
.set_eviction_threshold(0);
// The new connection will replace the old one.
// `p` still carries source port `remote_port + 2` from the block above, so this is a retry
// of the SYN that was dropped a moment ago.
assert_eq!(
h.receive_packet(&p, mock_callback),
Ok(RecvEvent::NewConnectionReplacing)
);
// The total stays at the limit: one connection went away and one was added.
assert_eq!(h.connections.len(), 2);
assert_eq!(h.active_connections.len(), 1);
// One SYNACK for the new connection, and one RST for the old one.
assert_eq!(h.rst_queue.len(), 1);
assert_eq!(drain_packets(&mut h, local_addr, remote_addr), Ok(2));
assert_eq!(h.rst_queue.len(), 0);
assert_eq!(h.active_connections.len(), 0);
// Let's send another SYN to the first connection. This should make it reappear among the
// active connections (because it will have a RST to send), and then cause it to be removed
// altogether after sending the RST (because is_done() will be true).
inner_tcp_mut(&mut p).set_source_port(remote_port);
assert_eq!(h.receive_packet(&p, mock_callback), Ok(RecvEvent::Nothing));
assert_eq!(h.active_connections.len(), 1);
assert_eq!(drain_packets(&mut h, local_addr, remote_addr), Ok(1));
// Only the connection created from `remote_port + 2` is left.
assert_eq!(h.connections.len(), 1);
assert_eq!(h.active_connections.len(), 0);
}