func capturePolicyEvents()

in pkg/ebpf/events/events.go [157:206]


// capturePolicyEvents starts a background goroutine that drains ringbufferdata,
// decodes every record as an IPv6 or IPv4 flow entry (selected by enableIPv6),
// logs the flow and, when enableCloudWatchLogs is true, hands a formatted
// message to publishDataToCloudwatch.
//
// The call itself returns immediately. The goroutine stops when
// ringbufferdata is closed or as soon as publishDataToCloudwatch returns true.
// Records that cannot be decoded are logged as errors and skipped.
func capturePolicyEvents(ringbufferdata <-chan []byte, log logr.Logger, enableCloudWatchLogs bool, enableIPv6 bool) {
	nodeName := os.Getenv("MY_NODE_NAME")

	// formatMessage renders a single flow as the semicolon-separated record
	// that is passed on to publishDataToCloudwatch.
	formatMessage := func(srcIP string, srcPort int, dstIP string, dstPort int, protocol, verdict string) string {
		return "Node: " + nodeName + ";" + "SIP: " + srcIP + ";" + "SPORT: " + strconv.Itoa(srcPort) + ";" +
			"DIP: " + dstIP + ";" + "DPORT: " + strconv.Itoa(dstPort) + ";" +
			"PROTOCOL: " + protocol + ";" + "PolicyVerdict: " + verdict
	}

	// Events arrive over the ring buffer channel; perf buffers are not
	// supported, so a 5.10 kernel is needed.
	go func(ringbufferdata <-chan []byte) {
		for record := range ringbufferdata {
			// A fresh (nil) queue is handed to the publisher for every record.
			var logQueue []*cloudwatchlogs.InputLogEvent
			var message string

			if enableIPv6 {
				var rb ringBufferDataV6_t
				if err := binary.Read(bytes.NewReader(record), binary.LittleEndian, &rb); err != nil {
					// logr's Info expects key/value pairs after the message;
					// report the failure through Error so err is not a dangling key.
					log.Error(err, "Failed to read from Ring buf")
					continue
				}

				// Convert each address once and reuse it for both the log line and the message.
				srcIP := utils.ConvByteToIPv6(rb.SourceIP).String()
				dstIP := utils.ConvByteToIPv6(rb.DestIP).String()
				protocol := utils.GetProtocol(int(rb.Protocol))
				verdict := getVerdict(int(rb.Verdict))

				log.Info("Flow Info:  ", "Src IP", srcIP, "Src Port", rb.SourcePort,
					"Dest IP", dstIP, "Dest Port", rb.DestPort,
					"Proto", protocol, "Verdict", verdict)

				message = formatMessage(srcIP, int(rb.SourcePort), dstIP, int(rb.DestPort), protocol, verdict)
			} else {
				var rb ringBufferDataV4_t
				if err := binary.Read(bytes.NewReader(record), binary.LittleEndian, &rb); err != nil {
					log.Error(err, "Failed to read from Ring buf")
					continue
				}

				srcIP := utils.ConvByteArrayToIP(rb.SourceIP)
				dstIP := utils.ConvByteArrayToIP(rb.DestIP)
				protocol := utils.GetProtocol(int(rb.Protocol))
				verdict := getVerdict(int(rb.Verdict))

				log.Info("Flow Info:  ", "Src IP", srcIP, "Src Port", rb.SourcePort,
					"Dest IP", dstIP, "Dest Port", rb.DestPort,
					"Proto", protocol, "Verdict", verdict)

				message = formatMessage(srcIP, int(rb.SourcePort), dstIP, int(rb.DestPort), protocol, verdict)
			}

			// A true result from the publisher ends the capture loop.
			if enableCloudWatchLogs && publishDataToCloudwatch(logQueue, message, log) {
				break
			}
		}
	}(ringbufferdata)
}