in nfm-controller/src/reports/publisher_endpoint.rs [306:388]
/// End-to-end check of `ReportPublisherOTLP::publish` against a local mock HTTP service.
///
/// The test binds a `MockService` on the first free port in 8181..=8185, publishes one
/// report through a proxy-less, uncompressed publisher, and then verifies that:
/// * the received body decodes to the same `ExportMetricsServiceRequest` as the one built
///   locally from the same report and the fake clock's timestamp, and
/// * the request line and the expected headers (host, content type, security token,
///   authorization) are present.
fn test_request() {
    let rt = tokio::runtime::Runtime::new().unwrap();

    // Probe five consecutive ports lazily, stopping at the first one the mock service
    // manages to bind. The service stays wrapped in its `Option` and is only unwrapped
    // inside the spawned task, where it is consumed.
    let (address, mock_service) = (8181..8186)
        .map(|port| {
            let candidate = format!("127.0.0.1:{}", port);
            // Construction happens inside `block_on`, as the original test did.
            let service = rt.block_on(async { MockService::new(candidate.clone()) });
            (candidate, service)
        })
        .find(|(_, service)| service.is_some())
        .expect("Failed to find an available port");

    // Serve exactly one request in the background while the publisher runs below.
    let service_future = rt.spawn(async move { mock_service.unwrap().listen_one().await });

    let provider = SharedCredentialsProvider::new(Credentials::new(
        "AKID",
        "SECRET",
        Some("TOKEN".into()),
        None,
        "test",
    ));
    let mock_clock = FakeClock {
        now_us: 1718716821050,
    };
    let publisher = ReportPublisherOTLP::new_without_proxy(
        format!("http://{}", address),
        "us-west-1".to_string(),
        provider,
        mock_clock.clone(),
        ReportCompression::None,
    );

    // A report holding a single server-side IPv4 flow with default (zeroed) statistics.
    let context = SockContext {
        is_client: false,
        address_family: AF_INET as u32,
        local_ipv4: 0x0102_0304,  // 16909060
        remote_ipv4: 0x0506_0708, // 84281096
        local_ipv6: [0; 16],
        remote_ipv6: [0; 16],
        local_port: 443,
        remote_port: 28015,
        ..Default::default()
    };
    let mut report = NfmReport::new();
    report.set_network_stats(vec![AggregateResults {
        flow: FlowProperties::try_from(&context).unwrap(),
        stats: NetworkStats::default(),
    }]);

    // Build the reference payload from the same report and the same (fake) clock.
    let expected_body = NfmReportOTLP::build(&report, timespec_to_nsec(mock_clock.now())).unwrap();

    publisher.publish(&report);

    // The mock service yields the request as a list of byte chunks; the last one is popped
    // off and treated as the body, the remaining ones are compared as header lines below.
    let mut http_request = rt.block_on(service_future).unwrap();
    let actual_body = http_request.pop().unwrap();

    assert_eq!(actual_body.len(), expected_body.len());
    assert_eq!(
        ExportMetricsServiceRequest::decode(actual_body.as_slice()).unwrap(),
        ExportMetricsServiceRequest::decode(expected_body.as_slice()).unwrap(),
    );

    // Test related to the HTTP headers.
    let headers_set: HashSet<String> = http_request
        .into_iter()
        .map(|raw_line| String::from_utf8(raw_line).unwrap())
        .collect();

    assert!(headers_set.contains("POST / HTTP/1.1"));
    assert!(headers_set.contains("host: 127.0.0.1"));
    assert!(headers_set.contains("content-type: application/x-protobuf"));
    assert!(headers_set.contains("x-amz-security-token: TOKEN"));
    check_header_exists("authorization", &headers_set);
}