controllers/policyendpoints_controller.go
/*
Copyright 2023.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
"context"
"errors"
"fmt"
"net"
"sync"
"time"
policyk8sawsv1 "github.com/aws/aws-network-policy-agent/api/v1alpha1"
"github.com/aws/aws-network-policy-agent/pkg/ebpf"
"github.com/aws/aws-network-policy-agent/pkg/utils"
"github.com/aws/aws-network-policy-agent/pkg/utils/imds"
"github.com/prometheus/client_golang/prometheus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/go-logr/logr"
networking "k8s.io/api/networking/v1"
)
const (
defaultLocalConntrackCacheCleanupPeriodInSeconds = 300
)
var (
policySetupLatency = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "awsnodeagent_policy_setup_latency_ms",
Help: "policy configuration setup call latency in ms",
},
[]string{"name", "namespace"},
)
policyTearDownLatency = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "awsnodeagent_policy_teardown_latency_ms",
Help: "policy configuration teardown call latency in ms",
},
[]string{"name", "namespace"},
)
prometheusRegistered = false
)
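// Possible pod_state values. cleanupPod resets a pod to DEFAULT_ALLOW (standard mode) or
// DEFAULT_DENY (strict mode) once no policies remain active against it.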
const (
POLICIES_APPLIED = 0
DEFAULT_ALLOW = 1
DEFAULT_DENY = 2
)
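// msSince returns the time elapsed since start in milliseconds.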
func msSince(start time.Time) float64 {
return float64(time.Since(start) / time.Millisecond)
}
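// prometheusRegister registers the policy setup/teardown latency metrics exactly once.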
func prometheusRegister() {
if !prometheusRegistered {
prometheus.MustRegister(policySetupLatency)
prometheus.MustRegister(policyTearDownLatency)
prometheusRegistered = true
}
}
// NewPolicyEndpointsReconciler constructs a new PolicyEndpointsReconciler
func NewPolicyEndpointsReconciler(k8sClient client.Client, log logr.Logger,
enablePolicyEventLogs, enableCloudWatchLogs bool, enableIPv6 bool, enableNetworkPolicy bool, conntrackTTL int, conntrackTableSize int) (*PolicyEndpointsReconciler, error) {
r := &PolicyEndpointsReconciler{
k8sClient: k8sClient,
log: log,
}
if !enableIPv6 {
r.nodeIP, _ = imds.GetMetaData("local-ipv4")
} else {
r.nodeIP, _ = imds.GetMetaData("ipv6")
}
r.log.Info("ConntrackTTL", "cleanupPeriod", conntrackTTL)
var err error
r.enableNetworkPolicy = enableNetworkPolicy
// keep the check here for UT TestIsProgFdShared
if enableNetworkPolicy {
r.ebpfClient, err = ebpf.NewBpfClient(&r.policyEndpointeBPFContext, r.nodeIP,
enablePolicyEventLogs, enableCloudWatchLogs, enableIPv6, conntrackTTL, conntrackTableSize)
// Only touch the eBPF client if it was initialized successfully
if err == nil {
r.ebpfClient.ReAttachEbpfProbes()
// Start prometheus
prometheusRegister()
}
}
return r, err
}
// PolicyEndpointsReconciler reconciles a PolicyEndpoints object
type PolicyEndpointsReconciler struct {
k8sClient client.Client
scheme *runtime.Scheme
//Primary IP of EC2 instance
nodeIP string
// Maps PolicyEndpoint resource to its eBPF context
policyEndpointeBPFContext sync.Map
// Maps pod Identifier to list of PolicyEndpoint resources
podIdentifierToPolicyEndpointMap sync.Map
// Mutex for operations on PodIdentifierToPolicyEndpointMap
podIdentifierToPolicyEndpointMapMutex sync.Mutex
// Maps PolicyEndpoint resource with a list of local pods
policyEndpointSelectorMap sync.Map
// Maps a Network Policy to list of selected pod Identifiers
networkPolicyToPodIdentifierMap sync.Map
//BPF Client instance
ebpfClient ebpf.BpfClient
// NetworkPolicy enabled/disabled
enableNetworkPolicy bool
// NetworkPolicy mode standard/strict
networkPolicyMode string
//Logger
log logr.Logger
}
//+kubebuilder:rbac:groups=networking.k8s.aws,resources=policyendpoints,verbs=get;list;watch
//+kubebuilder:rbac:groups=networking.k8s.aws,resources=policyendpoints/status,verbs=get
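// SetNetworkPolicyMode sets the network policy mode (standard/strict) used when resetting pod_state.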
func (r *PolicyEndpointsReconciler) SetNetworkPolicyMode(mode string) {
r.networkPolicyMode = mode
}
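// Reconcile handles create, update and delete events for PolicyEndpoint resources.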
func (r *PolicyEndpointsReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
r.log.Info("Received a new reconcile request", "req", req)
if err := r.reconcile(ctx, req); err != nil {
r.log.Error(err, "Reconcile error")
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
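// reconcile fetches the PolicyEndpoint and either cleans up its local state (not found or being deleted) or reconciles it.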
func (r *PolicyEndpointsReconciler) reconcile(ctx context.Context, req ctrl.Request) error {
policyEndpoint := &policyk8sawsv1.PolicyEndpoint{}
if err := r.k8sClient.Get(ctx, req.NamespacedName, policyEndpoint); err != nil {
if apierrors.IsNotFound(err) {
return r.cleanUpPolicyEndpoint(ctx, req)
}
r.log.Error(err, "Unable to get policy endpoint spec", "policyendpoint", req.NamespacedName)
return err
}
if !policyEndpoint.DeletionTimestamp.IsZero() {
return r.cleanUpPolicyEndpoint(ctx, req)
}
return r.reconcilePolicyEndpoint(ctx, policyEndpoint)
}
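// cleanUpPolicyEndpoint cleans up local node state for a deleted PolicyEndpoint: it refreshes enforcement
// for pods still selected by the parent NP's remaining PEs, cleans up pods no longer selected by any policy,
// and drops the PE from the local tracking maps.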
func (r *PolicyEndpointsReconciler) cleanUpPolicyEndpoint(ctx context.Context, req ctrl.Request) error {
r.log.Info("Clean Up PolicyEndpoint resources for", "name:", req.NamespacedName.Name)
policyEndpointIdentifier := utils.GetPolicyEndpointIdentifier(req.NamespacedName.Name,
req.NamespacedName.Namespace)
start := time.Now()
// Get all podIdentifiers since we need to decide if pinpath has to be deleted on local node
parentNP := utils.GetParentNPNameFromPEName(req.NamespacedName.Name)
resourceName := req.NamespacedName.Name
resourceNamespace := req.NamespacedName.Namespace
targetPods, podIdentifiers, podsToBeCleanedUp := r.deriveTargetPodsForParentNP(ctx, parentNP, resourceNamespace, resourceName)
r.policyEndpointSelectorMap.Delete(policyEndpointIdentifier)
r.log.Info("cleanUpPolicyEndpoint: ", "Pods to cleanup - ", len(podsToBeCleanedUp), "and Pods to be updated - ", len(targetPods))
// targetPods are pods which would need map update
if len(targetPods) > 0 {
r.log.Info("Updating active pods...")
err := r.updatePolicyEnforcementStatusForPods(ctx, req.NamespacedName.Name, targetPods, podIdentifiers, false)
if err != nil {
r.log.Info("failed to update bpf probes for ", "policy endpoint ", req.NamespacedName.Name)
return err
}
duration := msSince(start)
policyTearDownLatency.WithLabelValues(req.NamespacedName.Name, req.NamespacedName.Namespace).Observe(duration)
}
// podsToBeCleanedUp - pods which are no longer selected by this policy
if len(podsToBeCleanedUp) > 0 {
r.log.Info("Cleaning up current policy against below pods..")
err := r.updatePolicyEnforcementStatusForPods(ctx, req.NamespacedName.Name, podsToBeCleanedUp, podIdentifiers, true)
if err != nil {
r.log.Info("failed to clean up bpf probes for ", "policy endpoint ", req.NamespacedName.Name)
return err
}
duration := msSince(start)
policyTearDownLatency.WithLabelValues(req.NamespacedName.Name, req.NamespacedName.Namespace).Observe(duration)
}
for _, podToBeCleanedUp := range podsToBeCleanedUp {
podIdentifier := utils.GetPodIdentifier(podToBeCleanedUp.Name, podToBeCleanedUp.Namespace, r.log)
//Remove this PolicyEndpoint resource from the current podIdentifier's tracked list
r.deletePolicyEndpointFromPodIdentifierMap(ctx, podIdentifier, req.NamespacedName.Name)
}
return nil
}
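// IsProgFdShared reports whether the eBPF program attached to the given pod is shared with other local pods,
// checking the ingress maps first and falling back to the egress maps. It returns an error if the pod is in neither map.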
func (r *PolicyEndpointsReconciler) IsProgFdShared(targetPodName string,
targetPodNamespace string) (bool, error) {
targetpodNamespacedName := utils.GetPodNamespacedName(targetPodName, targetPodNamespace)
// check ingress caches
if targetProgFD, ok := r.ebpfClient.GetIngressPodToProgMap().Load(targetpodNamespacedName); ok {
if currentList, ok := r.ebpfClient.GetIngressProgToPodsMap().Load(targetProgFD); ok {
podsList, ok := currentList.(map[string]struct{})
if ok {
if len(podsList) > 1 {
r.log.Info("isProgFdShared", "Found shared ingress progFD for target: ", targetPodName, "progFD: ", targetProgFD)
return true, nil
}
return false, nil // Not shared (only one pod)
}
}
}
// Check Egress Maps if not found in Ingress
if targetProgFD, ok := r.ebpfClient.GetEgressPodToProgMap().Load(targetpodNamespacedName); ok {
if currentList, ok := r.ebpfClient.GetEgressProgToPodsMap().Load(targetProgFD); ok {
podsList, ok := currentList.(map[string]struct{})
if ok {
if len(podsList) > 1 {
r.log.Info("IsProgFdShared", "Found shared egress progFD for target:", targetPodName, "progFD:", targetProgFD)
return true, nil
}
return false, nil // Not shared (only one pod)
}
}
}
// If not found in both maps, return an error
r.log.Info("IsProgFdShared", "Pod not found in either IngressPodToProgMap or EgressPodToProgMap:", targetpodNamespacedName)
return false, fmt.Errorf("pod not found in either IngressPodToProgMap or EgressPodToProgMap: %s", targetpodNamespacedName)
}
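// updatePolicyEnforcementStatusForPods updates or cleans up the enforcement state of the given pods via
// cleanupPod, attempting every pod in the list and joining any per-pod errors.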
func (r *PolicyEndpointsReconciler) updatePolicyEnforcementStatusForPods(ctx context.Context, policyEndpointName string,
targetPods []types.NamespacedName, podIdentifiers map[string]bool, isDeleteFlow bool) error {
var err error
// 1. If the pods are already deleted, we move on.
// 2. If the pods have another policy or policies active against them, we update the maps to purge the entries
// introduced by the current policy.
// 3. If there are no more active policies against this pod, we update pod_state to default deny/allow
for _, targetPod := range targetPods {
r.log.Info("Updating Pod: ", "Name: ", targetPod.Name, "Namespace: ", targetPod.Namespace)
podIdentifier := utils.GetPodIdentifier(targetPod.Name, targetPod.Namespace, r.log)
r.log.Info("Derived ", "Pod identifier to check if update is needed : ", podIdentifier)
cleanupErr := r.cleanupPod(ctx, targetPod, policyEndpointName, isDeleteFlow)
if cleanupErr != nil {
r.log.Info("Cleanup/Update unsuccessful for Pod ", "Name: ", targetPod.Name, "Namespace: ", targetPod.Namespace)
err = errors.Join(err, cleanupErr)
// we don't want to return an error right away but instead attempt to clean up all the pods
// in the list before returning
}
}
return err
}
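// reconcilePolicyEndpoint programs eBPF probes and maps for the local pods selected by the given PolicyEndpoint,
// appending catch-all (allow-all) entries for directions with no rules and no isolation.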
func (r *PolicyEndpointsReconciler) reconcilePolicyEndpoint(ctx context.Context,
policyEndpoint *policyk8sawsv1.PolicyEndpoint) error {
r.log.Info("Processing Policy Endpoint ", "Name: ", policyEndpoint.Name, "Namespace ", policyEndpoint.Namespace)
start := time.Now()
// Identify pods local to the node. The PolicyEndpoint resource includes a `HostIP` field for each
// selected pod and the network policy agent relies on it to filter local pods
parentNP := policyEndpoint.Spec.PolicyRef.Name
resourceNamespace := policyEndpoint.Namespace
resourceName := policyEndpoint.Name
targetPods, podIdentifiers, podsToBeCleanedUp := r.deriveTargetPodsForParentNP(ctx, parentNP, resourceNamespace, resourceName)
// Check if this policy needs to be removed from any existing pods it is currently enforced against.
// podIdentifiers holds the pod identifiers of the targetPods derived from the PEs
err := r.updatePolicyEnforcementStatusForPods(ctx, policyEndpoint.Name, podsToBeCleanedUp, podIdentifiers, false)
if err != nil {
r.log.Error(err, "failed to update policy enforcement status for existing pods")
return err
}
for podIdentifier := range podIdentifiers {
// Derive Ingress IPs from the PolicyEndpoint
ingressRules, egressRules, isIngressIsolated, isEgressIsolated, err := r.deriveIngressAndEgressFirewallRules(ctx, podIdentifier,
policyEndpoint.Namespace, policyEndpoint.Name, false)
if err != nil {
r.log.Error(err, "Error Parsing policy Endpoint resource", "name:", policyEndpoint.Name)
return err
}
if len(ingressRules) == 0 && !isIngressIsolated {
//Add allow-all entry to Ingress rule set
r.log.Info("No Ingress rules and no ingress isolation - Appending catch all entry")
r.addCatchAllEntry(ctx, &ingressRules)
}
if len(egressRules) == 0 && !isEgressIsolated {
//Add allow-all entry to Egress rule set
r.log.Info("No Egress rules and no egress isolation - Appending catch all entry")
r.addCatchAllEntry(ctx, &egressRules)
}
// Setup/configure eBPF probes/maps for local pods
err = r.configureeBPFProbes(ctx, podIdentifier, targetPods, ingressRules, egressRules)
if err != nil {
r.log.Info("Error configuring eBPF Probes ", "error: ", err)
}
duration := msSince(start)
policySetupLatency.WithLabelValues(policyEndpoint.Name, policyEndpoint.Namespace).Observe(duration)
}
return nil
}
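// configureeBPFProbes attaches eBPF probes to the local target pods matching the given pod identifier
// and updates the corresponding ingress/egress eBPF maps.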
func (r *PolicyEndpointsReconciler) configureeBPFProbes(ctx context.Context, podIdentifier string,
targetPods []types.NamespacedName, ingressRules, egressRules []ebpf.EbpfFirewallRules) error {
var err error
//Loop over target pods and setup/configure/update eBPF probes/maps
for _, pod := range targetPods {
r.log.Info("Processing Pod: ", "name:", pod.Name, "namespace:", pod.Namespace, "podIdentifier: ", podIdentifier)
currentPodIdentifier := utils.GetPodIdentifier(pod.Name, pod.Namespace, r.log)
if currentPodIdentifier != podIdentifier {
r.log.Info("Target Pod doesn't belong to the current pod Identifier: ", "Name: ", pod.Name, "Pod ID: ", podIdentifier)
continue
}
err = r.ebpfClient.AttacheBPFProbes(pod, podIdentifier)
if err != nil {
r.log.Info("Attaching eBPF probe failed for", "pod", pod.Name, "namespace", pod.Namespace)
return err
}
r.log.Info("Successfully attached required eBPF probes for", "pod:", pod.Name, "in namespace", pod.Namespace)
}
err = r.updateeBPFMaps(ctx, podIdentifier, ingressRules, egressRules)
if err != nil {
r.log.Error(err, "failed to update map ", "podIdentifier ", podIdentifier)
return err
}
return nil
}
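// cleanupPod re-derives the firewall rules that still apply to the given pod. If no policies remain active,
// the pod_state map is reset to default allow/deny; otherwise the pod's ingress/egress maps are refreshed.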
func (r *PolicyEndpointsReconciler) cleanupPod(ctx context.Context, targetPod types.NamespacedName,
policyEndpoint string, isDeleteFlow bool) error {
var err error
var ingressRules, egressRules []ebpf.EbpfFirewallRules
var isIngressIsolated, isEgressIsolated bool
noActiveIngressPolicies, noActiveEgressPolicies := false, false
podIdentifier := utils.GetPodIdentifier(targetPod.Name, targetPod.Namespace, r.log)
// Detach eBPF probes attached to the local pods (if required). We should detach eBPF probes if this
// is the only PolicyEndpoint resource that applies to this pod. If not, just update the Ingress/Egress Map contents
if _, ok := r.podIdentifierToPolicyEndpointMap.Load(podIdentifier); ok {
ingressRules, egressRules, isIngressIsolated, isEgressIsolated, err = r.deriveIngressAndEgressFirewallRules(ctx, podIdentifier, targetPod.Namespace,
policyEndpoint, isDeleteFlow)
if err != nil {
r.log.Error(err, "Error Parsing policy Endpoint resource", "name ", policyEndpoint)
return err
}
if len(ingressRules) == 0 && !isIngressIsolated {
noActiveIngressPolicies = true
}
if len(egressRules) == 0 && !isEgressIsolated {
noActiveEgressPolicies = true
}
// We update pod_state to default allow/deny if there are no other policies applied
if noActiveIngressPolicies && noActiveEgressPolicies {
state := DEFAULT_ALLOW
if utils.IsStrictMode(r.networkPolicyMode) {
state = DEFAULT_DENY
}
r.log.Info("No active policies. Updating pod_state map for ", "podIdentifier: ", podIdentifier, "networkPolicyMode: ", r.networkPolicyMode)
err = r.GeteBPFClient().UpdatePodStateEbpfMaps(podIdentifier, state, true, true)
if err != nil {
r.log.Error(err, "Map update(s) failed for, ", "podIdentifier ", podIdentifier)
return err
}
} else {
// We have additional PolicyEndpoint resources configured against this pod.
// Update the maps and move on
r.log.Info("Active policies against this pod. Skip Detaching probes and Update Maps... ")
if noActiveIngressPolicies {
// No active ingress rules for this pod, but we should only land here
// if there are active egress rules. So we need to add an allow-all entry to the ingress rule set
r.log.Info("No Ingress rules and no ingress isolation - Appending catch all entry")
r.addCatchAllEntry(ctx, &ingressRules)
}
if noActiveEgressPolicies {
// No active egress rules for this pod, but we should only land here
// if there are active ingress rules. So we need to add an allow-all entry to the egress rule set
r.log.Info("No Egress rules and no egress isolation - Appending catch all entry")
r.addCatchAllEntry(ctx, &egressRules)
}
err = r.updateeBPFMaps(ctx, podIdentifier, ingressRules, egressRules)
if err != nil {
r.log.Info("Map Update failed for ", "policyEndpoint: ")
return err
}
}
}
return nil
}
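// deriveIngressAndEgressFirewallRules aggregates ingress and egress firewall rules from all PolicyEndpoints
// tracked against the given pod identifier (skipping PEs of the deleted NP on delete flows) and reports
// whether the pod is isolated on ingress and/or egress.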
func (r *PolicyEndpointsReconciler) deriveIngressAndEgressFirewallRules(ctx context.Context,
podIdentifier string, resourceNamespace string, resourceName string, isDeleteFlow bool) ([]ebpf.EbpfFirewallRules, []ebpf.EbpfFirewallRules, bool, bool, error) {
var ingressRules, egressRules []ebpf.EbpfFirewallRules
isIngressIsolated, isEgressIsolated := false, false
currentPE := &policyk8sawsv1.PolicyEndpoint{}
if policyEndpointList, ok := r.podIdentifierToPolicyEndpointMap.Load(podIdentifier); ok {
r.log.Info("Total number of PolicyEndpoint resources for", "podIdentifier ", podIdentifier, " are ", len(policyEndpointList.([]string)))
for _, policyEndpointResource := range policyEndpointList.([]string) {
peNamespacedName := types.NamespacedName{
Name: policyEndpointResource,
Namespace: resourceNamespace,
}
if isDeleteFlow {
deletedPEParentNPName := utils.GetParentNPNameFromPEName(resourceName)
currentPEParentNPName := utils.GetParentNPNameFromPEName(policyEndpointResource)
if deletedPEParentNPName == currentPEParentNPName {
r.log.Info("PE belongs to same NP. Ignore and move on since it's a delete flow",
"deletedPE", resourceName, "currentPE", policyEndpointResource)
continue
}
}
if err := r.k8sClient.Get(ctx, peNamespacedName, currentPE); err != nil {
if apierrors.IsNotFound(err) {
continue
}
return nil, nil, isIngressIsolated, isEgressIsolated, err
}
r.log.Info("Deriving Firewall rules for PolicyEndpoint:", "Name: ", currentPE.Name)
for _, endPointInfo := range currentPE.Spec.Ingress {
ingressRules = append(ingressRules,
ebpf.EbpfFirewallRules{
IPCidr: endPointInfo.CIDR,
Except: endPointInfo.Except,
L4Info: endPointInfo.Ports,
})
}
for _, endPointInfo := range currentPE.Spec.Egress {
egressRules = append(egressRules,
ebpf.EbpfFirewallRules{
IPCidr: endPointInfo.CIDR,
Except: endPointInfo.Except,
L4Info: endPointInfo.Ports,
})
}
r.log.Info("Total no.of - ", "ingressRules", len(ingressRules), "egressRules", len(egressRules))
ingressIsolated, egressIsolated := r.deriveDefaultPodIsolation(ctx, currentPE, len(ingressRules), len(egressRules))
isIngressIsolated = isIngressIsolated || ingressIsolated
isEgressIsolated = isEgressIsolated || egressIsolated
}
}
if len(ingressRules) > 0 {
isIngressIsolated = false
}
if len(egressRules) > 0 {
isEgressIsolated = false
}
return ingressRules, egressRules, isIngressIsolated, isEgressIsolated, nil
}
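// deriveDefaultPodIsolation reports whether the PolicyEndpoint marks the pod as isolated (default deny)
// on ingress and on egress when no rules exist for that direction.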
func (r *PolicyEndpointsReconciler) deriveDefaultPodIsolation(ctx context.Context, policyEndpoint *policyk8sawsv1.PolicyEndpoint,
ingressRulesCount, egressRulesCount int) (bool, bool) {
isIngressIsolated, isEgressIsolated := false, false
for _, value := range policyEndpoint.Spec.PodIsolation {
if value == networking.PolicyTypeIngress && ingressRulesCount == 0 {
r.log.Info("Default Deny enabled on Ingress")
isIngressIsolated = true
}
if value == networking.PolicyTypeEgress && egressRulesCount == 0 {
r.log.Info("Default Deny enabled on Egress")
isEgressIsolated = true
}
}
return isIngressIsolated, isEgressIsolated
}
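// updateeBPFMaps pushes the derived ingress and egress firewall rules to the eBPF maps shared by
// pods with the given pod identifier.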
func (r *PolicyEndpointsReconciler) updateeBPFMaps(ctx context.Context, podIdentifier string,
ingressRules, egressRules []ebpf.EbpfFirewallRules) error {
// Map Update should only happen once for those that share the same Map
err := r.ebpfClient.UpdateEbpfMaps(podIdentifier, ingressRules, egressRules)
if err != nil {
r.log.Error(err, "Map update(s) failed for, ", "podIdentifier ", podIdentifier)
return err
}
return nil
}
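// deriveTargetPodsForParentNP derives, across all PolicyEndpoints of the parent Network Policy, the local
// pods currently selected by the policy, their pod identifiers, and any previously selected pods that need
// cleanup. It also refreshes the selector and NP-to-podIdentifier caches.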
func (r *PolicyEndpointsReconciler) deriveTargetPodsForParentNP(ctx context.Context,
parentNP, resourceNamespace, resourceName string) ([]types.NamespacedName, map[string]bool, []types.NamespacedName) {
var targetPods, podsToBeCleanedUp, currentPods []types.NamespacedName
var targetPodIdentifiers []string
podIdentifiers := make(map[string]bool)
currentPE := &policyk8sawsv1.PolicyEndpoint{}
r.log.Info("Parent NP resource:", "Name: ", parentNP)
parentPEList := r.derivePolicyEndpointsOfParentNP(ctx, parentNP, resourceNamespace)
r.log.Info("Total PEs for Parent NP:", "Count: ", len(parentPEList))
policyEndpointIdentifier := utils.GetPolicyEndpointIdentifier(resourceName,
resourceNamespace)
// Gather the current set of pods (local to the node) that are configured with this policy's rules.
existingPods, podsPresent := r.policyEndpointSelectorMap.Load(policyEndpointIdentifier)
if podsPresent {
existingPodsSlice := existingPods.([]types.NamespacedName)
for _, pods := range existingPodsSlice {
currentPods = append(currentPods, pods)
r.log.Info("Current pods for this slice : ", "Pod name", pods.Name, "Pod namespace", pods.Namespace)
}
}
if len(parentPEList) == 0 {
podsToBeCleanedUp = append(podsToBeCleanedUp, currentPods...)
r.policyEndpointSelectorMap.Delete(policyEndpointIdentifier)
r.log.Info("No PEs left: ", "number of pods to cleanup - ", len(podsToBeCleanedUp))
}
for _, policyEndpointResource := range parentPEList {
r.log.Info("Derive PE Object ", "Name ", policyEndpointResource)
peNamespacedName := types.NamespacedName{
Name: policyEndpointResource,
Namespace: resourceNamespace,
}
if err := r.k8sClient.Get(ctx, peNamespacedName, currentPE); err != nil {
if !apierrors.IsNotFound(err) {
r.log.Info("Unable to get PolicyEndpoint resource", "name", policyEndpointResource, "err", err)
}
continue
}
r.log.Info("Processing PE ", "Name ", policyEndpointResource)
currentTargetPods, currentPodIdentifiers := r.deriveTargetPods(ctx, currentPE, parentPEList)
r.log.Info("Adding to current targetPods", "Total pods: ", len(currentTargetPods))
targetPods = append(targetPods, currentTargetPods...)
for podIdentifier := range currentPodIdentifiers {
podIdentifiers[podIdentifier] = true
targetPodIdentifiers = append(targetPodIdentifiers, podIdentifier)
}
}
//Identify stale podIdentifiers that are no longer selected by the current Network Policy
stalePodIdentifiers := r.deriveStalePodIdentifiers(ctx, resourceName, targetPodIdentifiers)
for _, policyEndpointResource := range parentPEList {
policyEndpointIdentifier := utils.GetPolicyEndpointIdentifier(policyEndpointResource,
resourceNamespace)
if len(targetPods) > 0 {
r.log.Info("Update target pods for PE Object ", "Name ", policyEndpointResource, " with Total pods: ", len(targetPods))
r.policyEndpointSelectorMap.Store(policyEndpointIdentifier, targetPods)
} else {
r.log.Info("No more target pods so deleting the entry in PE selector map for ", "Name ", policyEndpointResource)
r.policyEndpointSelectorMap.Delete(policyEndpointIdentifier)
}
for _, podIdentifier := range stalePodIdentifiers {
r.deletePolicyEndpointFromPodIdentifierMap(ctx, podIdentifier, policyEndpointResource)
}
}
//Update active podIdentifiers selected by the current Network Policy
r.networkPolicyToPodIdentifierMap.Store(utils.GetParentNPNameFromPEName(resourceName), targetPodIdentifiers)
if len(currentPods) > 0 {
podsToBeCleanedUp = r.getPodListToBeCleanedUp(currentPods, targetPods, podIdentifiers)
}
return targetPods, podIdentifiers, podsToBeCleanedUp
}
// deriveTargetPods derives the list of local pods selected by the given PolicyEndpoint resource.
// It returns the target pods along with their unique pod identifiers and records the parent NP's
// PolicyEndpoint resources against each derived pod identifier.
func (r *PolicyEndpointsReconciler) deriveTargetPods(ctx context.Context,
policyEndpoint *policyk8sawsv1.PolicyEndpoint, parentPEList []string) ([]types.NamespacedName, map[string]bool) {
var targetPods []types.NamespacedName
podIdentifiers := make(map[string]bool)
// Pods are grouped by Host IP. Individual node agents will filter (local) pods
// by the Host IP value.
nodeIP := net.ParseIP(r.nodeIP)
for _, pod := range policyEndpoint.Spec.PodSelectorEndpoints {
podIdentifier := utils.GetPodIdentifier(pod.Name, pod.Namespace, r.log)
if nodeIP.Equal(net.ParseIP(string(pod.HostIP))) {
r.log.Info("Found a matching Pod: ", "name: ", pod.Name, "namespace: ", pod.Namespace)
targetPods = append(targetPods, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace})
podIdentifiers[podIdentifier] = true
r.log.Info("Derived ", "Pod identifier: ", podIdentifier)
}
r.updatePodIdentifierToPEMap(ctx, podIdentifier, parentPEList)
}
return targetPods, podIdentifiers
}
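// getPodListToBeCleanedUp returns the pods from the old selection that are no longer selected by the policy
// (and whose pod identifier is no longer targeted) and therefore need cleanup.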
func (r *PolicyEndpointsReconciler) getPodListToBeCleanedUp(oldPodSet []types.NamespacedName,
newPodSet []types.NamespacedName, podIdentifiers map[string]bool) []types.NamespacedName {
var podsToBeCleanedUp []types.NamespacedName
for _, oldPod := range oldPodSet {
activePod := false
oldPodIdentifier := utils.GetPodIdentifier(oldPod.Name, oldPod.Namespace, r.log)
for _, newPod := range newPodSet {
if oldPod == newPod {
activePod = true
break
}
}
// We want to clean up a pod when it is still running but is no longer an active pod for this policy endpoint,
// which implies the policy endpoint no longer applies to this podIdentifier
if !activePod && !podIdentifiers[oldPodIdentifier] {
r.log.Info("Pod to cleanup: ", "name: ", oldPod.Name, "namespace: ", oldPod.Namespace)
podsToBeCleanedUp = append(podsToBeCleanedUp, oldPod)
}
}
return podsToBeCleanedUp
}
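// updatePodIdentifierToPEMap tracks the parent NP's PolicyEndpoint resources against the given pod identifier,
// adding only entries that aren't already tracked.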
func (r *PolicyEndpointsReconciler) updatePodIdentifierToPEMap(ctx context.Context, podIdentifier string,
parentPEList []string) {
r.podIdentifierToPolicyEndpointMapMutex.Lock()
defer r.podIdentifierToPolicyEndpointMapMutex.Unlock()
var policyEndpoints []string
r.log.Info("Current PE Count for Parent NP:", "Count: ", len(parentPEList))
if currentPESet, ok := r.podIdentifierToPolicyEndpointMap.Load(podIdentifier); ok {
policyEndpoints = currentPESet.([]string)
for _, policyEndpointResourceName := range parentPEList {
r.log.Info("PE for parent NP", "name", policyEndpointResourceName)
addPEResource := true
for _, pe := range currentPESet.([]string) {
if pe == policyEndpointResourceName {
//Nothing to do if this PE is already tracked against this podIdentifier
addPEResource = false
break
}
}
if addPEResource {
r.log.Info("Adding PE", "name", policyEndpointResourceName, "for podIdentifier", podIdentifier)
policyEndpoints = append(policyEndpoints, policyEndpointResourceName)
}
}
} else {
policyEndpoints = append(policyEndpoints, parentPEList...)
}
r.podIdentifierToPolicyEndpointMap.Store(podIdentifier, policyEndpoints)
}
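// deriveStalePodIdentifiers returns the pod identifiers previously tracked against the parent NP that are
// no longer part of the current target set.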
func (r *PolicyEndpointsReconciler) deriveStalePodIdentifiers(ctx context.Context, resourceName string,
targetPodIdentifiers []string) []string {
var stalePodIdentifiers []string
if currentPodIdentifiers, ok := r.networkPolicyToPodIdentifierMap.Load(utils.GetParentNPNameFromPEName(resourceName)); ok {
for _, podIdentifier := range currentPodIdentifiers.([]string) {
r.log.Info("podIdentifier", "name", podIdentifier)
stalePodIdentifier := true
for _, pe := range targetPodIdentifiers {
if pe == podIdentifier {
//This podIdentifier is still selected by the current NP, so it isn't stale
stalePodIdentifier = false
break
}
}
if stalePodIdentifier {
stalePodIdentifiers = append(stalePodIdentifiers, podIdentifier)
}
}
}
return stalePodIdentifiers
}
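// deletePolicyEndpointFromPodIdentifierMap removes the given PolicyEndpoint from the list tracked against
// the pod identifier.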
func (r *PolicyEndpointsReconciler) deletePolicyEndpointFromPodIdentifierMap(ctx context.Context, podIdentifier string,
policyEndpoint string) {
r.podIdentifierToPolicyEndpointMapMutex.Lock()
defer r.podIdentifierToPolicyEndpointMapMutex.Unlock()
var currentPEList []string
if policyEndpointList, ok := r.podIdentifierToPolicyEndpointMap.Load(podIdentifier); ok {
for _, policyEndpointName := range policyEndpointList.([]string) {
if policyEndpointName == policyEndpoint {
continue
}
currentPEList = append(currentPEList, policyEndpointName)
}
r.podIdentifierToPolicyEndpointMap.Store(podIdentifier, currentPEList)
}
}
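// addCatchAllEntry appends an allow-all (0.0.0.0/0) entry to the given firewall rule set.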
func (r *PolicyEndpointsReconciler) addCatchAllEntry(ctx context.Context, firewallRules *[]ebpf.EbpfFirewallRules) {
//Add allow-all entry to firewall rule set
catchAllRule := policyk8sawsv1.EndpointInfo{
CIDR: "0.0.0.0/0",
}
*firewallRules = append(*firewallRules,
ebpf.EbpfFirewallRules{
IPCidr: catchAllRule.CIDR,
L4Info: catchAllRule.Ports,
})
}
// SetupWithManager sets up the controller with the Manager.
func (r *PolicyEndpointsReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&policyk8sawsv1.PolicyEndpoint{}).
Complete(r)
}
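// derivePolicyEndpointsOfParentNP lists all PolicyEndpoint resources in the namespace that reference the
// given parent Network Policy.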
func (r *PolicyEndpointsReconciler) derivePolicyEndpointsOfParentNP(ctx context.Context, parentNP, resourceNamespace string) []string {
var parentPolicyEndpointList []string
policyEndpointList := &policyk8sawsv1.PolicyEndpointList{}
if err := r.k8sClient.List(ctx, policyEndpointList, &client.ListOptions{
Namespace: resourceNamespace,
}); err != nil {
r.log.Info("Unable to list PolicyEndpoints", "err", err)
return nil
}
for _, policyEndpoint := range policyEndpointList.Items {
if policyEndpoint.Spec.PolicyRef.Name == parentNP {
parentPolicyEndpointList = append(parentPolicyEndpointList, policyEndpoint.Name)
r.log.Info("Found another PE resource for the parent NP", "name", policyEndpoint.Name)
}
}
return parentPolicyEndpointList
}
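// GeteBPFClient returns the reconciler's eBPF client.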
func (r *PolicyEndpointsReconciler) GeteBPFClient() ebpf.BpfClient {
return r.ebpfClient
}
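// DeriveFireWallRulesPerPodIdentifier derives ingress and egress firewall rules for the given pod identifier,
// appending catch-all entries for directions with no rules and no isolation.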
func (r *PolicyEndpointsReconciler) DeriveFireWallRulesPerPodIdentifier(podIdentifier string, podNamespace string) ([]ebpf.EbpfFirewallRules,
[]ebpf.EbpfFirewallRules, error) {
ingressRules, egressRules, isIngressIsolated, isEgressIsolated, err := r.deriveIngressAndEgressFirewallRules(context.Background(), podIdentifier,
podNamespace, "", false)
if err != nil {
r.log.Error(err, "Error deriving firewall rules")
return ingressRules, egressRules, err
}
if len(ingressRules) == 0 && !isIngressIsolated {
// No active ingress rules for this pod, but we only should land here
// if there are active egress rules. So, we need to add an allow-all entry to ingress rule set
r.log.Info("No Ingress rules and no ingress isolation - Appending catch all entry")
r.addCatchAllEntry(context.Background(), &ingressRules)
}
if len(egressRules) == 0 && !isEgressIsolated {
// No active egress rules for this pod but we only should land here
// if there are active ingress rules. So, we need to add an allow-all entry to egress rule set
r.log.Info("No Egress rules and no egress isolation - Appending catch all entry")
r.addCatchAllEntry(context.Background(), &egressRules)
}
return ingressRules, egressRules, nil
}
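// ArePoliciesAvailableInLocalCache returns true if at least one PolicyEndpoint is tracked against the given
// pod identifier in the local cache.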
func (r *PolicyEndpointsReconciler) ArePoliciesAvailableInLocalCache(podIdentifier string) bool {
if policyEndpointList, ok := r.podIdentifierToPolicyEndpointMap.Load(podIdentifier); ok {
if len(policyEndpointList.([]string)) > 0 {
r.log.Info("Active policies available against", "podIdentifier", podIdentifier)
return true
}
}
return false
}