npm/pkg/controlplane/controllers/v2/namespaceController.go (321 lines of code) (raw):

// Copyright 2018 Microsoft. All rights reserved. // MIT License package controllers import ( "encoding/json" "errors" "fmt" "sync" "time" "github.com/Azure/azure-container-networking/npm/metrics" "github.com/Azure/azure-container-networking/npm/pkg/controlplane/controllers/common" "github.com/Azure/azure-container-networking/npm/pkg/dataplane" "github.com/Azure/azure-container-networking/npm/pkg/dataplane/ipsets" "github.com/Azure/azure-container-networking/npm/util" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" k8slabels "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" coreinformer "k8s.io/client-go/informers/core/v1" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog" ) var errWorkqueueFormatting = errors.New("error in formatting") // NpmNamespaceCache to store namespace struct in nameSpaceController.go. // Since this cache is shared between podController and NamespaceController, // it has mutex for avoiding racing condition between them. type NpmNamespaceCache struct { sync.RWMutex NsMap map[string]*common.Namespace // Key is ns-<nsname> } func (c *NpmNamespaceCache) GetCache() map[string]*common.Namespace { c.RLock() defer c.RUnlock() return c.NsMap } func (n *NpmNamespaceCache) MarshalJSON() ([]byte, error) { n.RLock() defer n.RUnlock() nsMapRaw, err := json.Marshal(n.NsMap) if err != nil { return nil, fmt.Errorf("failed to marshal nsMap due to %w", err) } return nsMapRaw, nil } type NamespaceController struct { dp dataplane.GenericDataplane nameSpaceLister corelisters.NamespaceLister workqueue workqueue.RateLimitingInterface npmNamespaceCache *NpmNamespaceCache } func NewNamespaceController(nameSpaceInformer coreinformer.NamespaceInformer, dp dataplane.GenericDataplane, npmNamespaceCache *NpmNamespaceCache) *NamespaceController { nameSpaceController := &NamespaceController{ dp: dp, nameSpaceLister: nameSpaceInformer.Lister(), workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Namespaces"), npmNamespaceCache: npmNamespaceCache, } nameSpaceInformer.Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: nameSpaceController.addNamespace, UpdateFunc: nameSpaceController.updateNamespace, DeleteFunc: nameSpaceController.deleteNamespace, }, ) return nameSpaceController } func (n *NamespaceController) GetCache() map[string]*common.Namespace { return n.npmNamespaceCache.GetCache() } // filter this event if we do not need to handle this event func (nsc *NamespaceController) needSync(obj interface{}, event string) (string, bool) { needSync := false var key string nsObj, ok := obj.(*corev1.Namespace) if !ok { metrics.SendErrorLogAndMetric(util.NSID, "[NAMESPACE %s EVENT] Received unexpected object type: %v", event, obj) return key, needSync } var err error if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { utilruntime.HandleError(err) metrics.SendErrorLogAndMetric(util.NSID, "[NAMESPACE %s EVENT] Error: NamespaceKey is empty for %s namespace", event, nsObj.Name) return key, needSync } needSync = true return key, needSync } func (nsc *NamespaceController) addNamespace(obj interface{}) { key, needSync := nsc.needSync(obj, "ADD") if !needSync { return } nsc.workqueue.Add(key) } func (nsc *NamespaceController) updateNamespace(old, newns interface{}) { key, needSync := nsc.needSync(newns, "UPDATE") if !needSync { return } nsObj, _ := newns.(*corev1.Namespace) oldNsObj, ok := old.(*corev1.Namespace) if ok { if oldNsObj.ResourceVersion == nsObj.ResourceVersion { return } } nsc.workqueue.Add(key) } func (nsc *NamespaceController) deleteNamespace(obj interface{}) { nsObj, ok := obj.(*corev1.Namespace) // DeleteFunc gets the final state of the resource (if it is known). // Otherwise, it gets an object of type DeletedFinalStateUnknown. // This can happen if the watch is closed and misses the delete event and // the controller doesn't notice the deletion until the subsequent re-list if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { metrics.SendErrorLogAndMetric(util.NSID, "[NAMESPACE DELETE EVENT]: Received unexpected object type: %v", obj) return } if nsObj, ok = tombstone.Obj.(*corev1.Namespace); !ok { metrics.SendErrorLogAndMetric(util.NSID, "[NAMESPACE DELETE EVENT]: Received unexpected object type (error decoding object tombstone, invalid type): %v", obj) return } } var err error var key string if key, err = cache.MetaNamespaceKeyFunc(nsObj); err != nil { utilruntime.HandleError(err) metrics.SendErrorLogAndMetric(util.NSID, "[NAMESPACE DELETE EVENT] Error: nameSpaceKey is empty for %s namespace", nsObj.Name) return } nsc.workqueue.Add(key) } func (nsc *NamespaceController) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() defer nsc.workqueue.ShutDown() // TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level // klog.Info("Starting Namespace controller\n") // klog.Info("Starting workers") // Launch workers to process namespace resources go wait.Until(nsc.runWorker, time.Second, stopCh) // TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level // klog.Info("Started workers") <-stopCh // TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level // klog.Info("Shutting down workers") } func (nsc *NamespaceController) runWorker() { for nsc.processNextWorkItem() { } } func (nsc *NamespaceController) processNextWorkItem() bool { obj, shutdown := nsc.workqueue.Get() if shutdown { return false } err := func(obj interface{}) error { defer nsc.workqueue.Done(obj) var key string var ok bool if key, ok = obj.(string); !ok { // As the item in the workqueue is actually invalid, we call // Forget here else we'd go into a loop of attempting to // process a work item that is invalid. nsc.workqueue.Forget(obj) utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v, err %w", obj, errWorkqueueFormatting)) return nil } // Run the syncNamespace, passing it the namespace string of the // resource to be synced. if err := nsc.syncNamespace(key); err != nil { // Put the item back on the workqueue to handle any transient errors. nsc.workqueue.AddRateLimited(key) metrics.SendErrorLogAndMetric(util.NSID, "[processNextWorkItem] Error: failed to syncNamespace %s. Requeuing with err: %v", key, err) return err } // Finally, if no error occurs we Forget this item so it does not // get queued again until another change happens. nsc.workqueue.Forget(obj) // TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level // klog.Infof("Successfully synced '%s'", key) return nil }(obj) if err != nil { utilruntime.HandleError(err) return true } return true } // syncNamespace compares the actual state with the desired, and attempts to converge the two. func (nsc *NamespaceController) syncNamespace(nsKey string) error { // timer for recording execution times timer := metrics.StartNewTimer() // Get the Namespace resource with this key nsObj, err := nsc.nameSpaceLister.Get(nsKey) // apply dataplane and record exec time after syncing operationKind := metrics.NoOp defer func() { if err != nil { klog.Infof("[syncNamespace] failed to sync namespace, but will apply any changes to the dataplane. err: %s", err.Error()) } dperr := nsc.dp.ApplyDataPlane() // NOTE: it may seem like Prometheus is considering some ns create events as updates. // This happens when pod create events beat ns create events, so the pod controller will create the ipset // for the ns. This results in a ns "update" later when the ns controller processes the ns create event // can't record this in another deferred func since deferred funcs are processed in LIFO order metrics.RecordControllerNamespaceExecTime(timer, operationKind, err != nil && dperr != nil) if dperr != nil { klog.Errorf("failed to apply dataplane changes while syncing namespace. err: %s", dperr.Error()) metrics.SendErrorLogAndMetric(util.NSID, "[syncNamespace] failed to apply dataplane changes while syncing namespace. err: %s", dperr.Error()) // Seems like setting err below does nothing. // The return value of syncNamespace is fixed before this deferred func is called // so modifications to err here do nothing. // As a result, the controller will not requeue if there is an error applying the dataplane. // However, a subsequent controller event should Apply Dataplane soon after. if err == nil { err = fmt.Errorf("failed to apply dataplane changes while syncing namespace. err: %w", dperr) } else { err = fmt.Errorf("failed to sync namespace and apply dataplane changes. sync err: [%w], apply err: [%v]", err, dperr) } } }() // hold lock to avoid racing condition with PodController nsc.npmNamespaceCache.Lock() defer nsc.npmNamespaceCache.Unlock() if err != nil { if k8serrors.IsNotFound(err) { klog.Infof("Namespace %s not found, may be it is deleted", nsKey) if _, ok := nsc.npmNamespaceCache.NsMap[nsKey]; ok { // record time to delete namespace if it exists (can't call within cleanDeletedNamespace because this can be called by a pod update) operationKind = metrics.DeleteOp } // cleanDeletedNamespace will check if the NS exists in cache, if it does, then proceeds with deletion // if it does not exists, then event will be no-op err = nsc.cleanDeletedNamespace(nsKey) if err != nil { // need to retry this cleaning-up process metrics.SendErrorLogAndMetric(util.NSID, "Error: %v when namespace is not found", err) return fmt.Errorf("error: %w when namespace is not found", err) } } return err } if nsObj.DeletionTimestamp != nil || nsObj.DeletionGracePeriodSeconds != nil { if _, ok := nsc.npmNamespaceCache.NsMap[nsKey]; ok { // record time to delete namespace if it exists (can't call within cleanDeletedNamespace because this can be called by a pod update) operationKind = metrics.DeleteOp } return nsc.cleanDeletedNamespace(nsKey) } cachedNsObj, nsExists := nsc.npmNamespaceCache.NsMap[nsKey] if nsExists { if k8slabels.Equals(cachedNsObj.LabelsMap, nsObj.ObjectMeta.Labels) { klog.Infof("[NAMESPACE UPDATE EVENT] Namespace [%s] labels did not change", nsKey) return nil } } operationKind, err = nsc.syncUpdateNamespace(nsObj) if err != nil { metrics.SendErrorLogAndMetric(util.NSID, "[syncNamespace] failed to sync namespace due to %s", err.Error()) return err } return nil } // syncAddNamespace handles adding namespace to ipset. func (nsc *NamespaceController) syncAddNamespace(nsObj *corev1.Namespace) error { namespaceSets := []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(nsObj.ObjectMeta.Name, ipsets.Namespace)} setsToAddNamespaceTo := []*ipsets.IPSetMetadata{kubeAllNamespaces} npmNs := common.NewNs(nsObj.ObjectMeta.Name) nsc.npmNamespaceCache.NsMap[nsObj.ObjectMeta.Name] = npmNs // Add the namespace to its label's ipset list. for nsLabelKey, nsLabelVal := range nsObj.ObjectMeta.Labels { nsLabelKeyValue := util.GetIpSetFromLabelKV(nsLabelKey, nsLabelVal) // TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level // klog.Infof("Adding namespace %s to ipset list %s and %s", nsObj.ObjectMeta.Name, nsLabelKey, nsLabelKeyValue) labelIPSets := []*ipsets.IPSetMetadata{ ipsets.NewIPSetMetadata(nsLabelKey, ipsets.KeyLabelOfNamespace), ipsets.NewIPSetMetadata(nsLabelKeyValue, ipsets.KeyValueLabelOfNamespace), } setsToAddNamespaceTo = append(setsToAddNamespaceTo, labelIPSets...) // Append succeeded labels to the cache NS obj npmNs.AppendLabels(map[string]string{nsLabelKey: nsLabelVal}, common.AppendToExistingLabels) } if err := nsc.dp.AddToLists(setsToAddNamespaceTo, namespaceSets); err != nil { return fmt.Errorf("failed to sync add namespace with error %w", err) } return nil } // syncUpdateNamespace handles updating namespace in ipset. func (nsc *NamespaceController) syncUpdateNamespace(newNsObj *corev1.Namespace) (metrics.OperationKind, error) { var err error newNsName, newNsLabel := newNsObj.ObjectMeta.Name, newNsObj.ObjectMeta.Labels // TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level // klog.Infof("NAMESPACE UPDATING:\n namespace: [%s/%v]", newNsName, newNsLabel) // If previous syncAddNamespace failed for some reasons // before caching npm namespace object or syncUpdateNamespace is called due to namespace creation event, // then there is no cached object in nsMap. curNsObj, exists := nsc.npmNamespaceCache.NsMap[newNsName] if !exists { if newNsObj.ObjectMeta.DeletionTimestamp == nil && newNsObj.ObjectMeta.DeletionGracePeriodSeconds == nil { if er := nsc.syncAddNamespace(newNsObj); er != nil { return metrics.CreateOp, fmt.Errorf("failed to sync add namespace with err %w", err) } } return metrics.CreateOp, nil } // now we know this is an update event, and we'll return metrics.UpdateOp // If the Namespace is not deleted, delete removed labels and create new labels addToIPSets, deleteFromIPSets := util.GetIPSetListCompareLabels(curNsObj.LabelsMap, newNsLabel) // Delete the namespace from its label's ipset list. for _, nsLabelVal := range deleteFromIPSets { var labelSet *ipsets.IPSetMetadata if util.IsKeyValueLabelSetName(nsLabelVal) { labelSet = ipsets.NewIPSetMetadata(nsLabelVal, ipsets.KeyValueLabelOfNamespace) } else { labelSet = ipsets.NewIPSetMetadata(nsLabelVal, ipsets.KeyLabelOfNamespace) } toBeRemoved := []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(newNsName, ipsets.Namespace)} // TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level // klog.Infof("Deleting namespace %s from ipset list %s", newNsName, nsLabelVal) if err = nsc.dp.RemoveFromList(labelSet, toBeRemoved); err != nil { metrics.SendErrorLogAndMetric(util.NSID, "[UpdateNamespace] Error: failed to delete namespace %s from ipset list %s with err: %v", newNsName, nsLabelVal, err) return metrics.UpdateOp, fmt.Errorf("failed to remove from list during sync update namespace with err %w", err) } // {IMPORTANT} The order of compared list will be key and then key+val. NPM should only append after both key // key + val ipsets are worked on. // (TODO) need to remove this ordering dependency removedLabelKey, removedLabelValue := util.GetLabelKVFromSet(nsLabelVal) if removedLabelValue != "" { curNsObj.RemoveLabelsWithKey(removedLabelKey) } } // Add the namespace to its label's ipset list. for _, nsLabelVal := range addToIPSets { // TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level // klog.Infof("Adding namespace %s to ipset list %s", newNsName, nsLabelVal) var labelSet []*ipsets.IPSetMetadata if util.IsKeyValueLabelSetName(nsLabelVal) { labelSet = []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(nsLabelVal, ipsets.KeyValueLabelOfNamespace)} } else { labelSet = []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(nsLabelVal, ipsets.KeyLabelOfNamespace)} } toBeAdded := []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(newNsName, ipsets.Namespace)} if err = nsc.dp.AddToLists(labelSet, toBeAdded); err != nil { metrics.SendErrorLogAndMetric(util.NSID, "[UpdateNamespace] Error: failed to add namespace %s to ipset list %s with err: %v", newNsName, nsLabelVal, err) return metrics.UpdateOp, fmt.Errorf("failed to add %v sets to %v lists during addtolists in sync update namespace with err %w", toBeAdded, labelSet, err) } // {IMPORTANT} Same as above order is assumed to be key and then key+val. NPM should only append to existing labels // only after both ipsets for a given label's key value pair are added successfully addedLabelKey, addedLabelValue := util.GetLabelKVFromSet(nsLabelVal) if addedLabelValue != "" { curNsObj.AppendLabels(map[string]string{addedLabelKey: addedLabelValue}, common.AppendToExistingLabels) } } // Append all labels to the cache NS obj // If due to ordering issue the above deleted and added labels are not correct, // this below appendLabels will help ensure correct state in cache for all successful ops. curNsObj.AppendLabels(newNsLabel, common.ClearExistingLabels) nsc.npmNamespaceCache.NsMap[newNsName] = curNsObj return metrics.UpdateOp, nil } // cleanDeletedNamespace handles deleting namespace from ipset. func (nsc *NamespaceController) cleanDeletedNamespace(cachedNsKey string) error { // TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level // klog.Infof("NAMESPACE DELETING: [%s]", cachedNsKey) cachedNsObj, exists := nsc.npmNamespaceCache.NsMap[cachedNsKey] if !exists { return nil } // TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level // klog.Infof("NAMESPACE DELETING cached labels: [%s/%v]", cachedNsKey, cachedNsObj.LabelsMap) var err error toBeDeletedNs := []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(cachedNsKey, ipsets.Namespace)} // Delete the namespace from its label's ipset list. for nsLabelKey, nsLabelVal := range cachedNsObj.LabelsMap { labelKey := ipsets.NewIPSetMetadata(nsLabelKey, ipsets.KeyLabelOfNamespace) // TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level // klog.Infof("Deleting namespace %s from ipset list %s", cachedNsKey, labelKey) if err = nsc.dp.RemoveFromList(labelKey, toBeDeletedNs); err != nil { metrics.SendErrorLogAndMetric(util.NSID, "[DeleteNamespace] Error: failed to delete namespace %s from ipset list %s with err: %v", cachedNsKey, labelKey, err) return fmt.Errorf("failed to clean deleted namespace when deleting key with err %w", err) } labelIpsetName := util.GetIpSetFromLabelKV(nsLabelKey, nsLabelVal) labelKeyValue := ipsets.NewIPSetMetadata(labelIpsetName, ipsets.KeyValueLabelOfNamespace) // TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level // klog.Infof("Deleting namespace %s from ipset list %s", cachedNsKey, labelIpsetName) if err = nsc.dp.RemoveFromList(labelKeyValue, toBeDeletedNs); err != nil { metrics.SendErrorLogAndMetric(util.NSID, "[DeleteNamespace] Error: failed to delete namespace %s from ipset list %s with err: %v", cachedNsKey, labelIpsetName, err) return fmt.Errorf("failed to clean deleted namespace when deleting key value with err %w", err) } // remove labels from the cache NS obj cachedNsObj.RemoveLabelsWithKey(nsLabelKey) } allNamespacesSet := ipsets.NewIPSetMetadata(util.KubeAllNamespacesFlag, ipsets.KeyLabelOfNamespace) toBeDeletedCachedKey := []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(cachedNsKey, ipsets.Namespace)} // Delete the namespace from all-namespace ipset list. if err = nsc.dp.RemoveFromList(allNamespacesSet, toBeDeletedCachedKey); err != nil { metrics.SendErrorLogAndMetric(util.NSID, "[DeleteNamespace] Error: failed to delete namespace %s from ipset list %s with err: %v", cachedNsKey, util.KubeAllNamespacesFlag, err) return fmt.Errorf("failed to remove from list during clean deleted namespace %w", err) } delete(nsc.npmNamespaceCache.NsMap, cachedNsKey) return nil }