in pkg/ebpf/bpf_client.go [139:288]
// NewBpfClient builds and initializes the node-level eBPF client.
//
// Initialization proceeds in order:
//  1. selects the IPv4 or IPv6 probe binaries and host mask based on enableIPv6;
//  2. attempts to raise RLIMIT (a failure is logged and ignored);
//  3. compares the packaged BPF binaries against the installed ones and copies
//     the packaged binaries to hostBinaryPath (failures are logged and ignored);
//  4. recovers existing BPF state via recoverBPFState (a failure is logged,
//     counted in sdkAPIErr and ignored);
//  5. loads the events probe when it needs an update or when either the
//     conntrack map or the policy events map is missing, sizing the conntrack
//     map with conntrackTableSize;
//  6. creates the conntrack client, optionally configures policy event logging
//     (enablePolicyEventLogs / enableCloudWatchLogs), and starts the periodic
//     conntrack cleanup goroutine, which fires every conntrackTTL/2 seconds;
//  7. calls prometheusRegister.
//
// policyEndpointeBPFContext and nodeIP are stored on the returned client.
//
// A non-nil error is returned when the events probe cannot be loaded, when the
// conntrack map reported as present cannot be obtained from the recovered
// global maps, or when policy event logging cannot be configured. On error the
// returned client is nil.
func NewBpfClient(policyEndpointeBPFContext *sync.Map, nodeIP string, enablePolicyEventLogs, enableCloudWatchLogs bool,
	enableIPv6 bool, conntrackTTL int, conntrackTableSize int) (*bpfClient, error) {
	var conntrackMap goebpfmaps.BpfMap
	var err error

	ebpfClient := &bpfClient{
		policyEndpointeBPFContext: policyEndpointeBPFContext,
		IngressPodToProgMap:       new(sync.Map),
		EgressPodToProgMap:        new(sync.Map),
		nodeIP:                    nodeIP,
		enableIPv6:                enableIPv6,
		GlobalMaps:                new(sync.Map),
		IngressProgToPodsMap:      new(sync.Map),
		EgressProgToPodsMap:       new(sync.Map),
		AttachProbesToPodLock:     new(sync.Map),
		DeletePodIdentifierLock:   new(sync.Map),
	}
	ebpfClient.logger = ctrl.Log.WithName("ebpf-client")

	// Default to the IPv4 artifacts and switch to the IPv6 set when requested.
	ingressBinary, egressBinary, eventsBinary,
		cliBinary, hostMask := TC_INGRESS_BINARY, TC_EGRESS_BINARY, EVENTS_BINARY, EKS_CLI_BINARY, IPv4_HOST_MASK
	if enableIPv6 {
		ingressBinary, egressBinary, eventsBinary,
			cliBinary, hostMask = TC_V6_INGRESS_BINARY, TC_V6_EGRESS_BINARY, EVENTS_V6_BINARY, EKS_V6_CLI_BINARY, IPv6_HOST_MASK
	}
	ebpfClient.ingressBinary, ebpfClient.egressBinary,
		ebpfClient.hostMask = ingressBinary, egressBinary, hostMask
	bpfBinaries := []string{eventsBinary, ingressBinary, egressBinary, cliBinary}

	ebpfClient.bpfSDKClient = goelf.New()
	ebpfClient.bpfTCClient = tc.New([]string{POD_VETH_PREFIX, BRANCH_ENI_VETH_PREFIX})

	// Set RLIMIT. No need to error out from here; we should be good to proceed.
	if err = ebpfClient.bpfSDKClient.IncreaseRlimit(); err != nil {
		ebpfClient.logger.Info("Failed to increase RLIMIT on the node....but moving forward")
	}

	// Compare BPF binaries. A failure is logged and initialization continues.
	ingressUpdateRequired, egressUpdateRequired, eventsUpdateRequired, err := checkAndUpdateBPFBinaries(ebpfClient.bpfTCClient,
		bpfBinaries, hostBinaryPath)
	if err != nil {
		ebpfClient.logger.Error(err, "Probe validation/update failed but will continue to load")
	}
	ebpfClient.logger.Info("Probe validation Done")

	// Copy the latest binaries to the host path. A failure is logged and
	// initialization continues; the success message is only emitted when the
	// copy actually succeeded.
	if err = cp.InstallBPFBinaries(bpfBinaries, hostBinaryPath); err != nil {
		ebpfClient.logger.Info("Failed to copy the eBPF binaries to host path....", "error", err)
	} else {
		ebpfClient.logger.Info("Copied eBPF binaries to the host directory")
	}

	// Recover whatever BPF state already exists. A failure is logged and
	// counted, and initialization continues with the values that were returned.
	isConntrackMapPresent, isPolicyEventsMapPresent, eventBufferFD, ingressPinPaths, egressPinPaths, err := recoverBPFState(ebpfClient.bpfTCClient, ebpfClient.bpfSDKClient, policyEndpointeBPFContext,
		ebpfClient.GlobalMaps, ingressUpdateRequired, egressUpdateRequired, eventsUpdateRequired)
	if err != nil {
		ebpfClient.logger.Info("Failed to recover the BPF state: ", "error ", err)
		sdkAPIErr.WithLabelValues("RecoverBPFState").Inc()
	} else {
		ebpfClient.logger.Info("Successfully recovered BPF state")
	}
	ebpfClient.interfaceNametoIngressPinPath = ingressPinPaths
	ebpfClient.interfaceNametoEgressPinPath = egressPinPaths

	// Load the current events binary, if ..
	// - Current events binary packaged with network policy agent is different than the one installed
	//   during the previous installation (or)
	// - Either Conntrack Map (or) Events Map is currently missing on the node
	if eventsUpdateRequired || !isConntrackMapPresent || !isPolicyEventsMapPresent {
		ebpfClient.logger.Info("Install the default global maps")

		// eventsBinary already reflects the IPv4/IPv6 selection made above.
		var bpfSdkInputData goelf.BpfCustomData
		bpfSdkInputData.FilePath = eventsBinary
		bpfSdkInputData.CustomPinPath = "global"
		bpfSdkInputData.CustomMapSize = make(map[string]int)
		bpfSdkInputData.CustomMapSize[AWS_CONNTRACK_MAP] = conntrackTableSize
		ebpfClient.logger.Info("Setting conntrack cache map size: ", "max entries", conntrackTableSize)

		_, globalMapInfo, loadErr := ebpfClient.bpfSDKClient.LoadBpfFileWithCustomData(bpfSdkInputData)
		if loadErr != nil {
			ebpfClient.logger.Error(loadErr, "Unable to load events binary. Required for policy enforcement, exiting..")
			sdkAPIErr.WithLabelValues("LoadBpfFileWithCustomData").Inc()
			return nil, loadErr
		}
		ebpfClient.logger.Info("Successfully loaded events probe")

		// Pick the conntrack map and the events buffer FD out of the freshly loaded maps.
		for mapName, mapInfo := range globalMapInfo {
			if mapName == AWS_CONNTRACK_MAP {
				conntrackMap = mapInfo
			}
			if mapName == AWS_EVENTS_MAP {
				eventBufferFD = int(mapInfo.MapFD)
			}
		}
	}

	if isConntrackMapPresent {
		recovered, found := ebpfClient.GlobalMaps.Load(CONNTRACK_MAP_PIN_PATH)
		if !found {
			// The recovery error (if any) may be nil here, so build an explicit
			// error; otherwise callers would receive a nil client with a nil error.
			err = bpfClientInitError("conntrack map not found in recovered global maps")
			ebpfClient.logger.Error(err, "Unable to get conntrackMap post recovery..")
			sdkAPIErr.WithLabelValues("RecoveryFailed").Inc()
			return nil, err
		}
		recoveredMap, isBpfMap := recovered.(goebpfmaps.BpfMap)
		if !isBpfMap {
			// Guard the assertion so an unexpected stored value yields an error
			// instead of a panic during agent start-up.
			err = bpfClientInitError("recovered conntrack map entry has an unexpected type")
			ebpfClient.logger.Error(err, "Unable to get conntrackMap post recovery..")
			sdkAPIErr.WithLabelValues("RecoveryFailed").Inc()
			return nil, err
		}
		conntrackMap = recoveredMap
		ebpfClient.logger.Info("Derived existing ConntrackMap identifier")
	}

	ebpfClient.conntrackClient = conntrack.NewConntrackClient(conntrackMap, enableIPv6, ebpfClient.logger)
	ebpfClient.logger.Info("Initialized Conntrack client")

	if enablePolicyEventLogs {
		err = events.ConfigurePolicyEventsLogging(ebpfClient.logger, enableCloudWatchLogs, eventBufferFD, enableIPv6)
		if err != nil {
			ebpfClient.logger.Error(err, "unable to initialize event buffer for Policy events, exiting..")
			sdkAPIErr.WithLabelValues("ConfigurePolicyEventsLogging").Inc()
			return nil, err
		}
		ebpfClient.logger.Info("Configured event logging")
	} else {
		ebpfClient.logger.Info("Disabled event logging")
	}

	// Start Conntrack routines: the cleanup runs at half the configured TTL.
	duration := time.Duration(conntrackTTL) * time.Second
	halfDuration := duration / 2
	if enableIPv6 {
		go wait.Forever(ebpfClient.conntrackClient.Cleanupv6ConntrackMap, halfDuration)
	} else {
		go wait.Forever(ebpfClient.conntrackClient.CleanupConntrackMap, halfDuration)
	}

	// Initializes prometheus metrics
	prometheusRegister()

	ebpfClient.logger.Info("BPF Client initialization done")
	return ebpfClient, nil
}

// bpfClientInitError is a string-backed error used by NewBpfClient for
// initialization failures that have no underlying error to propagate.
type bpfClientInitError string

// Error implements the error interface.
func (e bpfClientInitError) Error() string { return string(e) }