in pkg/ebpf/events/events.go [157:206]
// capturePolicyEvents starts a background goroutine that drains ringbufferdata
// and reports every record as a flow log entry.
//
// Each record is decoded as a little-endian ringBufferDataV6_t when enableIPv6
// is true, and as a ringBufferDataV4_t otherwise. Records that fail to decode
// are logged as errors and skipped. Every decoded record is written to log;
// when enableCloudWatchLogs is true the same flow details, prefixed with the
// node name read from the MY_NODE_NAME environment variable, are also handed
// to publishDataToCloudwatch.
//
// The function itself returns immediately. The goroutine ends when
// ringbufferdata is closed or when publishDataToCloudwatch returns true.
func capturePolicyEvents(ringbufferdata <-chan []byte, log logr.Logger, enableCloudWatchLogs bool, enableIPv6 bool) {
	nodeName := os.Getenv("MY_NODE_NAME")
	// Read from ringbuffer channel, perf buffer support is not there and 5.10 kernel is needed.
	go func(ringbufferdata <-chan []byte) {
		for record := range ringbufferdata {
			var logQueue []*cloudwatchlogs.InputLogEvent
			var message string
			if enableIPv6 {
				var rb ringBufferDataV6_t
				if err := binary.Read(bytes.NewReader(record), binary.LittleEndian, &rb); err != nil {
					// logr expects the error as the first argument of Error;
					// passing it as a lone trailing argument to Info produces
					// a malformed key/value list.
					log.Error(err, "Failed to read from Ring buf")
					continue
				}
				protocol := utils.GetProtocol(int(rb.Protocol))
				verdict := getVerdict(int(rb.Verdict))
				// Convert each address once and reuse it for both the log
				// entry and the CloudWatch message.
				srcIP := utils.ConvByteToIPv6(rb.SourceIP).String()
				dstIP := utils.ConvByteToIPv6(rb.DestIP).String()
				log.Info("Flow Info: ", "Src IP", srcIP, "Src Port", rb.SourcePort,
					"Dest IP", dstIP, "Dest Port", rb.DestPort,
					"Proto", protocol, "Verdict", verdict)
				message = "Node: " + nodeName + ";" + "SIP: " + srcIP + ";" + "SPORT: " + strconv.Itoa(int(rb.SourcePort)) + ";" + "DIP: " + dstIP + ";" + "DPORT: " + strconv.Itoa(int(rb.DestPort)) + ";" + "PROTOCOL: " + protocol + ";" + "PolicyVerdict: " + verdict
			} else {
				var rb ringBufferDataV4_t
				if err := binary.Read(bytes.NewReader(record), binary.LittleEndian, &rb); err != nil {
					log.Error(err, "Failed to read from Ring buf")
					continue
				}
				protocol := utils.GetProtocol(int(rb.Protocol))
				verdict := getVerdict(int(rb.Verdict))
				srcIP := utils.ConvByteArrayToIP(rb.SourceIP)
				dstIP := utils.ConvByteArrayToIP(rb.DestIP)
				log.Info("Flow Info: ", "Src IP", srcIP, "Src Port", rb.SourcePort,
					"Dest IP", dstIP, "Dest Port", rb.DestPort,
					"Proto", protocol, "Verdict", verdict)
				message = "Node: " + nodeName + ";" + "SIP: " + srcIP + ";" + "SPORT: " + strconv.Itoa(int(rb.SourcePort)) + ";" + "DIP: " + dstIP + ";" + "DPORT: " + strconv.Itoa(int(rb.DestPort)) + ";" + "PROTOCOL: " + protocol + ";" + "PolicyVerdict: " + verdict
			}
			// A true result from publishDataToCloudwatch stops event capture.
			if enableCloudWatchLogs && publishDataToCloudwatch(logQueue, message, log) {
				break
			}
		}
	}(ringbufferdata)
}