npm/pkg/controlplane/controllers/v1/networkPolicyController.go

// Copyright 2018 Microsoft. All rights reserved.
// MIT License
package controllers

import (
    "fmt"
    "strconv"
    "time"

    "github.com/Azure/azure-container-networking/npm/ipsm"
    "github.com/Azure/azure-container-networking/npm/iptm"
    "github.com/Azure/azure-container-networking/npm/metrics"
    "github.com/Azure/azure-container-networking/npm/util"
    networkingv1 "k8s.io/api/networking/v1"
    "k8s.io/apimachinery/pkg/api/errors"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    networkinginformers "k8s.io/client-go/informers/networking/v1"
    netpollister "k8s.io/client-go/listers/networking/v1"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
    "k8s.io/klog"
    "k8s.io/utils/exec"
)

// IsSafeCleanUpAzureNpmChain indicates whether the default Azure NPM chain can be safely deleted or not.
type IsSafeCleanUpAzureNpmChain bool

const (
    safeToCleanUpAzureNpmChain   IsSafeCleanUpAzureNpmChain = true
    unSafeToCleanUpAzureNpmChain IsSafeCleanUpAzureNpmChain = false
)

type NetworkPolicyController struct {
    netPolLister netpollister.NetworkPolicyLister
    workqueue    workqueue.RateLimitingInterface
    rawNpMap     map[string]*networkingv1.NetworkPolicy // Key is <nsname>/<policyname>
    // (TODO): will leverage this structure to manage network policies more efficiently
    // ProcessedNpMap map[string]*networkingv1.NetworkPolicy // Key is <nsname>/<podSelectorHash>
    // flag to indicate whether the default Azure NPM chain is created or not
    isAzureNpmChainCreated bool
    ipsMgr                 *ipsm.IpsetManager
    iptMgr                 *iptm.IptablesManager
}

func NewNetworkPolicyController(npInformer networkinginformers.NetworkPolicyInformer, ipsMgr *ipsm.IpsetManager, placeAzureChainFirst bool) *NetworkPolicyController {
    netPolController := &NetworkPolicyController{
        netPolLister: npInformer.Lister(),
        workqueue:    workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "NetworkPolicy"),
        rawNpMap:     make(map[string]*networkingv1.NetworkPolicy),
        // ProcessedNpMap: make(map[string]*networkingv1.NetworkPolicy),
        isAzureNpmChainCreated: false,
        ipsMgr:                 ipsMgr,
        iptMgr:                 iptm.NewIptablesManager(exec.New(), iptm.NewIptOperationShim(), placeAzureChainFirst),
    }

    npInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    netPolController.addNetworkPolicy,
            UpdateFunc: netPolController.updateNetworkPolicy,
            DeleteFunc: netPolController.deleteNetworkPolicy,
        },
    )
    return netPolController
}
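
// Example wiring (an illustrative sketch only, not part of this package; it
// assumes a kubernetes.Interface named "clientset", an *ipsm.IpsetManager
// named "ipsMgr", a resync period, and a stop channel created by the caller):
//
//     factory := informers.NewSharedInformerFactory(clientset, resyncPeriod)
//     npc := NewNetworkPolicyController(factory.Networking().V1().NetworkPolicies(), ipsMgr, true)
//     factory.Start(stopCh)
//     go npc.Run(stopCh)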

// BootupDataplane does all initialization tasks for the data plane.
// TODO(jungukcho): need to refactor UninitNpmChains since it assumes the AZURE-NPM chains already exist
func (c *NetworkPolicyController) BootupDataplane() error {
    klog.Infof("Initialize data plane. Clean up Azure-NPM chains and start reconciling iptables")

    // TODO(jungukcho): will clean up the error-handling code for initializing iptables and ipset in a separate PR
    // It is important to clean up iptables and ipset in order.
    // IPtables should be cleaned first; otherwise ipset clean-up can fail with an "ipset is in use in kernel" error.
    // 1. clean up NPM-related iptables state, then run periodic processes to keep iptables correct
    if err := c.iptMgr.UninitNpmChains(); err != nil {
        utilruntime.HandleError(fmt.Errorf("Failed to UninitNpmChains with err: %w", err))
    }
    // 2. then clean up all NPM ipset state
    if err := c.ipsMgr.DestroyNpmIpsets(); err != nil {
        utilruntime.HandleError(fmt.Errorf("Failed to DestroyNpmIpsets with err: %w", err))
    }
    return nil
}

func (c *NetworkPolicyController) RunPeriodicTasks(stopCh <-chan struct{}) {
    // (TODO): check any side effects
    c.iptMgr.ReconcileIPTables(stopCh)
}

func (c *NetworkPolicyController) LengthOfRawNpMap() int {
    return len(c.rawNpMap)
}

// getNetworkPolicyKey returns the namespace/name of the network policy object if it is a valid
// network policy object with a valid namespace/name. If not, it returns an error.
func (c *NetworkPolicyController) getNetworkPolicyKey(obj interface{}) (string, error) {
    var key string
    _, ok := obj.(*networkingv1.NetworkPolicy)
    if !ok {
        return key, fmt.Errorf("cannot cast obj (%v) to network policy obj", obj)
    }

    var err error
    if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
        return key, fmt.Errorf("error due to %w", err)
    }
    return key, nil
}

func (c *NetworkPolicyController) addNetworkPolicy(obj interface{}) {
    netPolkey, err := c.getNetworkPolicyKey(obj)
    if err != nil {
        utilruntime.HandleError(err)
        return
    }
    c.workqueue.Add(netPolkey)
}

func (c *NetworkPolicyController) updateNetworkPolicy(old, newnetpol interface{}) {
    netPolkey, err := c.getNetworkPolicyKey(newnetpol)
    if err != nil {
        utilruntime.HandleError(err)
        return
    }

    // The new network policy object was already validated by the getNetworkPolicyKey call above.
    newNetPol, _ := newnetpol.(*networkingv1.NetworkPolicy)
    oldNetPol, ok := old.(*networkingv1.NetworkPolicy)
    if ok {
        if oldNetPol.ResourceVersion == newNetPol.ResourceVersion {
            // Periodic resync will send update events for all known network policies.
            // Two different versions of the same network policy will always have different RVs.
            return
        }
    }
    c.workqueue.Add(netPolkey)
}

func (c *NetworkPolicyController) deleteNetworkPolicy(obj interface{}) {
    netPolObj, ok := obj.(*networkingv1.NetworkPolicy)
    // DeleteFunc gets the final state of the resource (if it is known).
    // Otherwise, it gets an object of type DeletedFinalStateUnknown.
    // This can happen if the watch is closed and misses the delete event and
    // the controller doesn't notice the deletion until the subsequent re-list.
    if !ok {
        tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
        if !ok {
            metrics.SendErrorLogAndMetric(util.NetpolID, "[NETPOL DELETE EVENT] Received unexpected object type: %v", obj)
            utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type"))
            return
        }

        if netPolObj, ok = tombstone.Obj.(*networkingv1.NetworkPolicy); !ok {
            metrics.SendErrorLogAndMetric(util.NetpolID, "[NETPOL DELETE EVENT] Received unexpected object type (error decoding object tombstone, invalid type): %v", obj)
            utilruntime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type"))
            return
        }
    }

    var netPolkey string
    var err error
    if netPolkey, err = cache.MetaNamespaceKeyFunc(netPolObj); err != nil {
        utilruntime.HandleError(err)
        return
    }

    c.workqueue.Add(netPolkey)
}
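
// Note: cache.MetaNamespaceKeyFunc yields "<namespace>/<name>" for namespaced
// objects, so all three event handlers above enqueue the same key shape.
// A minimal sketch of the round trip (names are illustrative):
//
//     key, _ := cache.MetaNamespaceKeyFunc(netPolObj)   // "kube-system/allow-dns"
//     ns, name, _ := cache.SplitMetaNamespaceKey(key)   // "kube-system", "allow-dns"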

func (c *NetworkPolicyController) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer c.workqueue.ShutDown()

    klog.Infof("Starting Network Policy worker")
    go wait.Until(c.runWorker, time.Second, stopCh)

    klog.Infof("Started Network Policy worker")
    <-stopCh
    klog.Info("Shutting down Network Policy workers")
}

func (c *NetworkPolicyController) runWorker() {
    for c.processNextWorkItem() {
    }
}

func (c *NetworkPolicyController) processNextWorkItem() bool {
    obj, shutdown := c.workqueue.Get()

    if shutdown {
        return false
    }

    err := func(obj interface{}) error {
        defer c.workqueue.Done(obj)

        var key string
        var ok bool
        if key, ok = obj.(string); !ok {
            // As the item in the workqueue is actually invalid, we call
            // Forget here; otherwise we'd go into a loop of attempting to
            // process a work item that is invalid.
            c.workqueue.Forget(obj)
            utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
            return nil
        }

        // Run syncNetPol, passing it the namespace/name string of the
        // network policy resource to be synced.
        if err := c.syncNetPol(key); err != nil {
            // Put the item back on the workqueue to handle any transient errors.
            c.workqueue.AddRateLimited(key)
            return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
        }

        // Finally, if no error occurs we Forget this item so it does not
        // get queued again until another change happens.
        c.workqueue.Forget(obj)
        klog.Infof("Successfully synced '%s'", key)
        return nil
    }(obj)

    if err != nil {
        utilruntime.HandleError(err)
        metrics.SendErrorLogAndMetric(util.NetpolID, "syncNetPol error due to %v", err)
        return true
    }
    return true
}
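
// Requeue behavior (for reference; defaults from the client-go version
// vendored at the time of writing): workqueue.DefaultControllerRateLimiter
// combines per-item exponential backoff (5ms base, 1000s cap) with an overall
// token bucket (10 qps, burst 100), so a key that fails in syncNetPol is
// retried with increasing delay via AddRateLimited instead of hot-looping,
// and Forget resets the per-item backoff once a sync succeeds.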

// syncNetPol compares the actual state with the desired state and attempts to converge the two.
func (c *NetworkPolicyController) syncNetPol(key string) error {
    // timer for recording execution times
    timer := metrics.StartNewTimer()

    // Convert the namespace/name string into a distinct namespace and name
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
        return nil
    }

    // record exec time after syncing
    operationKind := metrics.NoOp
    defer func() {
        metrics.RecordControllerPolicyExecTime(timer, operationKind, err != nil)
    }()

    // Get the network policy resource with this namespace/name
    netPolObj, err := c.netPolLister.NetworkPolicies(namespace).Get(name)
    if err != nil {
        if errors.IsNotFound(err) {
            klog.Infof("Network Policy %s is not found; it may have been deleted", key)

            if _, ok := c.rawNpMap[key]; ok {
                // record time to delete policy if it exists (can't call within cleanUpNetworkPolicy because this can be called by a pod update)
                operationKind = metrics.DeleteOp
            }

            // netPolObj is not found, but we still need to check the RawNpMap cache with key.
            // cleanUpNetworkPolicy will take care of deleting a cached network policy if one exists with key in our RawNpMap cache.
            err = c.cleanUpNetworkPolicy(key, safeToCleanUpAzureNpmChain)
            if err != nil {
                return fmt.Errorf("[syncNetPol] Error: %w when network policy is not found", err)
            }
            return err
        }
        return err
    }

    // If DeletionTimestamp of the netPolObj is set, start cleaning up the last-applied states.
    // This is an early clean-up triggered by the updateNetPol event.
    if netPolObj.ObjectMeta.DeletionTimestamp != nil || netPolObj.ObjectMeta.DeletionGracePeriodSeconds != nil {
        if _, ok := c.rawNpMap[key]; ok {
            // record time to delete policy if it exists (can't call within cleanUpNetworkPolicy because this can be called by a pod update)
            operationKind = metrics.DeleteOp
        }
        err = c.cleanUpNetworkPolicy(key, safeToCleanUpAzureNpmChain)
        if err != nil {
            return fmt.Errorf("error: %w when ObjectMeta.DeletionTimestamp field is set", err)
        }
        return nil
    }

    cachedNetPolObj, netPolExists := c.rawNpMap[key]
    if netPolExists {
        // If the network policy's state does not differ from the last-applied state stored in cachedNetPolObj,
        // netPolController does not need to reconcile this update: in this updateNetworkPolicy event,
        // newNetPol was updated with states which netPolController does not need to reconcile.
        if isSameNetworkPolicy(cachedNetPolObj, netPolObj) {
            return nil
        }
    }

    operationKind, err = c.syncAddAndUpdateNetPol(netPolObj)
    if err != nil {
        return fmt.Errorf("[syncNetPol] Error due to %w", err)
    }

    return nil
}
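
// Note on the deferred metrics call in syncNetPol: the deferred closure reads
// the operationKind and err variables at function exit, not at the point of
// the defer statement, so later assignments (e.g. operationKind =
// metrics.DeleteOp) are reflected in the recorded sample. A minimal
// illustration of this capture semantics:
//
//     op := "noop"
//     defer func() { fmt.Println(op) }() // prints "update", not "noop"
//     op = "update"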

// initializeDefaultAzureNpmChain installs the default rules for kube-system and iptables
func (c *NetworkPolicyController) initializeDefaultAzureNpmChain() error {
    if c.isAzureNpmChainCreated {
        return nil
    }

    if err := c.ipsMgr.CreateSet(util.KubeSystemFlag, []string{util.IpsetNetHashFlag}); err != nil {
        return fmt.Errorf("[initializeDefaultAzureNpmChain] Error: failed to initialize kube-system ipset with err %w", err)
    }
    if err := c.iptMgr.InitNpmChains(); err != nil {
        return fmt.Errorf("[initializeDefaultAzureNpmChain] Error: failed to initialize azure-npm chains with err %w", err)
    }

    c.isAzureNpmChainCreated = true
    return nil
}
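
// initializeDefaultAzureNpmChain is intentionally idempotent: the
// isAzureNpmChainCreated flag makes repeated calls after a successful
// bootstrap no-ops, and cleanUpNetworkPolicy resets the flag when the last
// policy is removed so the next add event reinstalls the chains.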

// syncAddAndUpdateNetPol handles a new or updated network policy object triggered by add and update events
func (c *NetworkPolicyController) syncAddAndUpdateNetPol(netPolObj *networkingv1.NetworkPolicy) (metrics.OperationKind, error) {
    var err error
    netpolKey, err := cache.MetaNamespaceKeyFunc(netPolObj)
    if err != nil {
        // consider a no-op since we can't determine add vs. update. The exec time here isn't important either.
        return metrics.NoOp, fmt.Errorf("[syncAddAndUpdateNetPol] Error: while running MetaNamespaceKeyFunc err: %w", err)
    }

    // Start the reconciling loop to eventually converge cached states with the desired states from the network policy.
    // #1 If a new network policy is created, the network policy is not in RawNpMap:
    //    start translating the policy, then install the translated ipset and iptables rules into the kernel.
    // #2 If a network policy with <ns>-<netpol namespace>-<netpol name> was applied before and the two network policies are the same object (same UID):
    //    first delete the applied network policy, then translate and install as above.
    // #3 If a network policy with <ns>-<netpol namespace>-<netpol name> was applied before and the two network policies are different objects (different UIDs) due to missed events for the old object:
    //    first delete the applied network policy, then translate and install as above.
    // To deal with all three cases, we first delete the network policy if possible, then install the translated rules into the kernel.
    // (TODO): can optimize the logic further to reduce computation. For example, apply only the difference if possible, like podController.

    // There is no need to clean up the default Azure NPM chain in the delete path here, since a network policy object is applied right after,
    // and skipping it avoids the extra overhead of reinstalling the default Azure NPM chain in initializeDefaultAzureNpmChain.
    // To achieve this, the flag unSafeToCleanUpAzureNpmChain indicates that the default Azure NPM chain must not be deleted.

    // delete existing network policy
    var operationKind metrics.OperationKind
    if _, ok := c.rawNpMap[netpolKey]; ok {
        operationKind = metrics.UpdateOp
    } else {
        operationKind = metrics.CreateOp
    }
    err = c.cleanUpNetworkPolicy(netpolKey, unSafeToCleanUpAzureNpmChain)
    if err != nil {
        return operationKind, fmt.Errorf("[syncAddAndUpdateNetPol] Error: failed to deleteNetworkPolicy due to %w", err)
    }

    // Install the default rules for the kube-system and azure-npm chains if they are not initialized.
    // Execute initializeDefaultAzureNpmChain before actually starting to process the network policy object.
    if err = c.initializeDefaultAzureNpmChain(); err != nil {
        return operationKind, fmt.Errorf("[syncNetPol] Error: due to %w", err)
    }

    // Cache the network policy object before applying ipsets and iptables.
    // If an error happens while applying ipsets and iptables, the key is re-queued
    // in the workqueue and this function runs again, eventually converging on the desired state of the network policy.
    c.rawNpMap[netpolKey] = netPolObj
    metrics.IncNumPolicies()

    c.ipsMgr.Lock()
    defer c.ipsMgr.Unlock()

    sets, namedPorts, lists, ingressIPCidrs, egressIPCidrs, iptEntries := translatePolicy(netPolObj)

    for _, set := range sets {
        klog.Infof("Creating set: %v, hashedSet: %v", set, util.GetHashedName(set))
        if err = c.ipsMgr.CreateSetNoLock(set, []string{util.IpsetNetHashFlag}); err != nil {
            return operationKind, fmt.Errorf("[syncAddAndUpdateNetPol] Error: creating ipset %s with err: %w", set, err)
        }
    }
    for _, set := range namedPorts {
        klog.Infof("Creating set: %v, hashedSet: %v", set, util.GetHashedName(set))
        if err = c.ipsMgr.CreateSetNoLock(set, []string{util.IpsetIPPortHashFlag}); err != nil {
            return operationKind, fmt.Errorf("[syncAddAndUpdateNetPol] Error: creating ipset named port %s with err: %w", set, err)
        }
    }

    // lists is a map from list name to its members.
    // NPM creates each list first and increments its refer count.
    for listKey := range lists {
        if err = c.ipsMgr.CreateListNoLock(listKey); err != nil {
            return operationKind, fmt.Errorf("[syncAddAndUpdateNetPol] Error: creating ipset list %s with err: %w", listKey, err)
        }
        c.ipsMgr.IpSetReferIncOrDec(listKey, util.IpsetSetListFlag, ipsm.IncrementOp)
    }

    // Then NPM adds members to the lists above; this avoids members being added
    // to lists before the lists are created.
    for listKey, listLabelsMembers := range lists {
        for _, listMember := range listLabelsMembers {
            if err = c.ipsMgr.AddToListNoLock(listKey, listMember); err != nil {
                return operationKind, fmt.Errorf("[syncAddAndUpdateNetPol] Error: Adding ipset member %s to ipset list %s with err: %w", listMember, listKey, err)
            }
        }
        c.ipsMgr.IpSetReferIncOrDec(listKey, util.IpsetSetListFlag, ipsm.IncrementOp)
    }

    if err = c.createCidrsRule("in", netPolObj.Name, netPolObj.Namespace, ingressIPCidrs); err != nil {
        return operationKind, fmt.Errorf("[syncAddAndUpdateNetPol] Error: createCidrsRule in due to %w", err)
    }

    if err = c.createCidrsRule("out", netPolObj.Name, netPolObj.Namespace, egressIPCidrs); err != nil {
        return operationKind, fmt.Errorf("[syncAddAndUpdateNetPol] Error: createCidrsRule out due to %w", err)
    }

    for _, iptEntry := range iptEntries {
        if err = c.iptMgr.Add(iptEntry); err != nil {
            return operationKind, fmt.Errorf("[syncAddAndUpdateNetPol] Error: failed to apply iptables rule. Rule: %+v with err: %w", iptEntry, err)
        }
    }

    return operationKind, nil
}
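
// For reference, translatePolicy (defined elsewhere in this package) flattens
// one NetworkPolicy into the dataplane primitives consumed above: pod-selector
// ipsets, named-port ipsets, label-keyed ipset lists with their members,
// per-direction CIDR groups, and the iptables entries that reference the
// hashed set names. The exact shapes are defined by that function, not here.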

// cleanUpNetworkPolicy handles deleting a network policy based on netPolKey.
func (c *NetworkPolicyController) cleanUpNetworkPolicy(netPolKey string, isSafeCleanUpAzureNpmChain IsSafeCleanUpAzureNpmChain) error {
    cachedNetPolObj, cachedNetPolObjExists := c.rawNpMap[netPolKey]
    // if there is no applied network policy with the netPolKey, no clean-up process is needed.
    if !cachedNetPolObjExists {
        return nil
    }

    // translate policy from "cachedNetPolObj"
    _, _, lists, ingressIPCidrs, egressIPCidrs, iptEntries := translatePolicy(cachedNetPolObj)

    var err error
    // delete iptables entries
    for _, iptEntry := range iptEntries {
        if err = c.iptMgr.Delete(iptEntry); err != nil {
            return fmt.Errorf("[cleanUpNetworkPolicy] Error: failed to apply iptables rule. Rule: %+v with err: %w", iptEntry, err)
        }
    }

    // lists is a map from list name to its members.
    for listKey := range lists {
        // We do not have to delete the members before deleting the set because:
        // 1. ipset allows deleting an ipset list that still has members,
        // 2. if the refer count is more than one we should not remove members, and
        // 3. it reduces datapath operations.
        if err = c.ipsMgr.DeleteList(listKey); err != nil {
            return fmt.Errorf("[cleanUpNetworkPolicy] Error: failed to delete ipset list %s with err: %w", listKey, err)
        }
    }

    // delete ipset lists related to ingress CIDRs
    if err = c.removeCidrsRule("in", cachedNetPolObj.Name, cachedNetPolObj.Namespace, ingressIPCidrs); err != nil {
        return fmt.Errorf("[cleanUpNetworkPolicy] Error: removeCidrsRule in due to %w", err)
    }

    // delete ipset lists related to egress CIDRs
    if err = c.removeCidrsRule("out", cachedNetPolObj.Name, cachedNetPolObj.Namespace, egressIPCidrs); err != nil {
        return fmt.Errorf("[cleanUpNetworkPolicy] Error: removeCidrsRule out due to %w", err)
    }

    // Success: the ipset and iptables states in the kernel are cleaned up, so delete the cached network policy from RawNpMap.
    delete(c.rawNpMap, netPolKey)
    metrics.DecNumPolicies()

    // If there are no cached network policies left in RawNpMap and no network policy to process immediately, start cleaning up the default azure npm chains.
    // If UninitNpmChains fails, it leaves partial state behind and will not retry, but functionally this is ok.
    // (TODO): Ideally, cleaning up the default azure npm chains should be decoupled from the "network policy deletion" event.
    if isSafeCleanUpAzureNpmChain && len(c.rawNpMap) == 0 {
        // Even if UninitNpmChains returns an error, isAzureNpmChainCreated is set to false,
        // so when a new network policy is added, the default Azure NPM chain can be installed again.
        c.isAzureNpmChainCreated = false
        if err = c.iptMgr.UninitNpmChains(); err != nil {
            utilruntime.HandleError(fmt.Errorf("error: failed to uninitialize azure-npm chains with err: %w", err))
            return nil
        }
    }

    return nil
}
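
// Clean-up ordering mirrors BootupDataplane: iptables entries are deleted
// before any ipset lists or CIDR sets, since the kernel refuses to destroy an
// ipset that is still referenced by an iptables rule ("set is in use").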
if ipCidrEntry == "0.0.0.0/0" { splitEntry := [2]string{"0.0.0.0/1", "128.0.0.0/1"} for _, entry := range splitEntry { if err := c.ipsMgr.AddToSetNoLock(setName, entry, util.IpsetNetHashFlag, ""); err != nil { return fmt.Errorf("[createCidrsRule] adding ip cidrs %s into ipset %s with err: %w", entry, ipCidrSet, err) } } } else { if err := c.ipsMgr.AddToSetNoLock(setName, ipCidrEntry, util.IpsetNetHashFlag, ""); err != nil { return fmt.Errorf("[createCidrsRule] adding ip cidrs %s into ipset %s with err: %w", ipCidrEntry, ipCidrSet, err) } } } } return nil } func (c *NetworkPolicyController) removeCidrsRule(direction, policyName, ns string, ipsets [][]string) error { for i, ipCidrSet := range ipsets { if len(ipCidrSet) == 0 { continue } setName := policyName + "-in-ns-" + ns + "-" + strconv.Itoa(i) + direction klog.Infof("Delete set: %v, hashedSet: %v", setName, util.GetHashedName(setName)) if err := c.ipsMgr.DeleteSet(setName); err != nil { return fmt.Errorf("[removeCidrsRule] deleting ipset %s with err: %w", ipCidrSet, err) } } return nil } // GetProcessedNPKey will return netpolKey // (TODO): will use this function when optimizing management of multiple network policies with merging and deducting multiple network policies. // func (c *networkPolicyController) getProcessedNPKey(netPolObj *networkingv1.NetworkPolicy) string { // // hashSelector will never be empty // // (TODO): what if PodSelector is [] or nothing? - make the Unit test for this // hashedPodSelector := HashSelector(&netPolObj.Spec.PodSelector) // // (TODO): any chance to have namespace has zero length? // if len(netPolObj.GetNamespace()) > 0 { // hashedPodSelector = netPolObj.GetNamespace() + "/" + hashedPodSelector // } // return util.GetNSNameWithPrefix(hashedPodSelector) // }