pkg/ebpf/events/events.go (210 lines of code) (raw):
package events
import (
"bytes"
"encoding/binary"
"fmt"
"os"
"strconv"
"time"
"github.com/aws/aws-network-policy-agent/pkg/aws"
"github.com/aws/aws-network-policy-agent/pkg/aws/services"
"github.com/aws/aws-network-policy-agent/pkg/utils"
goebpfevents "github.com/aws/aws-ebpf-sdk-go/pkg/events"
awssdk "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/go-logr/logr"
"github.com/google/uuid"
"github.com/spf13/pflag"
)
var (
RING_BUFFER_PINPATH = "/sys/fs/bpf/globals/aws/maps/global_policy_events"
cwl services.CloudWatchLogs
logStreamName = ""
logGroupName = ""
sequenceToken = ""
EKS_CW_PATH = "/aws/eks/"
NON_EKS_CW_PATH = "/aws/"
)
type ringBufferDataV4_t struct {
SourceIP uint32
SourcePort uint32
DestIP uint32
DestPort uint32
Protocol uint32
Verdict uint32
}
type ringBufferDataV6_t struct {
SourceIP [16]byte
SourcePort uint32
DestIP [16]byte
DestPort uint32
Protocol uint32
Verdict uint32
}
func ConfigurePolicyEventsLogging(logger logr.Logger, enableCloudWatchLogs bool, mapFD int, enableIPv6 bool) error {
// Enable logging and setup ring buffer
if mapFD <= 0 {
logger.Info("MapFD is invalid")
return fmt.Errorf("Invalid Ringbuffer FD: %d", mapFD)
}
var mapFDList []int
mapFDList = append(mapFDList, mapFD)
eventsClient := goebpfevents.New()
eventChanList, err := eventsClient.InitRingBuffer(mapFDList)
if err != nil {
logger.Info("Failed to Initialize Ring Buffer", "err:", err)
return err
} else {
if enableCloudWatchLogs {
logger.Info("Cloudwatch log support is enabled")
err = setupCW(logger)
if err != nil {
logger.Error(err, "unable to initialize Cloudwatch Logs for Policy events")
return err
}
}
logger.Info("Configure Event loop ... ")
capturePolicyEvents(eventChanList[mapFD], logger, enableCloudWatchLogs, enableIPv6)
}
return nil
}
func setupCW(logger logr.Logger) error {
awsCloudConfig := aws.CloudConfig{}
fs := pflag.NewFlagSet("", pflag.ExitOnError)
awsCloudConfig.BindFlags(fs)
cloud, err := aws.NewCloud(awsCloudConfig)
if err != nil {
logger.Error(err, "unable to initialize AWS cloud session for Cloudwatch logs")
return err
}
cwl = cloud.CloudWatchLogs()
clusterName := cloud.ClusterName()
customlogGroupName := EKS_CW_PATH + clusterName + "/cluster"
if clusterName == utils.DEFAULT_CLUSTER_NAME {
customlogGroupName = NON_EKS_CW_PATH + clusterName + "/cluster"
}
logger.Info("Setup CW", "Setting loggroup Name", customlogGroupName)
err = ensureLogGroupExists(customlogGroupName)
if err != nil {
logger.Error(err, "unable to validate log group presence. Please check IAM permissions")
return err
}
logGroupName = customlogGroupName
return nil
}
func getVerdict(verdict int) string {
verdictStr := "DENY"
if verdict == utils.ACCEPT.Index() {
verdictStr = "ACCEPT"
} else if verdict == utils.EXPIRED_DELETED.Index() {
verdictStr = "EXPIRED/DELETED"
}
return verdictStr
}
func publishDataToCloudwatch(logQueue []*cloudwatchlogs.InputLogEvent, message string, log logr.Logger) bool {
logQueue = append(logQueue, &cloudwatchlogs.InputLogEvent{
Message: &message,
Timestamp: awssdk.Int64(time.Now().UnixNano() / int64(time.Millisecond)),
})
if len(logQueue) > 0 {
log.Info("Sending logs to CW")
input := cloudwatchlogs.PutLogEventsInput{
LogEvents: logQueue,
LogGroupName: &logGroupName,
}
if sequenceToken == "" {
err := createLogStream()
if err != nil {
log.Info("Failed to create log stream")
panic(err)
}
} else {
input = *input.SetSequenceToken(sequenceToken)
}
input = *input.SetLogStreamName(logStreamName)
resp, err := cwl.PutLogEvents(&input)
if err != nil {
log.Info("Push log events", "Failed ", err)
} else if resp != nil && resp.NextSequenceToken != nil {
sequenceToken = *resp.NextSequenceToken
}
logQueue = []*cloudwatchlogs.InputLogEvent{}
return false
}
return true
}
func capturePolicyEvents(ringbufferdata <-chan []byte, log logr.Logger, enableCloudWatchLogs bool, enableIPv6 bool) {
nodeName := os.Getenv("MY_NODE_NAME")
// Read from ringbuffer channel, perf buffer support is not there and 5.10 kernel is needed.
go func(ringbufferdata <-chan []byte) {
done := false
for record := range ringbufferdata {
var logQueue []*cloudwatchlogs.InputLogEvent
var message string
if enableIPv6 {
var rb ringBufferDataV6_t
buf := bytes.NewBuffer(record)
if err := binary.Read(buf, binary.LittleEndian, &rb); err != nil {
log.Info("Failed to read from Ring buf", err)
continue
}
protocol := utils.GetProtocol(int(rb.Protocol))
verdict := getVerdict(int(rb.Verdict))
log.Info("Flow Info: ", "Src IP", utils.ConvByteToIPv6(rb.SourceIP).String(), "Src Port", rb.SourcePort,
"Dest IP", utils.ConvByteToIPv6(rb.DestIP).String(), "Dest Port", rb.DestPort,
"Proto", protocol, "Verdict", verdict)
message = "Node: " + nodeName + ";" + "SIP: " + utils.ConvByteToIPv6(rb.SourceIP).String() + ";" + "SPORT: " + strconv.Itoa(int(rb.SourcePort)) + ";" + "DIP: " + utils.ConvByteToIPv6(rb.DestIP).String() + ";" + "DPORT: " + strconv.Itoa(int(rb.DestPort)) + ";" + "PROTOCOL: " + protocol + ";" + "PolicyVerdict: " + verdict
} else {
var rb ringBufferDataV4_t
buf := bytes.NewBuffer(record)
if err := binary.Read(buf, binary.LittleEndian, &rb); err != nil {
log.Info("Failed to read from Ring buf", err)
continue
}
protocol := utils.GetProtocol(int(rb.Protocol))
verdict := getVerdict(int(rb.Verdict))
log.Info("Flow Info: ", "Src IP", utils.ConvByteArrayToIP(rb.SourceIP), "Src Port", rb.SourcePort,
"Dest IP", utils.ConvByteArrayToIP(rb.DestIP), "Dest Port", rb.DestPort,
"Proto", protocol, "Verdict", verdict)
message = "Node: " + nodeName + ";" + "SIP: " + utils.ConvByteArrayToIP(rb.SourceIP) + ";" + "SPORT: " + strconv.Itoa(int(rb.SourcePort)) + ";" + "DIP: " + utils.ConvByteArrayToIP(rb.DestIP) + ";" + "DPORT: " + strconv.Itoa(int(rb.DestPort)) + ";" + "PROTOCOL: " + protocol + ";" + "PolicyVerdict: " + verdict
}
if enableCloudWatchLogs {
done = publishDataToCloudwatch(logQueue, message, log)
if done {
break
}
}
}
}(ringbufferdata)
}
func ensureLogGroupExists(name string) error {
resp, err := cwl.DescribeLogGroups(&cloudwatchlogs.DescribeLogGroupsInput{})
if err != nil {
return err
}
for _, logGroup := range resp.LogGroups {
if *logGroup.LogGroupName == name {
return nil
}
}
_, err = cwl.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
LogGroupName: &name,
})
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
if aerr.Code() == "ResourceAlreadyExistsException" {
return nil
}
}
return err
}
return nil
}
func createLogStream() error {
name := "aws-network-policy-agent-audit-" + uuid.New().String()
_, err := cwl.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{
LogGroupName: &logGroupName,
LogStreamName: &name,
})
logStreamName = name
return err
}