fn test_request()

in nfm-controller/src/reports/publisher_endpoint.rs [306:388]


    fn test_request() {
        let rt = tokio::runtime::Runtime::new().unwrap();
        let mut address = String::new();
        let mut port = 8181;
        let mut port_attempts = 5;
        let mut mock_service = None;
        while port_attempts > 0 {
            address = format!("127.0.0.1:{}", port).to_string();
            mock_service = rt.block_on(async { MockService::new(address.to_string()) });
            if mock_service.is_some() {
                break;
            }
            port_attempts -= 1;
            port += 1;
        }
        assert!(port_attempts > 0, "Failed to find an available port");
        let service_future = rt.spawn(async move { mock_service.unwrap().listen_one().await });

        let creds = Credentials::new("AKID", "SECRET", Some("TOKEN".into()), None, "test");
        let provider = SharedCredentialsProvider::new(creds);

        let mock_clock = FakeClock {
            now_us: 1718716821050,
        };

        let publisher = ReportPublisherOTLP::new_without_proxy(
            format!("http://{}", address),
            "us-west-1".to_string(),
            provider,
            mock_clock.clone(),
            ReportCompression::None,
        );
        let mut report = NfmReport::new();

        let context = SockContext {
            is_client: false,
            address_family: AF_INET as u32,
            local_ipv4: 16909060,
            remote_ipv4: 84281096,
            local_ipv6: [0; 16],
            remote_ipv6: [0; 16],
            local_port: 443,
            remote_port: 28015,
            ..Default::default()
        };
        let stats = NetworkStats::default();

        report.set_network_stats(vec![AggregateResults {
            flow: FlowProperties::try_from(&context).unwrap(),
            stats,
        }]);

        let timestamp_ns = timespec_to_nsec(mock_clock.now());
        let expected_body = NfmReportOTLP::build(&report, timestamp_ns).unwrap();

        publisher.publish(&report);
        let http_request_res = rt.block_on(service_future);

        let mut http_request = http_request_res.unwrap();
        let actual_body = http_request.pop().unwrap();

        assert_eq!(actual_body.len(), expected_body.len());
        assert_eq!(
            ExportMetricsServiceRequest::decode(actual_body.as_slice()).unwrap(),
            ExportMetricsServiceRequest::decode(expected_body.as_slice()).unwrap(),
        );

        // Test related to the HTTP headers.
        let mut headers_set = HashSet::<String>::new();
        for line in http_request
            .into_iter()
            .map(|vec| String::from_utf8(vec).unwrap())
        {
            headers_set.insert(line);
        }

        assert!(headers_set.contains("POST / HTTP/1.1"));
        assert!(headers_set.contains("host: 127.0.0.1"));
        assert!(headers_set.contains("content-type: application/x-protobuf"));
        assert!(headers_set.contains("x-amz-security-token: TOKEN"));

        check_header_exists("authorization", &headers_set);
    }