npm/pkg/dataplane/dataplane.go (531 lines of code) (raw):

package dataplane

import (
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/Azure/azure-container-networking/common"
	"github.com/Azure/azure-container-networking/npm/metrics"
	"github.com/Azure/azure-container-networking/npm/pkg/dataplane/ipsets"
	"github.com/Azure/azure-container-networking/npm/pkg/dataplane/policies"
	"github.com/Azure/azure-container-networking/npm/util"
	npmerrors "github.com/Azure/azure-container-networking/npm/util/errors"
	"k8s.io/klog"
)

const (
	reconcileDuration = time.Duration(5 * time.Minute)

	contextBackground          = "BACKGROUND"
	contextApplyDP             = "APPLY-DP"
	contextAddNetPol           = "ADD-NETPOL"
	contextAddNetPolBootup     = "BOOTUP-ADD-NETPOL"
	contextAddNetPolPrecaution = "ADD-NETPOL-PRECAUTION"
	contextDelNetPol           = "DEL-NETPOL"
)

var (
	ErrInvalidApplyConfig       = errors.New("invalid apply config")
	ErrIncorrectNumberOfNetPols = errors.New("expected to have exactly one netpol since dp.netPolInBackground == false")
)

type PolicyMode string

// TODO put NodeName in Config?
type Config struct {
	debug bool

	// ApplyInBackground is currently used in Windows to apply the following in background: IPSets and NetPols for new/updated Pods
	ApplyInBackground bool
	ApplyMaxBatches   int
	ApplyInterval     time.Duration

	// NetPolInBackground is currently used in Linux to apply NetPol controller Add events in the background
	NetPolInBackground bool
	MaxPendingNetPols  int
	NetPolInterval     time.Duration

	EnableNPMLite bool

	*ipsets.IPSetManagerCfg
	*policies.PolicyManagerCfg
}

type removePolicyInfo struct {
	sync.Mutex
	previousRemovePolicyIPSetsFailed bool
}

type DataPlane struct {
	*Config
	applyInBackground  bool
	netPolInBackground bool
	policyMgr          *policies.PolicyManager
	ipsetMgr           *ipsets.IPSetManager
	networkID          string
	nodeName           string
	// endpointCache stores all endpoints of the network (including off-node)
	// Key is PodIP
	endpointCache              *endpointCache
	ioShim                     *common.IOShim
	updatePodCache             *updatePodCache
	endpointQuery              *endpointQuery
	endpointQueryAttachedState *endpointQuery // windows -> filter for state 2 (attached) endpoints in l1vh
	applyInfo                  *applyInfo
	netPolQueue                *netPolQueue
	// removePolicyInfo tracks when a policy was removed yet had ApplyIPSet failures.
	// This field is only relevant for Linux.
	removePolicyInfo removePolicyInfo

	stopChannel <-chan struct{}
}

func NewDataPlane(nodeName string, ioShim *common.IOShim, cfg *Config, stopChannel <-chan struct{}) (*DataPlane, error) {
	metrics.InitializeAll()
	if util.IsWindowsDP() {
		klog.Infof("[DataPlane] enabling AddEmptySetToLists for Windows")
		cfg.IPSetManagerCfg.AddEmptySetToLists = true
	}

	dp := &DataPlane{
		Config:    cfg,
		policyMgr: policies.NewPolicyManager(ioShim, cfg.PolicyManagerCfg),
		ipsetMgr:  ipsets.NewIPSetManager(cfg.IPSetManagerCfg, ioShim),
		// networkID is set when initializing Windows dataplane
		networkID:                  "",
		endpointCache:              newEndpointCache(),
		nodeName:                   nodeName,
		ioShim:                     ioShim,
		endpointQuery:              new(endpointQuery),
		endpointQueryAttachedState: new(endpointQuery),
		applyInfo: &applyInfo{
			inBootupPhase: true,
		},
		netPolQueue: newNetPolQueue(),
		stopChannel: stopChannel,
	}

	// do not let Linux apply in background
	dp.applyInBackground = cfg.ApplyInBackground && util.IsWindowsDP()
	if dp.applyInBackground {
		klog.Infof("[DataPlane] dataplane configured to apply in background every %v or every %d calls to ApplyDataPlane()", dp.ApplyInterval, dp.ApplyMaxBatches)
		dp.updatePodCache = newUpdatePodCache(cfg.ApplyMaxBatches)
		if dp.ApplyMaxBatches <= 0 || dp.ApplyInterval == 0 {
			return nil, ErrInvalidApplyConfig
		}
	} else {
		klog.Info("[DataPlane] dataplane configured to NOT apply in background")
		dp.updatePodCache = newUpdatePodCache(1)
	}

	err := dp.BootupDataplane()
	if err != nil {
		klog.Errorf("Failed to reset dataplane: %v", err)
		return nil, err
	}

	// Prevent netpol in background unless we're in Linux and using nftables.
	// This step must be performed after bootupDataplane() because it calls util.DetectIptablesVersion(), which sets the proper value for util.Iptables
	dp.netPolInBackground = cfg.NetPolInBackground && !util.IsWindowsDP() && (strings.Contains(util.Iptables, "nft") || dp.debug)
	if dp.netPolInBackground {
		msg := fmt.Sprintf("[DataPlane] dataplane configured to add netpols in background every %v or every %d calls to AddPolicy()", dp.NetPolInterval, dp.MaxPendingNetPols)
		metrics.SendLog(util.DaemonDataplaneID, msg, true)
	} else {
		metrics.SendLog(util.DaemonDataplaneID, "[DataPlane] dataplane configured to NOT add netpols in background", true)
	}

	return dp, nil
}

// BootupDataplane cleans the NPM sets and policies in the dataplane and performs initialization.
func (dp *DataPlane) BootupDataplane() error {
	// NOTE: used to create an all-namespaces set, but there's no need since it will be created by the control plane
	return dp.bootupDataPlane() //nolint:wrapcheck // unnecessary to wrap error
}

// FinishBootupPhase marks the point when Pod Controller is starting to run, so dp.AddPolicy() can no longer apply IPSets in the background.
// This function must be called on Windows when ApplyInBackground is true.
func (dp *DataPlane) FinishBootupPhase() {
	if !dp.applyInBackground {
		return
	}

	dp.applyInfo.Lock()
	defer dp.applyInfo.Unlock()

	klog.Infof("[DataPlane] finished bootup phase")
	dp.applyInfo.inBootupPhase = false
}
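
// exampleStartDataPlane is an illustrative sketch, not part of the original file: it shows one
// plausible way a daemon could wire up the dataplane, assuming the caller already has a node
// name, an IOShim, a populated Config, and a stop channel. The function name is hypothetical.
func exampleStartDataPlane(nodeName string, ioShim *common.IOShim, cfg *Config, stop <-chan struct{}) (*DataPlane, error) {
	// NewDataPlane boots up (resets) the dataplane as part of construction.
	dp, err := NewDataPlane(nodeName, ioShim, cfg, stop)
	if err != nil {
		return nil, err
	}

	// Start the reconcile and background apply/netpol loops (should only be called once).
	dp.RunPeriodicTasks()

	// On Windows with ApplyInBackground, the caller signals the end of the bootup phase once the
	// Pod controller is about to start; this is a no-op otherwise.
	dp.FinishBootupPhase()
	return dp, nil
}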
// RunPeriodicTasks runs periodic tasks. Should only be called once.
func (dp *DataPlane) RunPeriodicTasks() {
	go func() {
		ticker := time.NewTicker(reconcileDuration)
		defer ticker.Stop()

		for {
			select {
			case <-dp.stopChannel:
				return
			case <-ticker.C:
				// send the heartbeat log in another go routine in case it takes a while
				go metrics.SendHeartbeatWithNumPolicies()

				// locks ipset manager
				dp.ipsetMgr.Reconcile()

				// in Windows, does nothing
				// in Linux, locks policy manager but can be interrupted
				dp.policyMgr.Reconcile()
			}
		}
	}()

	if dp.netPolInBackground {
		go func() {
			ticker := time.NewTicker(dp.NetPolInterval)
			defer ticker.Stop()

			for {
				select {
				case <-dp.stopChannel:
					return
				case <-ticker.C:
					// Choose to keep netPolQueue locked while running iptables-restore within addPoliciesWithRetry.
					// We are only blocking the NetPol controller, which wouldn't be able to use the PolicyManager while it's reconciling.
					// Technically, NetPol controller could be adding IPSets during this lock, but this is very fast.
					// Preferring thread safety over optimized performance.
					dp.netPolQueue.Lock()
					if dp.netPolQueue.len() == 0 {
						dp.netPolQueue.Unlock()
						continue
					}
					dp.addPoliciesWithRetry(contextBackground)
					dp.netPolQueue.Unlock()
				}
			}
		}()
	}

	if !dp.applyInBackground {
		return
	}

	go func() {
		ticker := time.NewTicker(dp.ApplyInterval)
		defer ticker.Stop()

		for {
			select {
			case <-dp.stopChannel:
				return
			case <-ticker.C:
				dp.applyInfo.Lock()
				numBatches := dp.applyInfo.numBatches
				dp.applyInfo.Unlock()
				if numBatches == 0 {
					continue
				}

				if err := dp.applyDataPlaneNow(contextBackground); err != nil {
					klog.Errorf("[DataPlane] failed to apply dataplane in background: %v", err)
					metrics.SendErrorLogAndMetric(util.DaemonDataplaneID, "[DataPlane] failed to apply dataplane in background: %v", err)
				}
			}
		}
	}()
}

func (dp *DataPlane) GetIPSet(setName string) *ipsets.IPSet {
	return dp.ipsetMgr.GetIPSet(setName)
}

// CreateIPSets takes in a set object and updates the local cache with this set.
func (dp *DataPlane) CreateIPSets(setMetadata []*ipsets.IPSetMetadata) {
	dp.ipsetMgr.CreateIPSets(setMetadata)
}

// DeleteIPSet checks for members and references of the given "set" type ipset;
// if it is not used, it will be deleted from the cache.
func (dp *DataPlane) DeleteIPSet(setMetadata *ipsets.IPSetMetadata, forceDelete util.DeleteOption) {
	dp.ipsetMgr.DeleteIPSet(setMetadata.GetPrefixName(), forceDelete)
}

// AddToSets takes in a list of IPSet names along with an IP member
// and then updates the local cache.
func (dp *DataPlane) AddToSets(setNames []*ipsets.IPSetMetadata, podMetadata *PodMetadata) error {
	err := dp.ipsetMgr.AddToSets(setNames, podMetadata.PodIP, podMetadata.PodKey)
	if err != nil {
		return fmt.Errorf("[DataPlane] error while adding to set: %w", err)
	}

	if dp.shouldUpdatePod() && podMetadata.NodeName == dp.nodeName {
		// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
		// klog.Infof("[DataPlane] Updating Sets to Add for pod key %s", podMetadata.PodKey)
		// lock updatePodCache while reading/modifying or setting the updatePod in the cache
		dp.updatePodCache.Lock()
		defer dp.updatePodCache.Unlock()

		updatePod := dp.updatePodCache.enqueue(podMetadata)
		updatePod.updateIPSetsToAdd(setNames)
	}

	return nil
}
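
// exampleHandlePodAdd is an illustrative sketch, not part of the original file: it shows the
// expected call pattern for a pod-add style event, where IPSet updates are cached via AddToSets
// and then flushed with ApplyDataPlane. The function name is hypothetical; setMetadata and
// podMetadata are assumed to come from the controller's translation of the event.
func exampleHandlePodAdd(dp *DataPlane, setMetadata []*ipsets.IPSetMetadata, podMetadata *PodMetadata) error {
	// Ensure the sets exist in the local cache, then record the pod's membership.
	dp.CreateIPSets(setMetadata)
	if err := dp.AddToSets(setMetadata, podMetadata); err != nil {
		return err
	}

	// Nothing has touched the kernel yet; ApplyDataPlane flushes the dirty IPSet state
	// (immediately, or in batches when ApplyInBackground is enabled on Windows).
	return dp.ApplyDataPlane()
}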
// RemoveFromSets takes in a list of set names from which a given IP member should be
// removed and updates the local cache.
func (dp *DataPlane) RemoveFromSets(setNames []*ipsets.IPSetMetadata, podMetadata *PodMetadata) error {
	err := dp.ipsetMgr.RemoveFromSets(setNames, podMetadata.PodIP, podMetadata.PodKey)
	if err != nil {
		return fmt.Errorf("[DataPlane] error while removing from set: %w", err)
	}

	if dp.shouldUpdatePod() && podMetadata.NodeName == dp.nodeName {
		// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
		// klog.Infof("[DataPlane] Updating Sets to Remove for pod key %s", podMetadata.PodKey)
		// lock updatePodCache while reading/modifying or setting the updatePod in the cache
		dp.updatePodCache.Lock()
		defer dp.updatePodCache.Unlock()

		updatePod := dp.updatePodCache.enqueue(podMetadata)
		updatePod.updateIPSetsToRemove(setNames)
	}

	return nil
}

// AddToLists takes a list name and a list of sets which are to be added as members
// to the given list.
func (dp *DataPlane) AddToLists(listName, setNames []*ipsets.IPSetMetadata) error {
	err := dp.ipsetMgr.AddToLists(listName, setNames)
	if err != nil {
		return fmt.Errorf("[DataPlane] error while adding to list: %w", err)
	}
	return nil
}

// RemoveFromList takes a list name and a list of sets which are to be removed as members
// from the given list.
func (dp *DataPlane) RemoveFromList(listName *ipsets.IPSetMetadata, setNames []*ipsets.IPSetMetadata) error {
	err := dp.ipsetMgr.RemoveFromList(listName, setNames)
	if err != nil {
		return fmt.Errorf("[DataPlane] error while removing from list: %w", err)
	}
	return nil
}

// ApplyDataPlane applies pending changes to the dataplane. All of the IPSet operations above
// only update the local cache and a dirty IPSet structure; they do not change the dataplane
// itself. This function needs to be called at the end of the IPSet operations for a given
// controller event: it checks the dirty IPSet list and makes the corresponding changes in the
// dataplane. This helps emulate a single call to the dataplane instead of multiple
// per-operation calls.
func (dp *DataPlane) ApplyDataPlane() error {
	if !dp.applyInBackground {
		return dp.applyDataPlaneNow(contextApplyDP)
	}

	// increment batch and apply dataplane if needed
	dp.applyInfo.Lock()
	dp.applyInfo.numBatches++
	newCount := dp.applyInfo.numBatches
	dp.applyInfo.Unlock()

	// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
	// klog.Infof("[DataPlane] [%s] new batch count: %d", contextApplyDP, newCount)
	if newCount >= dp.ApplyMaxBatches {
		// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
		// klog.Infof("[DataPlane] [%s] applying now since reached maximum batch count: %d", contextApplyDP, newCount)
		return dp.applyDataPlaneNow(contextApplyDP)
	}

	return nil
}

func (dp *DataPlane) applyDataPlaneNow(context string) error {
	// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
	// klog.Infof("[DataPlane] [ApplyDataPlane] [%s] starting to apply ipsets", context)
	err := dp.ipsetMgr.ApplyIPSets()
	if err != nil {
		return fmt.Errorf("[DataPlane] [%s] error while applying IPSets: %w", context, err)
	}

	// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
	// klog.Infof("[DataPlane] [ApplyDataPlane] [%s] finished applying ipsets", context)

	// see comment in RemovePolicy() for why this is here
	dp.setRemovePolicyFailure(false)

	if dp.applyInBackground {
		dp.applyInfo.Lock()
		dp.applyInfo.numBatches = 0
		dp.applyInfo.Unlock()
	}

	// NOTE: ideally we won't refresh Pod Endpoints if the updatePodCache is empty
	if dp.shouldUpdatePod() {
		// do not refresh endpoints if the updatePodCache is empty
		dp.updatePodCache.Lock()
		if dp.updatePodCache.isEmpty() {
			dp.updatePodCache.Unlock()
			return nil
		}
		dp.updatePodCache.Unlock()

		// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
klog.Infof("[DataPlane] [ApplyDataPlane] [%s] refreshing endpoints before updating pods", context) err := dp.refreshPodEndpoints() if err != nil { metrics.SendErrorLogAndMetric(util.DaemonDataplaneID, "[DataPlane] failed to refresh endpoints while updating pods. err: [%s]", err.Error()) // return as success since this can be retried irrespective of other operations return nil } // TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level // klog.Infof("[DataPlane] [ApplyDataPlane] [%s] refreshed endpoints", context) // lock updatePodCache while driving goal state to kernel // prevents another ApplyDataplane call from updating the same pods dp.updatePodCache.Lock() defer dp.updatePodCache.Unlock() // TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level // klog.Infof("[DataPlane] [ApplyDataPlane] [%s] starting to update pods", context) for !dp.updatePodCache.isEmpty() { pod := dp.updatePodCache.dequeue() if pod == nil { // should never happen because of isEmpty check above and lock on updatePodCache metrics.SendErrorLogAndMetric(util.DaemonDataplaneID, "[DataPlane] failed to dequeue pod while applying the dataplane") // break to avoid infinite loop (something weird happened since isEmpty returned false above) break } if err := dp.updatePod(pod); err != nil { // move on to the next and later return as success since this can be retried irrespective of other operations metrics.SendErrorLogAndMetric(util.DaemonDataplaneID, "failed to update pod while applying the dataplane. key: [%s], err: [%s]", pod.PodKey, err.Error()) dp.updatePodCache.requeue(pod) continue } } // TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level // klog.Infof("[DataPlane] [ApplyDataPlane] [%s] finished updating pods", context) } return nil } // AddPolicy takes in a translated NPMNetworkPolicy object and applies on dataplane func (dp *DataPlane) AddPolicy(policy *policies.NPMNetworkPolicy) error { // TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level // klog.Infof("[DataPlane] Add Policy called for %s", policy.PolicyKey) if !dp.netPolInBackground { return dp.addPolicies([]*policies.NPMNetworkPolicy{policy}) } // Choose to keep netPolQueue locked while running iptables-restore within addPoliciesWithRetry. // We are not blocking any thread but the background NetPol thread, which would run the same command anyways dp.netPolQueue.Lock() defer dp.netPolQueue.Unlock() dp.netPolQueue.enqueue(policy) newCount := dp.netPolQueue.len() // TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level // klog.Infof("[DataPlane] [%s] new pending netpol count: %d", contextAddNetPol, newCount) if newCount >= dp.MaxPendingNetPols { // TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level // klog.Infof("[DataPlane] [%s] applying now since reached maximum batch count: %d", contextAddNetPol, newCount) dp.addPoliciesWithRetry(contextAddNetPol) } return nil } // addPoliciesWithRetry tries adding all policies. If this fails, it tries adding policies one by one. // The caller must lock netPolQueue. 
// addPoliciesWithRetry tries adding all policies. If this fails, it tries adding policies one by one.
// The caller must lock netPolQueue.
func (dp *DataPlane) addPoliciesWithRetry(context string) {
	netPols := dp.netPolQueue.dump()
	// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
	// klog.Infof("[DataPlane] adding policies %+v", netPols)

	err := dp.addPolicies(netPols)
	if err == nil {
		// clear queue and return on success
		// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
		// klog.Infof("[DataPlane] [%s] added policies successfully", context)
		dp.netPolQueue.clear()
		return
	}

	klog.Errorf("[DataPlane] [%s] failed to add policies. will retry one policy at a time. err: %s", context, err.Error())
	metrics.SendErrorLogAndMetric(util.DaemonDataplaneID, "[DataPlane] [%s] failed to add policies. err: %s", context, err.Error())

	// retry one policy at a time
	for _, netPol := range netPols {
		err = dp.addPolicies([]*policies.NPMNetworkPolicy{netPol})
		if err == nil {
			// remove from queue on success
			// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
			// klog.Infof("[DataPlane] [%s] added policy successfully one at a time. policyKey: %s", context, netPol.PolicyKey)
			dp.netPolQueue.delete(netPol.PolicyKey)
		} else {
			// keep in queue on failure
			klog.Errorf("[DataPlane] [%s] failed to add policy one at a time. policyKey: %s. err: %s", context, netPol.PolicyKey, err.Error())
			metrics.SendErrorLogAndMetric(util.DaemonDataplaneID, "[DataPlane] [%s] failed to add policy one at a time. %s. err: %s", context, netPol.PolicyKey, err.Error())
		}
	}
}

func (dp *DataPlane) addPolicies(netPols []*policies.NPMNetworkPolicy) error {
	if !dp.netPolInBackground && len(netPols) != 1 {
		klog.Errorf("[DataPlane] expected to have one NetPol in dp.addPolicies() since dp.netPolInBackground == false")
		metrics.SendErrorLogAndMetric(util.DaemonDataplaneID, "[DataPlane] expected to have one NetPol in dp.addPolicies() since dp.netPolInBackground == false")
		return ErrIncorrectNumberOfNetPols
	}

	if len(netPols) == 0 {
		// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
		// klog.Infof("[DataPlane] expected to have at least one NetPol in dp.addPolicies()")
		return nil
	}

	inBootupPhase := false
	if dp.applyInBackground {
		dp.applyInfo.Lock()
		inBootupPhase = dp.applyInfo.inBootupPhase
		if inBootupPhase {
			// keep holding the lock to block FinishBootupPhase() and prevent PodController from
			// coming back online and causing race issues from updatePod() within applyDataPlaneNow()
			defer dp.applyInfo.Unlock()
		} else {
			dp.applyInfo.Unlock()
		}
	}

	if dp.hadRemovePolicyFailure() {
		if inBootupPhase {
			// this should never happen because bootup phase is for windows, but just in case, we don't want to applyDataplaneNow() or else there will be a deadlock on dp.applyInfo
			msg := fmt.Sprintf("[DataPlane] [%s] at risk of improperly applying a policy which is removed then readded", contextAddNetPolPrecaution)
			klog.Warning(msg)
			metrics.SendErrorLogAndMetric(util.DaemonDataplaneID, msg)
		} else {
			// prevent #2977
			if err := dp.applyDataPlaneNow(contextAddNetPolPrecaution); err != nil {
				return err // nolint:wrapcheck // unnecessary to wrap error since the provided context is included in the error
			}
		}
	}

	// 1. Add IPSets and apply for each NetPol.
	// Apply IPSets after each NetworkPolicy unless ApplyInBackground=true and we're in the bootup phase (only happens for Windows currently)
	for _, netPol := range netPols {
		// Create and add references for Selector IPSets first
		err := dp.createIPSetsAndReferences(netPol.AllPodSelectorIPSets(), netPol.PolicyKey, ipsets.SelectorType)
		if err != nil {
			klog.Infof("[DataPlane] error while adding Selector IPSet references: %s", err.Error())
			return fmt.Errorf("[DataPlane] error while adding Selector IPSet references: %w", err)
		}

		// Create and add references for Rule IPSets
		err = dp.createIPSetsAndReferences(netPol.RuleIPSets, netPol.PolicyKey, ipsets.NetPolType)
		if err != nil {
			klog.Infof("[DataPlane] error while adding Rule IPSet references: %s", err.Error())
			return fmt.Errorf("[DataPlane] error while adding Rule IPSet references: %w", err)
		}

		if inBootupPhase {
			// This branch can only be taken in Windows.
			// During bootup phase, the Pod controller will not be running.
			// We don't need to worry about adding Policies to Endpoints, so we don't need IPSets in the kernel yet.
			// Ideally, we get all NetworkPolicies in the cache before the Pod controller starts

			// increment batch and apply IPSets if needed
			dp.applyInfo.numBatches++
			newCount := dp.applyInfo.numBatches
			// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
			// klog.Infof("[DataPlane] [%s] new batch count: %d", contextAddNetPolBootup, newCount)
			if newCount >= dp.ApplyMaxBatches {
				// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
				// klog.Infof("[DataPlane] [%s] applying now since reached maximum batch count: %d", contextAddNetPolBootup, newCount)
				// klog.Infof("[DataPlane] [%s] starting to apply ipsets", contextAddNetPolBootup)
				err = dp.ipsetMgr.ApplyIPSets()
				if err != nil {
					return fmt.Errorf("[DataPlane] [%s] error while applying IPSets: %w", contextAddNetPolBootup, err)
				}
				// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
				// klog.Infof("[DataPlane] [%s] finished applying ipsets", contextAddNetPolBootup)

				// see comment in RemovePolicy() for why this is here
				dp.setRemovePolicyFailure(false)

				dp.applyInfo.numBatches = 0
			}

			continue
		}

		// not in bootup phase
		// this codepath is always taken in Linux
		err = dp.applyDataPlaneNow(contextAddNetPol)
		if err != nil {
			return err
		}
	}

	// 2. Add NetPols in policyMgr
	var endpointList map[string]string
	var err error
	if !inBootupPhase {
		endpointList, err = dp.getEndpointsToApplyPolicies(netPols)
		if err != nil {
			return fmt.Errorf("[DataPlane] error while getting endpoints to apply policy after applying dataplane: %w", err)
		}
	}

	// during bootup phase, endpointList will be nil
	err = dp.policyMgr.AddPolicies(netPols, endpointList)
	if err != nil {
		return fmt.Errorf("[DataPlane] [%s] error while adding policies: %w", contextAddNetPolBootup, err)
	}

	return nil
}

// RemovePolicy takes in network policyKey (namespace/name of network policy) and removes it from dataplane and cache
func (dp *DataPlane) RemovePolicy(policyKey string) error {
	// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
	// klog.Infof("[DataPlane] Remove Policy called for %s", policyKey)

	if dp.netPolInBackground {
		// make sure to not add this NetPol if we're deleting it
		// hold the lock for the rest of this function so that we don't contend or have races with the background NetPol thread
		dp.netPolQueue.Lock()
		defer dp.netPolQueue.Unlock()

		dp.netPolQueue.delete(policyKey)
	}

	// because policy Manager will remove the policy from its cache,
	// keep a local copy to remove references for ipsets
	policy, ok := dp.policyMgr.GetPolicy(policyKey)
	if !ok {
		klog.Infof("[DataPlane] Policy %s is not found. Might been deleted already", policyKey)
		return nil
	}

	endpoints := make(map[string]string, len(policy.PodEndpoints))
	for podIP, endpointID := range policy.PodEndpoints {
		endpoints[podIP] = endpointID
	}

	// Use the endpoint list saved in cache for this network policy to remove
	err := dp.policyMgr.RemovePolicy(policy.PolicyKey)
	if err != nil {
		return fmt.Errorf("[DataPlane] error while removing policy: %w", err)
	}

	if dp.shouldUpdatePod() {
		dp.endpointCache.Lock()
		for podIP := range endpoints {
			// if the endpoint is not in the policy's endpoint list, delete policy reference from cache
			if _, ok := policy.PodEndpoints[podIP]; !ok {
				// check if the endpoint is in the cache
				if endpoint, ok := dp.endpointCache.cache[podIP]; ok {
					delete(endpoint.netPolReference, policyKey)
				}
			}
		}
		dp.endpointCache.Unlock()
	}

	// Remove references for Rule IPSets first
	err = dp.deleteIPSetsAndReferences(policy.RuleIPSets, policy.PolicyKey, ipsets.NetPolType)
	if err != nil {
		return err
	}

	// Remove references for Selector IPSets
	err = dp.deleteIPSetsAndReferences(policy.AllPodSelectorIPSets(), policy.PolicyKey, ipsets.SelectorType)
	if err != nil {
		return err
	}

	if err := dp.applyDataPlaneNow(contextDelNetPol); err != nil {
		// Failed to apply IPSets while removing this policy.
		// Consider this removepolicy call a failure until apply IPSets is successful.
		// Related to #2977
		klog.Info("[DataPlane] remove policy has failed to apply ipsets. setting remove policy failure")
		dp.setRemovePolicyFailure(true)
		return err // nolint:wrapcheck // unnecessary to wrap error since the provided context is included in the error
	}

	return nil
}

// UpdatePolicy takes in updated policy object, calculates the delta and applies changes
// onto dataplane accordingly
func (dp *DataPlane) UpdatePolicy(policy *policies.NPMNetworkPolicy) error {
	// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
	// klog.Infof("[DataPlane] Update Policy called for %s", policy.PolicyKey)
	ok := dp.policyMgr.PolicyExists(policy.PolicyKey)
	if !ok {
		// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
		// klog.Infof("[DataPlane] Policy %s is not found.", policy.PolicyKey)
		return dp.AddPolicy(policy)
	}

	// TODO it would be ideal to calculate a diff of policies
	// and remove/apply only the delta of IPSets and policies

	// Taking the easy route here, delete existing policy
	err := dp.RemovePolicy(policy.PolicyKey)
	if err != nil {
		return fmt.Errorf("[DataPlane] error while updating policy: %w", err)
	}
	// and add the new updated policy
	err = dp.AddPolicy(policy)
	if err != nil {
		return fmt.Errorf("[DataPlane] error while updating policy: %w", err)
	}
	return nil
}

func (dp *DataPlane) GetAllIPSets() map[string]string {
	return dp.ipsetMgr.GetAllIPSets()
}

// GetAllPolicies is deprecated and only used in the goalstateprocessor, which is deprecated
func (dp *DataPlane) GetAllPolicies() []string {
	return nil
}

func (dp *DataPlane) createIPSetsAndReferences(sets []*ipsets.TranslatedIPSet, netpolName string, referenceType ipsets.ReferenceType) error {
	// Create IPSets first along with reference updates
	npmErrorString := npmerrors.AddSelectorReference
	if referenceType == ipsets.NetPolType {
		npmErrorString = npmerrors.AddNetPolReference
	}
	for _, set := range sets {
		dp.ipsetMgr.CreateIPSets([]*ipsets.IPSetMetadata{set.Metadata})
		err := dp.ipsetMgr.AddReference(set.Metadata, netpolName, referenceType)
		if err != nil {
			return npmerrors.Errorf(npmErrorString, false, fmt.Sprintf("[DataPlane] failed to add reference with err: %s", err.Error()))
		}
	}

	// Check if any list sets are provided with members to add
	for _, set := range sets {
		// Check if any CIDR block IPSets needs to be applied
		setType := set.Metadata.Type
		if setType == ipsets.CIDRBlocks {
			// ipblock can have either cidr (CIDR in IPBlock) or "cidr + " " (space) + nomatch" (Except in IPBlock)
			// (TODO) need to revise it for windows
			for _, ipblock := range set.Members {
				err := dp.ipsetMgr.AddToSets([]*ipsets.IPSetMetadata{set.Metadata}, ipblock, "")
				if err != nil {
					return npmerrors.Errorf(npmErrorString, false, fmt.Sprintf("[DataPlane] failed to AddToSet in addIPSetReferences with err: %s", err.Error()))
				}
			}
		} else if setType == ipsets.NestedLabelOfPod && len(set.Members) > 0 {
			// Check if any 2nd level IPSets are generated by Controller with members
			// Apply members to the list set
			err := dp.ipsetMgr.AddToLists([]*ipsets.IPSetMetadata{set.Metadata}, ipsets.GetMembersOfTranslatedSets(set.Members))
			if err != nil {
				return npmerrors.Errorf(npmErrorString, false, fmt.Sprintf("[DataPlane] failed to AddToList in addIPSetReferences with err: %s", err.Error()))
			}
		}
	}
	return nil
}

func (dp *DataPlane) deleteIPSetsAndReferences(sets []*ipsets.TranslatedIPSet, netpolName string, referenceType ipsets.ReferenceType) error {
	for _, set := range sets {
		prefixName := set.Metadata.GetPrefixName()
		if err := dp.ipsetMgr.DeleteReference(prefixName, netpolName, referenceType); err != nil {
			// with current implementation of DeleteReference(), err will be ipsets.ErrSetDoesNotExist
			klog.Infof("[DataPlane] ignoring delete reference on non-existent set. ipset: %s. netpol: %s. referenceType: %s", prefixName, netpolName, referenceType)
		}
	}

	npmErrorString := npmerrors.DeleteSelectorReference
	if referenceType == ipsets.NetPolType {
		npmErrorString = npmerrors.DeleteNetPolReference
	}

	// Check if any list sets are provided with members to delete
	// NOTE: every translated member will be deleted, even if the member is part of the same set in another policy
	// see the definition of TranslatedIPSet for how to avoid this situation
	for _, set := range sets {
		// Check if any CIDR block IPSets needs to be applied
		setType := set.Metadata.Type
		if setType == ipsets.CIDRBlocks {
			// ipblock can have either cidr (CIDR in IPBlock) or "cidr + " " (space) + nomatch" (Except in IPBlock)
			// (TODO) need to revise it for windows
			for _, ipblock := range set.Members {
				err := dp.ipsetMgr.RemoveFromSets([]*ipsets.IPSetMetadata{set.Metadata}, ipblock, "")
				if err != nil {
					return npmerrors.Errorf(npmErrorString, false, fmt.Sprintf("[DataPlane] failed to RemoveFromSet in deleteIPSetReferences with err: %s", err.Error()))
				}
			}
		} else if set.Metadata.GetSetKind() == ipsets.ListSet && len(set.Members) > 0 {
			// Delete if any 2nd level IPSets are generated by Controller with members
			err := dp.ipsetMgr.RemoveFromList(set.Metadata, ipsets.GetMembersOfTranslatedSets(set.Members))
			if err != nil {
				return npmerrors.Errorf(npmErrorString, false, fmt.Sprintf("[DataPlane] failed to RemoveFromList in deleteIPSetReferences with err: %s", err.Error()))
			}
		}

		// Try to delete these IPSets
		// NOTE: if we ever remove destroy line, we must handle ipset -D for CIDR nomatch in Linux
		// A delete-member call for "1.1.1.1/32 nomatch" would need to be "-D 1.1.1.1" instead of "-D 1.1.1.1/nomatch"
		dp.ipsetMgr.DeleteIPSet(set.Metadata.GetPrefixName(), false)
	}
	return nil
}

func (dp *DataPlane) setRemovePolicyFailure(failed bool) {
	if util.IsWindowsDP() {
		return
	}

	dp.removePolicyInfo.Lock()
	defer dp.removePolicyInfo.Unlock()

	dp.removePolicyInfo.previousRemovePolicyIPSetsFailed = failed
}

func (dp *DataPlane) hadRemovePolicyFailure() bool {
	if util.IsWindowsDP() {
		return false
	}

	dp.removePolicyInfo.Lock()
	defer dp.removePolicyInfo.Unlock()

	return dp.removePolicyInfo.previousRemovePolicyIPSetsFailed
}
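
// examplePolicyLifecycle is an illustrative sketch, not part of the original file: it shows the
// order in which a NetworkPolicy controller is expected to drive the dataplane for add, update,
// and delete events. The function name is hypothetical; netPol is a translated policy.
func examplePolicyLifecycle(dp *DataPlane, netPol *policies.NPMNetworkPolicy) error {
	// Add creates and references the policy's IPSets, applies them, then programs the policy.
	if err := dp.AddPolicy(netPol); err != nil {
		return err
	}

	// Update currently removes and re-adds the policy rather than computing a diff.
	if err := dp.UpdatePolicy(netPol); err != nil {
		return err
	}

	// Remove drops the policy, deletes its IPSet references, and then applies IPSets.
	return dp.RemovePolicy(netPol.PolicyKey)
}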