fn test_connection_size_limit_exceeded()

in src/server.rs [692:1094]


    fn test_connection_size_limit_exceeded() {
        let path_to_socket = get_temp_socket_file();

        let mut server = HttpServer::new(path_to_socket.as_path()).unwrap();
        server.start_server().unwrap();

        // Test one incoming connection.
        let mut socket = UnixStream::connect(path_to_socket.as_path()).unwrap();
        assert!(server.requests().unwrap().is_empty());

        socket
            .write_all(
                b"PATCH /machine-config HTTP/1.1\r\n\
                         Content-Length: 51201\r\n\
                         Content-Type: application/json\r\n\r\naaaaa",
            )
            .unwrap();
        assert!(server.requests().unwrap().is_empty());
        assert!(server.requests().unwrap().is_empty());
        let mut buf: [u8; 265] = [0; 265];
        assert!(socket.read(&mut buf[..]).unwrap() > 0);
        let error_message = b"HTTP/1.1 400 \r\n\
                              Server: Firecracker API\r\n\
                              Connection: keep-alive\r\n\
                              Content-Type: application/json\r\n\
                              Content-Length: 149\r\n\r\n{ \"error\": \"\
                              Request payload with size 51201 is larger than \
                              the limit of 51200 allowed by server.\nAll \
                              previous unanswered requests will be dropped.";
        assert_eq!(&buf[..], &error_message[..]);
    }

    #[test]
    fn test_set_payload_size() {
        let path_to_socket = get_temp_socket_file();

        let mut server = HttpServer::new(path_to_socket.as_path()).unwrap();
        server.start_server().unwrap();
        server.set_payload_max_size(4);

        // Test one incoming connection.
        let mut socket = UnixStream::connect(path_to_socket.as_path()).unwrap();
        assert!(server.requests().unwrap().is_empty());

        socket
            .write_all(
                b"PATCH /machine-config HTTP/1.1\r\n\
                         Content-Length: 5\r\n\
                         Content-Type: application/json\r\n\r\naaaaa",
            )
            .unwrap();
        assert!(server.requests().unwrap().is_empty());
        assert!(server.requests().unwrap().is_empty());
        let mut buf: [u8; 260] = [0; 260];
        assert!(socket.read(&mut buf[..]).unwrap() > 0);
        let error_message = b"HTTP/1.1 400 \r\n\
                              Server: Firecracker API\r\n\
                              Connection: keep-alive\r\n\
                              Content-Type: application/json\r\n\
                              Content-Length: 141\r\n\r\n{ \"error\": \"\
                              Request payload with size 5 is larger than the \
                              limit of 4 allowed by server.\nAll previous \
                              unanswered requests will be dropped.\" }";
        assert_eq!(&buf[..], &error_message[..]);
    }

    #[test]
    fn test_wait_one_fd_connection() {
        use std::os::unix::io::IntoRawFd;
        let path_to_socket = get_temp_socket_file();

        let socket_listener = UnixListener::bind(path_to_socket.as_path()).unwrap();
        let socket_fd = socket_listener.into_raw_fd();

        let mut server = HttpServer::new_from_fd(socket_fd).unwrap();
        server.start_server().unwrap();

        // Test one incoming connection.
        let mut socket = UnixStream::connect(path_to_socket.as_path()).unwrap();
        assert!(server.requests().unwrap().is_empty());

        socket
            .write_all(
                b"PATCH /machine-config HTTP/1.1\r\n\
                         Content-Length: 13\r\n\
                         Content-Type: application/json\r\n\r\nwhatever body",
            )
            .unwrap();

        let mut req_vec = server.requests().unwrap();
        let server_request = req_vec.remove(0);

        server
            .respond(server_request.process(|request| {
                assert_eq!(
                    std::str::from_utf8(&request.body.as_ref().unwrap().body).unwrap(),
                    "whatever body"
                );
                let mut response = Response::new(Version::Http11, StatusCode::OK);
                let response_body = b"response body";
                response.set_body(Body::new(response_body.to_vec()));
                response
            }))
            .unwrap();
        assert!(server.requests().unwrap().is_empty());

        let mut buf: [u8; 1024] = [0; 1024];
        assert!(socket.read(&mut buf[..]).unwrap() > 0);
        assert!(String::from_utf8_lossy(&buf).contains("response body"));
    }

    #[test]
    fn test_wait_concurrent_connections() {
        let path_to_socket = get_temp_socket_file();

        let mut server = HttpServer::new(path_to_socket.as_path()).unwrap();
        server.start_server().unwrap();

        // Test two concurrent connections.
        let mut first_socket = UnixStream::connect(path_to_socket.as_path()).unwrap();
        assert!(server.requests().unwrap().is_empty());

        first_socket
            .write_all(
                b"PATCH /machine-config HTTP/1.1\r\n\
                               Content-Length: 13\r\n\
                               Content-Type: application/json\r\n\r\nwhatever body",
            )
            .unwrap();
        let mut second_socket = UnixStream::connect(path_to_socket.as_path()).unwrap();

        let mut req_vec = server.requests().unwrap();
        let server_request = req_vec.remove(0);

        server
            .respond(server_request.process(|_request| {
                let mut response = Response::new(Version::Http11, StatusCode::OK);
                let response_body = b"response body";
                response.set_body(Body::new(response_body.to_vec()));
                response
            }))
            .unwrap();
        second_socket
            .write_all(
                b"GET /machine-config HTTP/1.1\r\n\
                                Content-Type: application/json\r\n\r\n",
            )
            .unwrap();

        let mut req_vec = server.requests().unwrap();
        let second_server_request = req_vec.remove(0);

        assert_eq!(
            second_server_request.request,
            Request::try_from(
                b"GET /machine-config HTTP/1.1\r\n\
            Content-Type: application/json\r\n\r\n",
                None
            )
            .unwrap()
        );

        let mut buf: [u8; 1024] = [0; 1024];
        assert!(first_socket.read(&mut buf[..]).unwrap() > 0);
        first_socket.shutdown(std::net::Shutdown::Both).unwrap();

        server
            .respond(second_server_request.process(|_request| {
                let mut response = Response::new(Version::Http11, StatusCode::OK);
                let response_body = b"response second body";
                response.set_body(Body::new(response_body.to_vec()));
                response
            }))
            .unwrap();

        assert!(server.requests().unwrap().is_empty());
        let mut buf: [u8; 1024] = [0; 1024];
        assert!(second_socket.read(&mut buf[..]).unwrap() > 0);
        second_socket.shutdown(std::net::Shutdown::Both).unwrap();
        assert!(server.requests().unwrap().is_empty());
    }

    #[test]
    fn test_wait_expect_connection() {
        let path_to_socket = get_temp_socket_file();

        let mut server = HttpServer::new(path_to_socket.as_path()).unwrap();
        server.start_server().unwrap();

        // Test one incoming connection with `Expect: 100-continue`.
        let mut socket = UnixStream::connect(path_to_socket.as_path()).unwrap();
        assert!(server.requests().unwrap().is_empty());

        socket
            .write_all(
                b"PATCH /machine-config HTTP/1.1\r\n\
                         Content-Length: 13\r\n\
                         Expect: 100-continue\r\n\r\n",
            )
            .unwrap();
        // `wait` on server to receive what the client set on the socket.
        // This will set the stream direction to `Outgoing`, as we need to send a `100 CONTINUE` response.
        let req_vec = server.requests().unwrap();
        assert!(req_vec.is_empty());
        // Another `wait`, this time to send the response.
        // Will be called because of an `EPOLLOUT` notification.
        let req_vec = server.requests().unwrap();
        assert!(req_vec.is_empty());
        let mut buf: [u8; 1024] = [0; 1024];
        assert!(socket.read(&mut buf[..]).unwrap() > 0);

        socket.write_all(b"whatever body").unwrap();
        let mut req_vec = server.requests().unwrap();
        let server_request = req_vec.remove(0);

        server
            .respond(server_request.process(|_request| {
                let mut response = Response::new(Version::Http11, StatusCode::OK);
                let response_body = b"response body";
                response.set_body(Body::new(response_body.to_vec()));
                response
            }))
            .unwrap();

        let req_vec = server.requests().unwrap();
        assert!(req_vec.is_empty());

        let mut buf: [u8; 1024] = [0; 1024];
        assert!(socket.read(&mut buf[..]).unwrap() > 0);
    }

    #[test]
    fn test_wait_many_connections() {
        let path_to_socket = get_temp_socket_file();

        let mut server = HttpServer::new(path_to_socket.as_path()).unwrap();
        server.start_server().unwrap();

        let mut sockets: Vec<UnixStream> = Vec::with_capacity(MAX_CONNECTIONS + 1);
        for _ in 0..MAX_CONNECTIONS {
            sockets.push(UnixStream::connect(path_to_socket.as_path()).unwrap());
            assert!(server.requests().unwrap().is_empty());
        }

        sockets.push(UnixStream::connect(path_to_socket.as_path()).unwrap());
        assert!(server.requests().unwrap().is_empty());
        let mut buf: [u8; 120] = [0; 120];
        sockets[MAX_CONNECTIONS].read_exact(&mut buf).unwrap();
        assert_eq!(&buf[..], SERVER_FULL_ERROR_MESSAGE);
        assert_eq!(server.connections.len(), 10);
        {
            // Drop this stream.
            let _refused_stream = sockets.pop().unwrap();
        }
        assert_eq!(server.connections.len(), 10);

        // Check that the server detects a connection shutdown.
        let sock: &UnixStream = sockets.get(0).unwrap();
        sock.shutdown(Shutdown::Both).unwrap();
        assert!(server.requests().unwrap().is_empty());
        // Server should drop a closed connection.
        assert_eq!(server.connections.len(), 9);

        // Close the backing FD of this connection by dropping
        // it out of scope.
        {
            // Enforce the drop call on the stream
            let _sock = sockets.pop().unwrap();
        }
        assert!(server.requests().unwrap().is_empty());
        // Server should drop a closed connection.
        assert_eq!(server.connections.len(), 8);

        let sock: &UnixStream = sockets.get(1).unwrap();
        // Close both the read and write sides of the socket
        // separately and check that the server detects it.
        sock.shutdown(Shutdown::Read).unwrap();
        sock.shutdown(Shutdown::Write).unwrap();
        assert!(server.requests().unwrap().is_empty());
        // Server should drop a closed connection.
        assert_eq!(server.connections.len(), 7);
    }

    #[test]
    fn test_wait_parse_error() {
        let path_to_socket = get_temp_socket_file();

        let mut server = HttpServer::new(path_to_socket.as_path()).unwrap();
        server.start_server().unwrap();

        // Test one incoming connection.
        let mut socket = UnixStream::connect(path_to_socket.as_path()).unwrap();
        socket.set_nonblocking(true).unwrap();
        assert!(server.requests().unwrap().is_empty());

        socket
            .write_all(
                b"PATCH /machine-config HTTP/1.1\r\n\
                         Content-Length: alpha\r\n\
                         Content-Type: application/json\r\n\r\nwhatever body",
            )
            .unwrap();

        assert!(server.requests().unwrap().is_empty());
        assert!(server.requests().unwrap().is_empty());
        let mut buf: [u8; 255] = [0; 255];
        assert!(socket.read(&mut buf[..]).unwrap() > 0);
        let error_message = b"HTTP/1.1 400 \r\n\
                              Server: Firecracker API\r\n\
                              Connection: keep-alive\r\n\
                              Content-Type: application/json\r\n\
                              Content-Length: 136\r\n\r\n{ \"error\": \"Invalid header. \
                              Reason: Invalid value. Key:Content-Length; Value: alpha\nAll previous unanswered requests will be dropped.\" }";
        assert_eq!(&buf[..], &error_message[..]);

        socket
            .write_all(
                b"PATCH /machine-config HTTP/1.1\r\n\
                         Content-Length: alpha\r\n\
                         Content-Type: application/json\r\n\r\nwhatever body",
            )
            .unwrap();
    }

    #[test]
    fn test_wait_in_flight_responses() {
        let path_to_socket = get_temp_socket_file();

        let mut server = HttpServer::new(path_to_socket.as_path()).unwrap();
        server.start_server().unwrap();

        // Test a connection dropped and then a new one appearing
        // before the user had a chance to send the response to the
        // first one.
        let mut first_socket = UnixStream::connect(path_to_socket.as_path()).unwrap();
        assert!(server.requests().unwrap().is_empty());

        first_socket
            .write_all(
                b"PATCH /machine-config HTTP/1.1\r\n\
                               Content-Length: 13\r\n\
                               Content-Type: application/json\r\n\r\nwhatever body",
            )
            .unwrap();

        let mut req_vec = server.requests().unwrap();
        let server_request = req_vec.remove(0);

        first_socket.shutdown(std::net::Shutdown::Both).unwrap();
        assert!(server.requests().unwrap().is_empty());
        let mut second_socket = UnixStream::connect(path_to_socket.as_path()).unwrap();
        second_socket.set_nonblocking(true).unwrap();
        assert!(server.requests().unwrap().is_empty());

        server
            .enqueue_responses(vec![server_request.process(|_request| {
                let mut response = Response::new(Version::Http11, StatusCode::OK);
                let response_body = b"response body";
                response.set_body(Body::new(response_body.to_vec()));
                response
            })])
            .unwrap();
        assert!(server.requests().unwrap().is_empty());
        assert_eq!(server.connections.len(), 1);
        let mut buf: [u8; 1024] = [0; 1024];
        assert!(second_socket.read(&mut buf[..]).is_err());

        second_socket
            .write_all(
                b"GET /machine-config HTTP/1.1\r\n\
                                Content-Type: application/json\r\n\r\n",
            )
            .unwrap();

        let mut req_vec = server.requests().unwrap();
        let second_server_request = req_vec.remove(0);

        assert_eq!(
            second_server_request.request,
            Request::try_from(
                b"GET /machine-config HTTP/1.1\r\n\
            Content-Type: application/json\r\n\r\n",
                None
            )
            .unwrap()
        );

        server
            .respond(second_server_request.process(|_request| {
                let mut response = Response::new(Version::Http11, StatusCode::OK);
                let response_body = b"response second body";
                response.set_body(Body::new(response_body.to_vec()));
                response
            }))
            .unwrap();

        assert!(server.requests().unwrap().is_empty());
        let mut buf: [u8; 1024] = [0; 1024];
        assert!(second_socket.read(&mut buf[..]).unwrap() > 0);
        second_socket.shutdown(std::net::Shutdown::Both).unwrap();
        assert!(server.requests().is_ok());
    }
}