in nfm-controller/src/reports/report_otlp.rs [398:580]
fn build_and_save_report(compute_platform: ComputePlatform) {
let context = SockContext {
is_client: false,
address_family: AF_INET,
local_ipv4: 16909060,
remote_ipv4: 84281096,
local_ipv6: [0; 16],
remote_ipv6: [0; 16],
local_port: 443,
remote_port: 28015,
..Default::default()
};
let stats = NetworkStats {
sockets_connecting: 4,
sockets_established: 3,
sockets_closing: 2,
sockets_closed: 1,
sockets_completed: 44,
severed_connect: 2,
severed_establish: 1,
connect_attempts: 14,
bytes_received: 43,
bytes_delivered: 47,
segments_received: 53,
segments_delivered: 59,
retrans_syn: 20,
retrans_est: 19,
retrans_close: 18,
rtos_syn: 17,
rtos_est: 16,
rtos_close: 15,
connect_us: MetricHistogram {
count: 2,
min: 12,
max: 13,
sum: 25,
},
rtt_us: MetricHistogram {
count: 3,
min: 14,
max: 15,
sum: 44,
},
rtt_smoothed_us: MetricHistogram {
count: 4,
min: 16,
max: 17,
sum: 66,
},
};
let mut process_stats = ProcessStats::default();
process_stats.counters.event_related = EventCounters {
active_connect_events: 1,
active_established_events: 2,
passive_established_events: 3,
state_change_events: 4,
rtt_events: 5,
retrans_events: 6,
rto_events: 7,
other_events: 8,
socket_events: 9,
sockets_invalid: 10,
map_insertion_errors: 18,
rtts_invalid: 15,
set_flags_errors: 16,
other_errors: 17,
};
process_stats.counters.process_related = ProcessCounters::default();
process_stats.usage.push(UsageStats {
cpu_util: 0.040,
mem_used_kb: 512,
mem_used_ratio: 0.06,
sockets_tracked: 100,
});
let iface1_stats = GroupedInterfaceStats {
interface_id: "iface-id-1".to_string(),
stats: NetworkInterfaceStats {
bw_in_allowance_exceeded: 211,
bw_out_allowance_exceeded: 223,
conntrack_allowance_exceeded: 227,
linklocal_allowance_exceeded: 229,
pps_allowance_exceeded: 233,
conntrack_allowance_available: 239,
},
};
let mut iface2_stats = iface1_stats.clone();
iface2_stats.interface_id = "iface-id-2".to_string();
let host_stats = HostStats {
interface_stats: vec![iface1_stats, iface2_stats],
};
let mut flow = FlowProperties::try_from(&context).unwrap();
if compute_platform != ComputePlatform::Ec2Plain {
flow.kubernetes_metadata = Some(FlowMetadata {
local: Some(PodInfo {
name: "local-pod".to_string(),
namespace: "local-namespace".to_string(),
service_name: "local-service".to_string(),
}),
remote: Some(PodInfo {
name: "remote-pod".to_string(),
namespace: "remote-namespace".to_string(),
service_name: "remote-service".to_string(),
}),
});
}
let agg_results = AggregateResults { flow, stats };
let mut env_metadata = EnvMetadata::default();
env_metadata.insert(
"instance_id".into(),
ReportValue::String("instance-id".into()),
);
env_metadata.insert(
"machine_id".into(),
ReportValue::String("machine-id".into()),
);
env_metadata.extend(
RuntimeEnvironmentMetadataProvider::from(compute_platform.clone()).get_metadata(),
);
let mut report = NfmReport::new();
report.set_network_stats(vec![agg_results]);
report.set_process_stats(process_stats);
report.set_env_metadata(env_metadata);
report.set_host_stats(host_stats);
report.set_failed_reports(10);
report.set_service_metadata(ServiceMetadata::new("agent-service", "0.1.0", "build-time"));
match compute_platform {
ComputePlatform::Ec2K8sEks => {
report.set_k8s_metadata(K8sMetadata {
node_name: Some(ReportValue::String("k8s-node".into())),
cluster_name: Some(ReportValue::String("k8s-cluster".into())),
});
}
ComputePlatform::Ec2K8sVanilla => {
report.set_k8s_metadata(K8sMetadata {
node_name: Some(ReportValue::String("k8s-node".into())),
cluster_name: None,
});
}
ComputePlatform::Ec2Plain => {
report.set_k8s_metadata(K8sMetadata {
node_name: None,
cluster_name: None,
});
}
}
let timestamp_us = 1718716821050000;
let built_report = NfmReportOTLP::build(&report, timestamp_us).unwrap();
// Parse the OTLP request
let actual_report = ExportMetricsServiceRequest::decode(built_report.as_slice()).unwrap();
assert_eq!(built_report, actual_report.encode_to_vec());
let compute_type = match compute_platform {
ComputePlatform::Ec2Plain => "ec2",
ComputePlatform::Ec2K8sEks => "k8s-eks",
ComputePlatform::Ec2K8sVanilla => "k8s-vanilla",
};
let expected_report: ExportMetricsServiceRequest = serde_json::from_reader(
fs::File::open(format!("../test-data/report-{compute_type}-otel.json")).unwrap(),
)
.unwrap();
// Save the serialized protobuf file for external validation.
let output_filename = format!("report-1-{compute_type}-flow.bin");
let mut buf = Vec::new();
actual_report.encode(&mut buf).unwrap();
fs::create_dir_all("../target/report-samples/").unwrap();
let mut file =
fs::File::create(format!("../target/report-samples/{}", output_filename)).unwrap();
std::io::Write::write_all(&mut file, &buf).unwrap();
assert_reports_eq(actual_report, expected_report);
}