pkg/ebpf/bpf_client.go (976 lines of code) (raw):

package ebpf import ( "context" "errors" "fmt" "io/ioutil" "net" "sort" "strconv" "strings" "sync" "time" "unsafe" corev1 "k8s.io/api/core/v1" "github.com/aws/amazon-vpc-cni-k8s/rpc" goelf "github.com/aws/aws-ebpf-sdk-go/pkg/elfparser" goebpfmaps "github.com/aws/aws-ebpf-sdk-go/pkg/maps" "github.com/aws/aws-ebpf-sdk-go/pkg/tc" "github.com/aws/aws-network-policy-agent/api/v1alpha1" "github.com/aws/aws-network-policy-agent/pkg/ebpf/conntrack" "github.com/aws/aws-network-policy-agent/pkg/ebpf/events" "github.com/aws/aws-network-policy-agent/pkg/rpcclient" "github.com/aws/aws-network-policy-agent/pkg/utils" "github.com/aws/aws-network-policy-agent/pkg/utils/cp" "github.com/go-logr/logr" "github.com/google/go-cmp/cmp" "github.com/prometheus/client_golang/prometheus" "google.golang.org/protobuf/types/known/emptypb" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" ctrl "sigs.k8s.io/controller-runtime" ) var ( TC_INGRESS_BINARY = "tc.v4ingress.bpf.o" TC_EGRESS_BINARY = "tc.v4egress.bpf.o" TC_V6_INGRESS_BINARY = "tc.v6ingress.bpf.o" TC_V6_EGRESS_BINARY = "tc.v6egress.bpf.o" EVENTS_BINARY = "v4events.bpf.o" EVENTS_V6_BINARY = "v6events.bpf.o" TC_INGRESS_PROG = "handle_ingress" TC_EGRESS_PROG = "handle_egress" TC_INGRESS_MAP = "ingress_map" TC_EGRESS_MAP = "egress_map" TC_INGRESS_POD_STATE_MAP = "ingress_pod_state_map" TC_EGRESS_POD_STATE_MAP = "egress_pod_state_map" AWS_CONNTRACK_MAP = "aws_conntrack_map" AWS_EVENTS_MAP = "policy_events" EKS_CLI_BINARY = "aws-eks-na-cli" EKS_V6_CLI_BINARY = "aws-eks-na-cli-v6" hostBinaryPath = "/host/opt/cni/bin/" IPv4_HOST_MASK = "/32" IPv6_HOST_MASK = "/128" CONNTRACK_MAP_PIN_PATH = "/sys/fs/bpf/globals/aws/maps/global_aws_conntrack_map" POLICY_EVENTS_MAP_PIN_PATH = "/sys/fs/bpf/globals/aws/maps/global_policy_events" CATCH_ALL_PROTOCOL corev1.Protocol = "ANY_IP_PROTOCOL" POD_VETH_PREFIX = "eni" POLICIES_APPLIED = 0 DEFAULT_ALLOW = 1 DEFAULT_DENY = 2 LOCAL_IPAMD_ADDRESS = "127.0.0.1:50051" POD_STATE_MAP_KEY = 0 BRANCH_ENI_VETH_PREFIX = "vlan" ) var ( sdkAPILatency = prometheus.NewSummaryVec( prometheus.SummaryOpts{ Name: "awsnodeagent_aws_ebpf_sdk_latency_ms", Help: "eBPF SDK API call latency in ms", }, []string{"api", "error"}, ) sdkAPIErr = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "awsnodeagent_aws_ebpfsdk_error_count", Help: "The number of times eBPF SDK returns an error", }, []string{"fn"}, ) prometheusRegistered = false ) type pod_state struct { state uint8 } func msSince(start time.Time) float64 { return float64(time.Since(start) / time.Millisecond) } func prometheusRegister() { if !prometheusRegistered { prometheus.MustRegister(sdkAPILatency) prometheus.MustRegister(sdkAPIErr) prometheusRegistered = true } } type BpfClient interface { AttacheBPFProbes(pod types.NamespacedName, policyEndpoint string) error UpdateEbpfMaps(podIdentifier string, ingressFirewallRules []EbpfFirewallRules, egressFirewallRules []EbpfFirewallRules) error UpdatePodStateEbpfMaps(podIdentifier string, state int, updateIngress bool, updateEgress bool) error IsEBPFProbeAttached(podName string, podNamespace string) (bool, bool) IsFirstPodInPodIdentifier(podIdentifier string) bool GetIngressPodToProgMap() *sync.Map GetEgressPodToProgMap() *sync.Map GetIngressProgToPodsMap() *sync.Map GetEgressProgToPodsMap() *sync.Map DeletePodFromIngressProgPodCaches(podName string, podNamespace string) DeletePodFromEgressProgPodCaches(podName string, podNamespace string) ReAttachEbpfProbes() error DeleteBPFProgramAndMaps(podIdentifier string) error GetDeletePodIdentifierLockMap() *sync.Map } type EvProgram struct { wg sync.WaitGroup } type BPFContext struct { ingressPgmInfo goelf.BpfData egressPgmInfo goelf.BpfData conntrackMapInfo goebpfmaps.BpfMap } type EbpfFirewallRules struct { IPCidr v1alpha1.NetworkAddress Except []v1alpha1.NetworkAddress L4Info []v1alpha1.Port } func NewBpfClient(policyEndpointeBPFContext *sync.Map, nodeIP string, enablePolicyEventLogs, enableCloudWatchLogs bool, enableIPv6 bool, conntrackTTL int, conntrackTableSize int) (*bpfClient, error) { var conntrackMap goebpfmaps.BpfMap ebpfClient := &bpfClient{ policyEndpointeBPFContext: policyEndpointeBPFContext, IngressPodToProgMap: new(sync.Map), EgressPodToProgMap: new(sync.Map), nodeIP: nodeIP, enableIPv6: enableIPv6, GlobalMaps: new(sync.Map), IngressProgToPodsMap: new(sync.Map), EgressProgToPodsMap: new(sync.Map), AttachProbesToPodLock: new(sync.Map), DeletePodIdentifierLock: new(sync.Map), } ebpfClient.logger = ctrl.Log.WithName("ebpf-client") ingressBinary, egressBinary, eventsBinary, cliBinary, hostMask := TC_INGRESS_BINARY, TC_EGRESS_BINARY, EVENTS_BINARY, EKS_CLI_BINARY, IPv4_HOST_MASK if enableIPv6 { ingressBinary, egressBinary, eventsBinary, cliBinary, hostMask = TC_V6_INGRESS_BINARY, TC_V6_EGRESS_BINARY, EVENTS_V6_BINARY, EKS_V6_CLI_BINARY, IPv6_HOST_MASK } ebpfClient.ingressBinary, ebpfClient.egressBinary, ebpfClient.hostMask = ingressBinary, egressBinary, hostMask bpfBinaries := []string{eventsBinary, ingressBinary, egressBinary, cliBinary} isConntrackMapPresent, isPolicyEventsMapPresent := false, false var err error ebpfClient.bpfSDKClient = goelf.New() ebpfClient.bpfTCClient = tc.New([]string{POD_VETH_PREFIX, BRANCH_ENI_VETH_PREFIX}) //Set RLIMIT err = ebpfClient.bpfSDKClient.IncreaseRlimit() if err != nil { //No need to error out from here. We should be good to proceed. ebpfClient.logger.Info("Failed to increase RLIMIT on the node....but moving forward") } //Compare BPF binaries ingressUpdateRequired, egressUpdateRequired, eventsUpdateRequired, err := checkAndUpdateBPFBinaries(ebpfClient.bpfTCClient, bpfBinaries, hostBinaryPath) if err != nil { //Log the error and move on ebpfClient.logger.Error(err, "Probe validation/update failed but will continue to load") } ebpfClient.logger.Info("Probe validation Done") //Copy the latest binaries to /opt/cni/bin err = cp.InstallBPFBinaries(bpfBinaries, hostBinaryPath) if err != nil { //Log the error and move on ebpfClient.logger.Info("Failed to copy the eBPF binaries to host path....", "error", err) } ebpfClient.logger.Info("Copied eBPF binaries to the host directory") var interfaceNametoIngressPinPath map[string]string var interfaceNametoEgressPinPath map[string]string eventBufferFD := 0 isConntrackMapPresent, isPolicyEventsMapPresent, eventBufferFD, interfaceNametoIngressPinPath, interfaceNametoEgressPinPath, err = recoverBPFState(ebpfClient.bpfTCClient, ebpfClient.bpfSDKClient, policyEndpointeBPFContext, ebpfClient.GlobalMaps, ingressUpdateRequired, egressUpdateRequired, eventsUpdateRequired) if err != nil { //Log the error and move on ebpfClient.logger.Info("Failed to recover the BPF state: ", "error ", err) sdkAPIErr.WithLabelValues("RecoverBPFState").Inc() } ebpfClient.logger.Info("Successfully recovered BPF state") ebpfClient.interfaceNametoIngressPinPath = interfaceNametoIngressPinPath ebpfClient.interfaceNametoEgressPinPath = interfaceNametoEgressPinPath // Load the current events binary, if .. // - Current events binary packaged with network policy agent is different than the one installed // during the previous installation (or) // - Either Conntrack Map (or) Events Map is currently missing on the node if eventsUpdateRequired || (!isConntrackMapPresent || !isPolicyEventsMapPresent) { ebpfClient.logger.Info("Install the default global maps") eventsProbe := EVENTS_BINARY if enableIPv6 { eventsProbe = EVENTS_V6_BINARY } var bpfSdkInputData goelf.BpfCustomData bpfSdkInputData.FilePath = eventsProbe bpfSdkInputData.CustomPinPath = "global" bpfSdkInputData.CustomMapSize = make(map[string]int) bpfSdkInputData.CustomMapSize[AWS_CONNTRACK_MAP] = conntrackTableSize ebpfClient.logger.Info("Setting conntrack cache map size: ", "max entries", conntrackTableSize) _, globalMapInfo, err := ebpfClient.bpfSDKClient.LoadBpfFileWithCustomData(bpfSdkInputData) if err != nil { ebpfClient.logger.Error(err, "Unable to load events binary. Required for policy enforcement, exiting..") sdkAPIErr.WithLabelValues("LoadBpfFileWithCustomData").Inc() return nil, err } ebpfClient.logger.Info("Successfully loaded events probe") for mapName, mapInfo := range globalMapInfo { if mapName == AWS_CONNTRACK_MAP { conntrackMap = mapInfo } if mapName == AWS_EVENTS_MAP { eventBufferFD = int(mapInfo.MapFD) } } } if isConntrackMapPresent { recoveredConntrackMap, ok := ebpfClient.GlobalMaps.Load(CONNTRACK_MAP_PIN_PATH) if ok { conntrackMap = recoveredConntrackMap.(goebpfmaps.BpfMap) ebpfClient.logger.Info("Derived existing ConntrackMap identifier") } else { ebpfClient.logger.Error(err, "Unable to get conntrackMap post recovery..") sdkAPIErr.WithLabelValues("RecoveryFailed").Inc() return nil, err } } ebpfClient.conntrackClient = conntrack.NewConntrackClient(conntrackMap, enableIPv6, ebpfClient.logger) ebpfClient.logger.Info("Initialized Conntrack client") if enablePolicyEventLogs { err = events.ConfigurePolicyEventsLogging(ebpfClient.logger, enableCloudWatchLogs, eventBufferFD, enableIPv6) if err != nil { ebpfClient.logger.Error(err, "unable to initialize event buffer for Policy events, exiting..") sdkAPIErr.WithLabelValues("ConfigurePolicyEventsLogging").Inc() return nil, err } ebpfClient.logger.Info("Configured event logging") } else { ebpfClient.logger.Info("Disabled event logging") } // Start Conntrack routines duration := time.Duration(conntrackTTL) * time.Second halfDuration := duration / 2 if enableIPv6 { go wait.Forever(ebpfClient.conntrackClient.Cleanupv6ConntrackMap, halfDuration) } else { go wait.Forever(ebpfClient.conntrackClient.CleanupConntrackMap, halfDuration) } // Initializes prometheus metrics prometheusRegister() ebpfClient.logger.Info("BPF Client initialization done") return ebpfClient, nil } var _ BpfClient = (*bpfClient)(nil) type bpfClient struct { // Stores eBPF Ingress and Egress context per policyEndpoint resource policyEndpointeBPFContext *sync.Map // Stores the Ingress eBPF Prog FD per pod IngressPodToProgMap *sync.Map // Stores the Egress eBPF Prog FD per pod EgressPodToProgMap *sync.Map // Stores info on the global maps the agent creates GlobalMaps *sync.Map // Primary IP of the node nodeIP string // Flag to track the IPv6 mode enableIPv6 bool // Ingress eBPF probe binary ingressBinary string // Egress eBPF probe binary egressBinary string // host IP Mask - will be initialized based on the IP family hostMask string // Conntrack client instance conntrackClient conntrack.ConntrackClient // eBPF SDK Client bpfSDKClient goelf.BpfSDKClient // eBPF TC Client bpfTCClient tc.BpfTc // Logger instance logger logr.Logger // Stores the Ingress eBPF Prog FD to pods mapping IngressProgToPodsMap *sync.Map // Stores the Egress eBPF Prog FD to pods mapping EgressProgToPodsMap *sync.Map // Stores podIdentifier to attachprobes lock mapping AttachProbesToPodLock *sync.Map // This is only updated and used for probe binary updates during initialization interfaceNametoIngressPinPath map[string]string // This is only updated and used for probe binary updates during initialization interfaceNametoEgressPinPath map[string]string // Stores podIdentifier to deletepod lock mapping DeletePodIdentifierLock *sync.Map } func checkAndUpdateBPFBinaries(bpfTCClient tc.BpfTc, bpfBinaries []string, hostBinaryPath string) (bool, bool, bool, error) { log := ctrl.Log.WithName("ebpf-client-init") //TODO - reuse the logger updateIngressProbe, updateEgressProbe, updateEventsProbe := false, false, false var existingProbePath string for _, bpfProbe := range bpfBinaries { if bpfProbe == EKS_CLI_BINARY || bpfProbe == EKS_V6_CLI_BINARY { continue } log.Info("Validating ", "Probe: ", bpfProbe) currentProbe, err := ioutil.ReadFile(bpfProbe) if err != nil { log.Info("error opening ", "Probe: ", bpfProbe, "error", err) } existingProbePath = hostBinaryPath + bpfProbe existingProbe, err := ioutil.ReadFile(existingProbePath) if err != nil { log.Info("error opening ", "Probe: ", existingProbePath, "error", err) } log.Info("comparing new and existing probes ...") isEqual := cmp.Equal(currentProbe, existingProbe) if !isEqual { if bpfProbe == EVENTS_BINARY || bpfProbe == EVENTS_V6_BINARY { // Ingress and Egress probes refer to Conntrack and Policy Events maps defined in // events binary. So, if the events binary changes, we will need to update all the existing // probes in the local node updateEventsProbe, updateIngressProbe, updateEgressProbe = true, true, true log.Info("change detected in event probe binaries..") break } if bpfProbe == TC_INGRESS_BINARY || bpfProbe == TC_V6_INGRESS_BINARY { log.Info("change detected in ingress probe binaries.. ") updateIngressProbe = true } if bpfProbe == TC_EGRESS_BINARY || bpfProbe == TC_V6_EGRESS_BINARY { log.Info("change detected in egress probe binaries..") updateEgressProbe = true } } } return updateIngressProbe, updateEgressProbe, updateEventsProbe, nil } func recoverBPFState(bpfTCClient tc.BpfTc, eBPFSDKClient goelf.BpfSDKClient, policyEndpointeBPFContext *sync.Map, globalMaps *sync.Map, updateIngressProbe, updateEgressProbe, updateEventsProbe bool) (bool, bool, int, map[string]string, map[string]string, error) { log := ctrl.Log.WithName("ebpf-client") //TODO reuse logger isConntrackMapPresent, isPolicyEventsMapPresent := false, false eventsMapFD := 0 var interfaceNametoIngressPinPath = make(map[string]string) var interfaceNametoEgressPinPath = make(map[string]string) // Recover global maps (Conntrack and Events) if there is no need to update // events binary if !updateEventsProbe { recoveredGlobalMaps, err := eBPFSDKClient.RecoverGlobalMaps() if err != nil { log.Error(err, "failed to recover global maps..") sdkAPIErr.WithLabelValues("RecoverGlobalMaps").Inc() return isConntrackMapPresent, isPolicyEventsMapPresent, eventsMapFD, interfaceNametoIngressPinPath, interfaceNametoEgressPinPath, nil } log.Info("Total no.of global maps recovered...", "count: ", len(recoveredGlobalMaps)) for globalMapName, globalMap := range recoveredGlobalMaps { log.Info("Global Map..", "Name: ", globalMapName, "updateEventsProbe: ", updateEventsProbe) if globalMapName == CONNTRACK_MAP_PIN_PATH { log.Info("Conntrack Map is already present on the node") isConntrackMapPresent = true globalMaps.Store(globalMapName, globalMap) } if globalMapName == POLICY_EVENTS_MAP_PIN_PATH { isPolicyEventsMapPresent = true eventsMapFD = int(globalMap.MapFD) log.Info("Policy event Map is already present on the node ", "Recovered FD", eventsMapFD) } } } // If no updates required to probes, Recover BPF Programs and Maps from BPF_FS. We only aim to recover programs and maps // created by aws-network-policy-agent (Located under /sys/fs/bpf/globals/aws) if !updateIngressProbe || !updateEgressProbe { bpfState, err := eBPFSDKClient.RecoverAllBpfProgramsAndMaps() var peBPFContext BPFContext if err != nil { //Log it and move on. We will overwrite and recreate the maps/programs log.Info("BPF State Recovery failed: ", "error: ", err) sdkAPIErr.WithLabelValues("RecoverAllBpfProgramAndMaps").Inc() } log.Info("Number of probes/maps recovered - ", "count: ", len(bpfState)) for pinPath, bpfEntry := range bpfState { log.Info("Recovered program Identifier: ", "Pin Path: ", pinPath) podIdentifier, direction := utils.GetPodIdentifierFromBPFPinPath(pinPath) log.Info("PinPath: ", "podIdentifier: ", podIdentifier, "direction: ", direction) value, ok := policyEndpointeBPFContext.Load(podIdentifier) if ok { peBPFContext = value.(BPFContext) } if direction == "ingress" && !updateIngressProbe { peBPFContext.ingressPgmInfo = bpfEntry } else if direction == "egress" && !updateEgressProbe { peBPFContext.egressPgmInfo = bpfEntry } policyEndpointeBPFContext.Store(podIdentifier, peBPFContext) } } //If update required, cleanup probes and gather data to re attach probes with new programs if updateIngressProbe || updateEgressProbe { // Get all loaded programs and maps bpfState, err := eBPFSDKClient.GetAllBpfProgramsAndMaps() if err != nil { log.Info("GetAllBpfProgramsAndMaps failed: ", "error: ", err) sdkAPIErr.WithLabelValues("GetAllBpfProgramsAndMaps").Inc() return isConntrackMapPresent, isPolicyEventsMapPresent, eventsMapFD, interfaceNametoIngressPinPath, interfaceNametoEgressPinPath, err } log.Info("GetAllBpfProgramsAndMaps ", "returned", len(bpfState)) progIdToPinPath := make(map[int]string) for pinPath, bpfData := range bpfState { progId := bpfData.Program.ProgID if progId > 0 { progIdToPinPath[progId] = pinPath } } // Get attached progIds interfaceToIngressProgIds, interfaceToEgressProgIds, err := bpfTCClient.GetAllAttachedProgIds() log.Info("Got attached ", "ingressprogIds ", len(interfaceToIngressProgIds), " egressprogIds ", len(interfaceToEgressProgIds)) //cleanup all existing filters cleanupErr := bpfTCClient.CleanupQdiscs(updateIngressProbe, updateEgressProbe) if cleanupErr != nil { // log the error and continue. Attaching new probes will cleanup the old ones log.Info("Probe cleanup failed ", "error: ", cleanupErr) sdkAPIErr.WithLabelValues("CleanupQdiscs").Inc() } for interfaceName, existingIngressProgId := range interfaceToIngressProgIds { pinPath, ok := progIdToPinPath[existingIngressProgId] if ok && updateIngressProbe { interfaceNametoIngressPinPath[interfaceName] = pinPath } } for interfaceName, existingEgressProgId := range interfaceToEgressProgIds { pinPath, ok := progIdToPinPath[existingEgressProgId] if ok && updateEgressProbe { interfaceNametoEgressPinPath[interfaceName] = pinPath } } log.Info("Collected all data for reattaching probes") } return isConntrackMapPresent, isPolicyEventsMapPresent, eventsMapFD, interfaceNametoIngressPinPath, interfaceNametoEgressPinPath, nil } func (l *bpfClient) ReAttachEbpfProbes() error { var networkPolicyMode string var err error // If we have any links for which we need to reattach the probes, fetch NP mode from ipamd if len(l.interfaceNametoIngressPinPath) > 0 || len(l.interfaceNametoEgressPinPath) > 0 { // get network policy mode from ipamd networkPolicyMode, err = l.GetNetworkPolicyModeFromIpamd() if err != nil { l.logger.Info("Error while fetching networkPolicyMode from ipamd ", err) return err } } state := DEFAULT_ALLOW if utils.IsStrictMode(networkPolicyMode) { state = DEFAULT_DENY } for interfaceName, pinPath := range l.interfaceNametoIngressPinPath { podIdentifier, _ := utils.GetPodIdentifierFromBPFPinPath(pinPath) l.logger.Info("ReattachEbpfProbes ", "attaching ingress for ", podIdentifier, "interface ", interfaceName) _, err := l.attachIngressBPFProbe(interfaceName, podIdentifier) if err != nil { l.logger.Info("Failed to Attach Ingress TC probe for", "interface: ", interfaceName, " podidentifier", podIdentifier) sdkAPIErr.WithLabelValues("attachIngressBPFProbe").Inc() } l.logger.Info("Updating ingress_pod_state map for ", "podIdentifier: ", podIdentifier, "networkPolicyMode: ", networkPolicyMode) err = l.UpdatePodStateEbpfMaps(podIdentifier, state, true, false) if err != nil { l.logger.Info("Map update(s) failed for, ", "podIdentifier ", podIdentifier, "error: ", err) } } for interfaceName, pinPath := range l.interfaceNametoEgressPinPath { podIdentifier, _ := utils.GetPodIdentifierFromBPFPinPath(pinPath) l.logger.Info("ReattachEbpfProbes ", "attaching egress for ", podIdentifier, "interface ", interfaceName) _, err := l.attachEgressBPFProbe(interfaceName, podIdentifier) if err != nil { l.logger.Info("Failed to Attach Egress TC probe for", "interface: ", interfaceName, " podidentifier", podIdentifier) sdkAPIErr.WithLabelValues("attachEgressBPFProbe").Inc() } l.logger.Info("Updating egress_pod_state map for ", "podIdentifier: ", podIdentifier, "networkPolicyMode: ", networkPolicyMode) err = l.UpdatePodStateEbpfMaps(podIdentifier, state, false, true) if err != nil { l.logger.Info("Map update(s) failed for, ", "podIdentifier ", podIdentifier, "error: ", err) } } return nil } func (l *bpfClient) GetNetworkPolicyModeFromIpamd() (string, error) { ctx := context.Background() grpcLogger := ctrl.Log.WithName("grpcLogger") // grpc connection waits till the ipmad is up and running grpcLogger.Info("Trying to establish GRPC connection to ipamd") grpcConn, err := rpcclient.New().Dial(ctx, LOCAL_IPAMD_ADDRESS, rpcclient.GetDefaultServiceRetryConfig(), rpcclient.GetInsecureConnectionType()) if err != nil { grpcLogger.Error(err, "Failed to connect to ipamd") return "", err } defer grpcConn.Close() ipamd := rpc.NewConfigServerBackendClient(grpcConn) resp, err := ipamd.GetNetworkPolicyConfigs(ctx, &emptypb.Empty{}) if err != nil { grpcLogger.Error(err, "Failed to get network policy configs") return "", err } grpcLogger.Info("Connected to ipamd grpc endpoint and got response for ", "NetworkPolicyMode", resp.NetworkPolicyMode) if !utils.IsValidNetworkPolicyEnforcingMode(resp.NetworkPolicyMode) { err = errors.New("Invalid Network Policy Mode") grpcLogger.Error(err, "Invalid Network Policy Mode ", resp.NetworkPolicyMode) return "", err } return resp.NetworkPolicyMode, nil } func (l *bpfClient) GetIngressPodToProgMap() *sync.Map { return l.IngressPodToProgMap } func (l *bpfClient) GetEgressPodToProgMap() *sync.Map { return l.EgressPodToProgMap } func (l *bpfClient) GetIngressProgToPodsMap() *sync.Map { return l.IngressProgToPodsMap } func (l *bpfClient) GetEgressProgToPodsMap() *sync.Map { return l.EgressProgToPodsMap } func (l *bpfClient) GetDeletePodIdentifierLockMap() *sync.Map { return l.DeletePodIdentifierLock } func (l *bpfClient) AttacheBPFProbes(pod types.NamespacedName, podIdentifier string) error { // Two go routines can try to attach the probes at the same time // Locking will help updating all the datastructures correctly value, _ := l.AttachProbesToPodLock.LoadOrStore(podIdentifier, &sync.Mutex{}) attachProbesLock := value.(*sync.Mutex) attachProbesLock.Lock() l.logger.Info("Got the attachProbesLock for", "Pod: ", pod.Name, " Namespace: ", pod.Namespace, " PodIdentifier: ", podIdentifier) defer attachProbesLock.Unlock() // Check if an eBPF probe is already attached on both ingress and egress direction(s) for this pod. // If yes, then skip probe attach flow for this pod and update the relevant map entries. isIngressProbeAttached, isEgressProbeAttached := l.IsEBPFProbeAttached(pod.Name, pod.Namespace) start := time.Now() // We attach the TC probes to the hostVeth interface of the pod. Derive the hostVeth // name from the Name and Namespace of the Pod. // Note: The below naming convention is tied to VPC CNI and isn't meant to be generic hostVethName := utils.GetHostVethName(pod.Name, pod.Namespace, []string{POD_VETH_PREFIX, BRANCH_ENI_VETH_PREFIX}, l.logger) l.logger.Info("AttacheBPFProbes for", "pod", pod.Name, " in namespace", pod.Namespace, " with hostVethName", hostVethName) podNamespacedName := utils.GetPodNamespacedName(pod.Name, pod.Namespace) if !isIngressProbeAttached { progFD, err := l.attachIngressBPFProbe(hostVethName, podIdentifier) duration := msSince(start) sdkAPILatency.WithLabelValues("attachIngressBPFProbe", fmt.Sprint(err != nil)).Observe(duration) if err != nil { l.logger.Info("Failed to Attach Ingress TC probe for", "pod: ", pod.Name, " in namespace", pod.Namespace) sdkAPIErr.WithLabelValues("attachIngressBPFProbe").Inc() return err } l.logger.Info("Successfully attached Ingress TC probe for", "pod: ", pod.Name, " in namespace", pod.Namespace) l.IngressPodToProgMap.Store(podNamespacedName, progFD) currentPodSet, _ := l.IngressProgToPodsMap.LoadOrStore(progFD, make(map[string]struct{})) currentPodSet.(map[string]struct{})[podNamespacedName] = struct{}{} } if !isEgressProbeAttached { progFD, err := l.attachEgressBPFProbe(hostVethName, podIdentifier) duration := msSince(start) sdkAPILatency.WithLabelValues("attachEgressBPFProbe", fmt.Sprint(err != nil)).Observe(duration) if err != nil { l.logger.Info("Failed to Attach Egress TC probe for", "pod: ", pod.Name, " in namespace", pod.Namespace) sdkAPIErr.WithLabelValues("attachEgressBPFProbe").Inc() return err } l.logger.Info("Successfully attached Egress TC probe for", "pod: ", pod.Name, " in namespace", pod.Namespace) l.EgressPodToProgMap.Store(podNamespacedName, progFD) currentPodSet, _ := l.EgressProgToPodsMap.LoadOrStore(progFD, make(map[string]struct{})) currentPodSet.(map[string]struct{})[podNamespacedName] = struct{}{} } return nil } func (l *bpfClient) attachIngressBPFProbe(hostVethName string, podIdentifier string) (int, error) { // We will re-use the same eBPF program instance for pods belonging to same replicaset // Check if we've already loaded an ELF file for this PolicyEndpoint resource and re-use // if present, otherwise load a new instance and attach it var progFD int var err error var ingressProgInfo map[string]goelf.BpfData var peBPFContext BPFContext value, ok := l.policyEndpointeBPFContext.Load(podIdentifier) if ok { peBPFContext = value.(BPFContext) } if peBPFContext.ingressPgmInfo.Program.ProgFD != 0 { l.logger.Info("Found an existing instance, let's derive the ingress context..") ingressEbpfProgEntry := peBPFContext.ingressPgmInfo progFD = ingressEbpfProgEntry.Program.ProgFD } else { ingressProgInfo, progFD, err = l.loadBPFProgram(l.ingressBinary, "ingress", podIdentifier) pinPath := utils.GetBPFPinPathFromPodIdentifier(podIdentifier, "ingress") peBPFContext.ingressPgmInfo = ingressProgInfo[pinPath] l.policyEndpointeBPFContext.Store(podIdentifier, peBPFContext) } l.logger.Info("Attempting to do an Ingress Attach ", "with progFD: ", progFD) err = l.bpfTCClient.TCEgressAttach(hostVethName, progFD, TC_INGRESS_PROG) if err != nil && !utils.IsFileExistsError(err.Error()) { l.logger.Info("Ingress Attach failed:", "error", err) return 0, err } return progFD, nil } func (l *bpfClient) attachEgressBPFProbe(hostVethName string, podIdentifier string) (int, error) { // We will re-use the same eBPF program instance for pods belonging to same replicaset // Check if we've already loaded an ELF file for this PolicyEndpoint resource and re-use // if present, otherwise load a new instance and attach it var progFD int var err error var egressProgInfo map[string]goelf.BpfData var peBPFContext BPFContext value, ok := l.policyEndpointeBPFContext.Load(podIdentifier) if ok { peBPFContext = value.(BPFContext) } if peBPFContext.egressPgmInfo.Program.ProgFD != 0 { l.logger.Info("Found an existing instance, let's derive the egress context..") egressEbpfProgEntry := peBPFContext.egressPgmInfo progFD = egressEbpfProgEntry.Program.ProgFD } else { egressProgInfo, progFD, err = l.loadBPFProgram(l.egressBinary, "egress", podIdentifier) pinPath := utils.GetBPFPinPathFromPodIdentifier(podIdentifier, "egress") peBPFContext.egressPgmInfo = egressProgInfo[pinPath] l.policyEndpointeBPFContext.Store(podIdentifier, peBPFContext) } l.logger.Info("Attempting to do an Egress Attach ", "with progFD: ", progFD) err = l.bpfTCClient.TCIngressAttach(hostVethName, progFD, TC_EGRESS_PROG) if err != nil && !utils.IsFileExistsError(err.Error()) { l.logger.Error(err, "Egress Attach failed") return 0, err } return progFD, nil } func (l *bpfClient) DeleteBPFProgramAndMaps(podIdentifier string) error { start := time.Now() err := l.deleteBPFProgramAndMaps(podIdentifier, "ingress") duration := msSince(start) sdkAPILatency.WithLabelValues("deleteBPFProgramAndMaps", fmt.Sprint(err != nil)).Observe(duration) if err != nil { l.logger.Info("Error while deleting Ingress BPF Probe for ", "podIdentifier: ", podIdentifier) sdkAPIErr.WithLabelValues("deleteBPFProgramAndMaps").Inc() } start = time.Now() err = l.deleteBPFProgramAndMaps(podIdentifier, "egress") duration = msSince(start) sdkAPILatency.WithLabelValues("deleteBPFProgramAndMaps", fmt.Sprint(err != nil)).Observe(duration) if err != nil { l.logger.Info("Error while deleting Egress BPF Probe for ", "podIdentifier: ", podIdentifier) sdkAPIErr.WithLabelValues("deleteBPFProgramAndMaps").Inc() } l.policyEndpointeBPFContext.Delete(podIdentifier) if _, ok := l.AttachProbesToPodLock.Load(podIdentifier); ok { l.AttachProbesToPodLock.Delete(podIdentifier) } return nil } func (l *bpfClient) deleteBPFProgramAndMaps(podIdentifier string, direction string) error { var err error var peBPFContext BPFContext value, ok := l.policyEndpointeBPFContext.Load(podIdentifier) if ok { peBPFContext = value.(BPFContext) } pgmPinPath := utils.GetBPFPinPathFromPodIdentifier(podIdentifier, direction) mapPinpath := utils.GetBPFMapPinPathFromPodIdentifier(podIdentifier, direction) podStateMapPinPath := utils.GetPodStateBPFMapPinPathFromPodIdentifier(podIdentifier, direction) l.logger.Info("Deleting: ", "Program: ", pgmPinPath, "Map: ", mapPinpath, "Map: ", podStateMapPinPath) pgmInfo := peBPFContext.ingressPgmInfo mapToDelete := pgmInfo.Maps[TC_INGRESS_MAP] podStateMapToDelete := pgmInfo.Maps[TC_INGRESS_POD_STATE_MAP] if direction == "egress" { pgmInfo = peBPFContext.egressPgmInfo mapToDelete = pgmInfo.Maps[TC_EGRESS_MAP] podStateMapToDelete = pgmInfo.Maps[TC_EGRESS_POD_STATE_MAP] } l.logger.Info("Get storedFD ", "progFD: ", pgmInfo.Program.ProgFD) if pgmInfo.Program.ProgFD != 0 { l.logger.Info("Found the Program and Map to delete - ", "Program: ", pgmPinPath, "Map: ", mapPinpath, "Map: ", podStateMapPinPath) err = pgmInfo.Program.UnPinProg(pgmPinPath) if err != nil { l.logger.Info("Failed to delete the Program: ", err) } err = mapToDelete.UnPinMap(mapPinpath) if err != nil { l.logger.Info("Failed to delete the Map: ", err) } err = podStateMapToDelete.UnPinMap(podStateMapPinPath) if err != nil { l.logger.Info("Failed to delete PodState Map: ", err) } } return nil } func (l *bpfClient) loadBPFProgram(fileName string, direction string, podIdentifier string) (map[string]goelf.BpfData, int, error) { start := time.Now() l.logger.Info("Load the eBPF program") // Load a new instance of the program progInfo, _, err := l.bpfSDKClient.LoadBpfFile(fileName, podIdentifier) duration := msSince(start) sdkAPILatency.WithLabelValues("LoadBpfFile", fmt.Sprint(err != nil)).Observe(duration) if err != nil { sdkAPIErr.WithLabelValues("LoadBpfFile").Inc() l.logger.Info("Load BPF failed", "err:", err) return nil, -1, err } for k, _ := range progInfo { l.logger.Info("Prog Info: ", "Pin Path: ", k) } pinPath := utils.GetBPFPinPathFromPodIdentifier(podIdentifier, direction) l.logger.Info("PinPath for this pod: ", "PinPath: ", pinPath) progFD := progInfo[pinPath].Program.ProgFD l.logger.Info("Prog Load Succeeded", "for ", direction, "progFD: ", progFD) return progInfo, progFD, nil } func (l *bpfClient) UpdateEbpfMaps(podIdentifier string, ingressFirewallRules []EbpfFirewallRules, egressFirewallRules []EbpfFirewallRules) error { var ingressProgFD, egressProgFD int var mapToUpdate goebpfmaps.BpfMap start := time.Now() value, ok := l.policyEndpointeBPFContext.Load(podIdentifier) if ok { peBPFContext := value.(BPFContext) ingressProgInfo := peBPFContext.ingressPgmInfo egressProgInfo := peBPFContext.egressPgmInfo if ingressProgInfo.Program.ProgFD != 0 { ingressProgFD = ingressProgInfo.Program.ProgFD mapToUpdate = ingressProgInfo.Maps[TC_INGRESS_MAP] l.logger.Info("Pod has an Ingress hook attached. Update the corresponding map", "progFD: ", ingressProgFD, "mapName: ", TC_INGRESS_MAP) err := l.updateEbpfMap(mapToUpdate, ingressFirewallRules) duration := msSince(start) sdkAPILatency.WithLabelValues("updateEbpfMap-ingress", fmt.Sprint(err != nil)).Observe(duration) if err != nil { l.logger.Info("Ingress Map update failed: ", "error: ", err) sdkAPIErr.WithLabelValues("updateEbpfMap-ingress").Inc() } } if egressProgInfo.Program.ProgFD != 0 { egressProgFD = egressProgInfo.Program.ProgFD mapToUpdate = egressProgInfo.Maps[TC_EGRESS_MAP] l.logger.Info("Pod has an Egress hook attached. Update the corresponding map", "progFD: ", egressProgFD, "mapName: ", TC_EGRESS_MAP) err := l.updateEbpfMap(mapToUpdate, egressFirewallRules) duration := msSince(start) sdkAPILatency.WithLabelValues("updateEbpfMap-egress", fmt.Sprint(err != nil)).Observe(duration) if err != nil { l.logger.Info("Egress Map update failed: ", "error: ", err) sdkAPIErr.WithLabelValues("updateEbpfMap-egress").Inc() } } err := l.UpdatePodStateEbpfMaps(podIdentifier, POLICIES_APPLIED, true, true) if err != nil { l.logger.Info("Pod State Map update failed: ", "error: ", err) } } return nil } func (l *bpfClient) UpdatePodStateEbpfMaps(podIdentifier string, state int, updateIngress bool, updateEgress bool) error { var ingressProgFD, egressProgFD int var mapToUpdate goebpfmaps.BpfMap start := time.Now() value, ok := l.policyEndpointeBPFContext.Load(podIdentifier) if ok { peBPFContext := value.(BPFContext) ingressProgInfo := peBPFContext.ingressPgmInfo egressProgInfo := peBPFContext.egressPgmInfo key := uint32(POD_STATE_MAP_KEY) // pod_state_map key value := pod_state{state: uint8(state)} // pod_state_map value if updateIngress && ingressProgInfo.Program.ProgFD != 0 { ingressProgFD = ingressProgInfo.Program.ProgFD mapToUpdate = ingressProgInfo.Maps[TC_INGRESS_POD_STATE_MAP] l.logger.Info("Pod has an Ingress hook attached. Update the corresponding map", "progFD: ", ingressProgFD, "mapName: ", TC_INGRESS_POD_STATE_MAP) err := mapToUpdate.CreateUpdateMapEntry(uintptr(unsafe.Pointer(&key)), uintptr(unsafe.Pointer(&value)), 0) duration := msSince(start) sdkAPILatency.WithLabelValues("updateEbpfMap-ingress-podstate", fmt.Sprint(err != nil)).Observe(duration) if err != nil { l.logger.Info("Ingress Pod State Map update failed: ", "error: ", err) sdkAPIErr.WithLabelValues("updateEbpfMap-ingress-podstate").Inc() } } if updateEgress && egressProgInfo.Program.ProgFD != 0 { egressProgFD = egressProgInfo.Program.ProgFD mapToUpdate = egressProgInfo.Maps[TC_EGRESS_POD_STATE_MAP] l.logger.Info("Pod has an Egress hook attached. Update the corresponding map", "progFD: ", egressProgFD, "mapName: ", TC_EGRESS_POD_STATE_MAP) err := mapToUpdate.CreateUpdateMapEntry(uintptr(unsafe.Pointer(&key)), uintptr(unsafe.Pointer(&value)), 0) duration := msSince(start) sdkAPILatency.WithLabelValues("updateEbpfMap-egress-podstate", fmt.Sprint(err != nil)).Observe(duration) if err != nil { l.logger.Info("Egress Map update failed: ", "error: ", err) sdkAPIErr.WithLabelValues("updateEbpfMap-egress-podstate").Inc() } } } return nil } func (l *bpfClient) IsEBPFProbeAttached(podName string, podNamespace string) (bool, bool) { ingress, egress := false, false if _, ok := l.IngressPodToProgMap.Load(utils.GetPodNamespacedName(podName, podNamespace)); ok { l.logger.Info("Pod already has Ingress Probe attached - ", "Name: ", podName, "Namespace: ", podNamespace) ingress = true } if _, ok := l.EgressPodToProgMap.Load(utils.GetPodNamespacedName(podName, podNamespace)); ok { l.logger.Info("Pod already has Egress Probe attached - ", "Name: ", podName, "Namespace: ", podNamespace) egress = true } return ingress, egress } func (l *bpfClient) IsFirstPodInPodIdentifier(podIdentifier string) bool { firstPodInPodIdentifier := false if _, ok := l.policyEndpointeBPFContext.Load(podIdentifier); !ok { l.logger.Info("No map instance found") firstPodInPodIdentifier = true } return firstPodInPodIdentifier } func (l *bpfClient) updateEbpfMap(mapToUpdate goebpfmaps.BpfMap, firewallRules []EbpfFirewallRules) error { start := time.Now() duration := msSince(start) mapEntries, err := l.computeMapEntriesFromEndpointRules(firewallRules) if err != nil { l.logger.Info("Trie entry creation/validation failed ", "error: ", err) return err } l.logger.Info("ID of map to update: ", "ID: ", mapToUpdate.MapID) err = mapToUpdate.BulkRefreshMapEntries(mapEntries) sdkAPILatency.WithLabelValues("BulkRefreshMapEntries", fmt.Sprint(err != nil)).Observe(duration) if err != nil { l.logger.Info("BPF map update failed", "error: ", err) sdkAPIErr.WithLabelValues("BulkRefreshMapEntries").Inc() return err } return nil } func sortFirewallRulesByPrefixLength(rules []EbpfFirewallRules, prefixLenStr string) { sort.Slice(rules, func(i, j int) bool { prefixSplit := strings.Split(prefixLenStr, "/") prefixLen, _ := strconv.Atoi(prefixSplit[1]) prefixLenIp1 := prefixLen prefixLenIp2 := prefixLen if strings.Contains(string(rules[i].IPCidr), "/") { prefixIp1 := strings.Split(string(rules[i].IPCidr), "/") prefixLenIp1, _ = strconv.Atoi(prefixIp1[1]) } if strings.Contains(string(rules[j].IPCidr), "/") { prefixIp2 := strings.Split(string(rules[j].IPCidr), "/") prefixLenIp2, _ = strconv.Atoi(prefixIp2[1]) } return prefixLenIp1 < prefixLenIp2 }) } func mergeDuplicateL4Info(ports []v1alpha1.Port) []v1alpha1.Port { uniquePorts := make(map[string]v1alpha1.Port) var result []v1alpha1.Port var key string for _, p := range ports { portKey := 0 endPortKey := 0 if p.Port != nil { portKey = int(*p.Port) } if p.EndPort != nil { endPortKey = int(*p.EndPort) } if p.Protocol == nil { key = fmt.Sprintf("%s-%d-%d", "", portKey, endPortKey) } else { key = fmt.Sprintf("%s-%d-%d", *p.Protocol, portKey, endPortKey) } if _, ok := uniquePorts[key]; ok { continue } else { uniquePorts[key] = p } } for _, port := range uniquePorts { result = append(result, port) } return result } func (l *bpfClient) computeMapEntriesFromEndpointRules(firewallRules []EbpfFirewallRules) (map[string][]byte, error) { firewallMap := make(map[string][]byte) ipCIDRs := make(map[string][]v1alpha1.Port) nonHostCIDRs := make(map[string][]v1alpha1.Port) isCatchAllIPEntryPresent, allowAll := false, false var catchAllIPPorts []v1alpha1.Port //Traffic from the local node should always be allowed. Add NodeIP by default to map entries. _, mapKey, _ := net.ParseCIDR(l.nodeIP + l.hostMask) key := utils.ComputeTrieKey(*mapKey, l.enableIPv6) value := utils.ComputeTrieValue([]v1alpha1.Port{}, l.logger, true, false) firewallMap[string(key)] = value //Sort the rules sortFirewallRulesByPrefixLength(firewallRules, l.hostMask) //Check and aggregate L4 Port Info for Catch All Entries. catchAllIPPorts, isCatchAllIPEntryPresent, allowAll = l.checkAndDeriveCatchAllIPPorts(firewallRules) if isCatchAllIPEntryPresent { //Add the Catch All IP entry _, mapKey, _ := net.ParseCIDR("0.0.0.0/0") key := utils.ComputeTrieKey(*mapKey, l.enableIPv6) value := utils.ComputeTrieValue(catchAllIPPorts, l.logger, allowAll, false) firewallMap[string(key)] = value } for _, firewallRule := range firewallRules { var cidrL4Info []v1alpha1.Port if !strings.Contains(string(firewallRule.IPCidr), "/") { firewallRule.IPCidr += v1alpha1.NetworkAddress(l.hostMask) } if utils.IsNodeIP(l.nodeIP, string(firewallRule.IPCidr)) { continue } if l.enableIPv6 && !strings.Contains(string(firewallRule.IPCidr), "::") { l.logger.Info("Skipping ipv4 rule in ipv6 cluster: ", "CIDR: ", string(firewallRule.IPCidr)) continue } if !l.enableIPv6 && strings.Contains(string(firewallRule.IPCidr), "::") { l.logger.Info("Skipping ipv6 rule in ipv4 cluster: ", "CIDR: ", string(firewallRule.IPCidr)) continue } if !utils.IsCatchAllIPEntry(string(firewallRule.IPCidr)) { if len(firewallRule.L4Info) == 0 { l.logger.Info("No L4 specified. Add Catch all entry: ", "CIDR: ", firewallRule.IPCidr) l.addCatchAllL4Entry(&firewallRule) l.logger.Info("Total L4 entries ", "count: ", len(firewallRule.L4Info)) } if utils.IsNonHostCIDR(string(firewallRule.IPCidr)) { existingL4Info, ok := nonHostCIDRs[string(firewallRule.IPCidr)] if ok { firewallRule.L4Info = append(firewallRule.L4Info, existingL4Info...) } else { // Check if the /m entry is part of any /n CIDRs that we've encountered so far // If found, we need to include the port and protocol combination against the current entry as well since // we use LPM TRIE map and the /m will always win out. cidrL4Info = l.checkAndDeriveL4InfoFromAnyMatchingCIDRs(string(firewallRule.IPCidr), nonHostCIDRs) if len(cidrL4Info) > 0 { firewallRule.L4Info = append(firewallRule.L4Info, cidrL4Info...) } } nonHostCIDRs[string(firewallRule.IPCidr)] = firewallRule.L4Info } else { if existingL4Info, ok := ipCIDRs[string(firewallRule.IPCidr)]; ok { firewallRule.L4Info = append(firewallRule.L4Info, existingL4Info...) } // Check if the /32 entry is part of any non host CIDRs that we've encountered so far // If found, we need to include the port and protocol combination against the current entry as well since // we use LPM TRIE map and the /32 will always win out. cidrL4Info = l.checkAndDeriveL4InfoFromAnyMatchingCIDRs(string(firewallRule.IPCidr), nonHostCIDRs) if len(cidrL4Info) > 0 { firewallRule.L4Info = append(firewallRule.L4Info, cidrL4Info...) } ipCIDRs[string(firewallRule.IPCidr)] = firewallRule.L4Info } //Include port and protocol combination paired with catch all entries firewallRule.L4Info = append(firewallRule.L4Info, catchAllIPPorts...) l.logger.Info("Updating Map with ", "IP Key:", firewallRule.IPCidr) _, firewallMapKey, _ := net.ParseCIDR(string(firewallRule.IPCidr)) // Key format: Prefix length (4 bytes) followed by 4/16byte IP address firewallKey := utils.ComputeTrieKey(*firewallMapKey, l.enableIPv6) if len(firewallRule.L4Info) != 0 { mergedL4Info := mergeDuplicateL4Info(firewallRule.L4Info) firewallRule.L4Info = mergedL4Info } firewallValue := utils.ComputeTrieValue(firewallRule.L4Info, l.logger, allowAll, false) firewallMap[string(firewallKey)] = firewallValue } if firewallRule.Except != nil { for _, exceptCIDR := range firewallRule.Except { _, mapKey, _ := net.ParseCIDR(string(exceptCIDR)) key := utils.ComputeTrieKey(*mapKey, l.enableIPv6) l.logger.Info("Parsed Except CIDR", "IP Key: ", mapKey) if len(firewallRule.L4Info) != 0 { mergedL4Info := mergeDuplicateL4Info(firewallRule.L4Info) firewallRule.L4Info = mergedL4Info } value := utils.ComputeTrieValue(firewallRule.L4Info, l.logger, false, true) firewallMap[string(key)] = value } } } return firewallMap, nil } func (l *bpfClient) checkAndDeriveCatchAllIPPorts(firewallRules []EbpfFirewallRules) ([]v1alpha1.Port, bool, bool) { var catchAllL4Info []v1alpha1.Port isCatchAllIPEntryPresent := false allowAllPortAndProtocols := false for _, firewallRule := range firewallRules { if !strings.Contains(string(firewallRule.IPCidr), "/") { firewallRule.IPCidr += v1alpha1.NetworkAddress(l.hostMask) } if !l.enableIPv6 && strings.Contains(string(firewallRule.IPCidr), "::") { l.logger.Info("IPv6 catch all entry in IPv4 mode - skip ") continue } if utils.IsCatchAllIPEntry(string(firewallRule.IPCidr)) { catchAllL4Info = append(catchAllL4Info, firewallRule.L4Info...) isCatchAllIPEntryPresent = true if len(firewallRule.L4Info) == 0 { //All ports and protocols allowAllPortAndProtocols = true } } l.logger.Info("Current L4 entry count for catch all entry: ", "count: ", len(catchAllL4Info)) } l.logger.Info("Total L4 entry count for catch all entry: ", "count: ", len(catchAllL4Info)) return catchAllL4Info, isCatchAllIPEntryPresent, allowAllPortAndProtocols } func (l *bpfClient) checkAndDeriveL4InfoFromAnyMatchingCIDRs(firewallRule string, nonHostCIDRs map[string][]v1alpha1.Port) []v1alpha1.Port { var matchingCIDRL4Info []v1alpha1.Port _, ipToCheck, _ := net.ParseCIDR(firewallRule) for nonHostCIDR, l4Info := range nonHostCIDRs { _, cidrEntry, _ := net.ParseCIDR(nonHostCIDR) l.logger.Info("CIDR match: ", "for IP: ", firewallRule, "in CIDR: ", nonHostCIDR) if cidrEntry.Contains(ipToCheck.IP) { l.logger.Info("Found a CIDR match: ", "for IP: ", firewallRule, "in CIDR: ", nonHostCIDR) matchingCIDRL4Info = append(matchingCIDRL4Info, l4Info...) } } return matchingCIDRL4Info } func (l *bpfClient) addCatchAllL4Entry(firewallRule *EbpfFirewallRules) { catchAllL4Entry := v1alpha1.Port{ Protocol: &CATCH_ALL_PROTOCOL, } firewallRule.L4Info = append(firewallRule.L4Info, catchAllL4Entry) } func (l *bpfClient) DeletePodFromIngressProgPodCaches(podName string, podNamespace string) { podNamespacedName := utils.GetPodNamespacedName(podName, podNamespace) if progFD, ok := l.IngressPodToProgMap.Load(podNamespacedName); ok { l.IngressPodToProgMap.Delete(podNamespacedName) if currentSet, ok := l.IngressProgToPodsMap.Load(progFD); ok { set := currentSet.(map[string]struct{}) delete(set, podNamespacedName) if len(set) == 0 { l.IngressProgToPodsMap.Delete(progFD) } } } } func (l *bpfClient) DeletePodFromEgressProgPodCaches(podName string, podNamespace string) { podNamespacedName := utils.GetPodNamespacedName(podName, podNamespace) if progFD, ok := l.EgressPodToProgMap.Load(podNamespacedName); ok { l.EgressPodToProgMap.Delete(podNamespacedName) if currentSet, ok := l.EgressProgToPodsMap.Load(progFD); ok { set := currentSet.(map[string]struct{}) delete(set, podNamespacedName) if len(set) == 0 { l.EgressProgToPodsMap.Delete(progFD) } } } }