npm/pkg/dataplane/policies/policymanager.go (175 lines of code) (raw):

package policies

import (
	"fmt"
	"sync"

	"github.com/Azure/azure-container-networking/common"
	"github.com/Azure/azure-container-networking/npm/metrics"
	"github.com/Azure/azure-container-networking/npm/util"
	npmerrors "github.com/Azure/azure-container-networking/npm/util/errors"
	"k8s.io/klog"
)

// PolicyManagerMode will be used in Windows to decide if
// SetPolicies should be used or not.
type PolicyManagerMode string

const (
	// IPSetPolicyMode references IPSets in policies.
	IPSetPolicyMode PolicyManagerMode = "IPSet"
	// IPPolicyMode will replace ipset names with their value IPs in policies.
	// NOTE: this is currently unimplemented.
	IPPolicyMode PolicyManagerMode = "IP"

	// numLinuxBaseACLRules is the number of Linux rules unrelated to policies. Bootup adds it to
	// the ACL rule metric after a successful Linux bootup.
	// This number is based on the implementation in chain-management_linux.go.
	// It's technically 3 off when there are no policies, since we flush the AZURE-NPM chain then.
	numLinuxBaseACLRules = 11
)

// PolicyManagerCfg configures a PolicyManager. Some fields only apply to one OS, as noted per field.
type PolicyManagerCfg struct {
	// NodeIP is only used in Windows. Bootup returns an error in Windows when it is empty.
	NodeIP string
	// PolicyMode only affects Windows.
	PolicyMode PolicyManagerMode
	// PlaceAzureChainFirst only affects Linux.
	PlaceAzureChainFirst bool
	// MaxBatchedACLsPerPod is the maximum number of ACLs that can be added to a Pod at once in Windows.
	// The zero value is valid.
	// A NetworkPolicy's ACLs are always in the same batch, and there will be at least one NetworkPolicy per batch.
	MaxBatchedACLsPerPod int
}

// PolicyMap caches the applied NPMNetworkPolicies, keyed by their PolicyKey.
// The embedded RWMutex guards cache.
type PolicyMap struct {
	sync.RWMutex
	cache map[string]*NPMNetworkPolicy
}

// reconcileManager carries the lock that coordinates policy add/remove with background
// reconciliation (see the PolicyManager doc comment).
type reconcileManager struct {
	sync.Mutex
	// releaseLockSignal is created with a buffer of one by NewPolicyManager.
	// NOTE(review): presumably used to ask the reconcile loop to give up the lock early — confirm
	// against the OS-specific reconcile implementation.
	releaseLockSignal chan struct{}
}

// PolicyManager has two locks.
// The PolicyMap lock exists for Windows, where it prevents concurrent write access to the PolicyMap
// from both the NetPol Controller thread and the PodController thread, accessed respectively from
// dataplane.AddPolicy()/dataplane.RemovePolicy(), and dataplane.ApplyDataplane() --> dataplane.updatePod().
// In Linux, the reconcileManager's lock is used to avoid iptables contention for adding/removing policies versus // background cleanup of stale, ineffective chains. type PolicyManager struct { policyMap *PolicyMap ioShim *common.IOShim staleChains *staleChains reconcileManager *reconcileManager *PolicyManagerCfg } func NewPolicyManager(ioShim *common.IOShim, cfg *PolicyManagerCfg) *PolicyManager { return &PolicyManager{ policyMap: &PolicyMap{ cache: make(map[string]*NPMNetworkPolicy), }, ioShim: ioShim, staleChains: newStaleChains(), reconcileManager: &reconcileManager{ releaseLockSignal: make(chan struct{}, 1), }, PolicyManagerCfg: cfg, } } func (pMgr *PolicyManager) ResetEndpoint(epID string) error { if util.IsWindowsDP() { return pMgr.bootup([]string{epID}) } return nil } func (pMgr *PolicyManager) Bootup(epIDs []string) error { metrics.ResetNumACLRules() if err := pMgr.bootup(epIDs); err != nil { // NOTE: in Linux, Prometheus metrics may be off at this point since some ACL rules may have been applied successfully metrics.SendErrorLogAndMetric(util.IptmID, "error: failed to bootup policy manager: %s", err.Error()) return npmerrors.ErrorWrapper(npmerrors.BootupPolicyMgr, false, "failed to bootup policy manager", err) } if !util.IsWindowsDP() { // update Prometheus metrics on success metrics.IncNumACLRulesBy(numLinuxBaseACLRules) } if util.IsWindowsDP() && pMgr.NodeIP == "" { return npmerrors.Errorf(npmerrors.BootupPolicyMgr, false, "policy manager must have a configured nodeIP in Windows") } return nil } func (pMgr *PolicyManager) Reconcile() { pMgr.reconcile() } func (pMgr *PolicyManager) PolicyExists(policyKey string) bool { pMgr.policyMap.RLock() defer pMgr.policyMap.RUnlock() _, ok := pMgr.policyMap.cache[policyKey] return ok } func (pMgr *PolicyManager) GetPolicy(policyKey string) (*NPMNetworkPolicy, bool) { pMgr.policyMap.RLock() defer pMgr.policyMap.RUnlock() policy, ok := pMgr.policyMap.cache[policyKey] return policy, ok } func (pMgr *PolicyManager) 
AddPolicies(policies []*NPMNetworkPolicy, endpointList map[string]string) error { nonEmptyPolicies := make([]*NPMNetworkPolicy, 0, len(policies)) for _, policy := range policies { if len(policy.ACLs) == 0 { klog.Infof("[DataPlane] No ACLs in policy %s to apply", policy.PolicyKey) continue } nonEmptyPolicies = append(nonEmptyPolicies, policy) NormalizePolicy(policy) if err := ValidatePolicy(policy); err != nil { msg := fmt.Sprintf("failed to validate policy: %s", err.Error()) metrics.SendErrorLogAndMetric(util.IptmID, "error: %s", msg) return npmerrors.Errorf(npmerrors.AddPolicy, false, msg) } } if len(nonEmptyPolicies) == 0 { return nil } pMgr.policyMap.Lock() defer pMgr.policyMap.Unlock() // Call actual dataplane function to apply changes timer := metrics.StartNewTimer() err := pMgr.addPolicies(nonEmptyPolicies, endpointList) metrics.RecordACLRuleExecTime(timer) // record execution time regardless of failure if err != nil { // NOTE: in Linux, Prometheus metrics may be off at this point since some ACL rules may have been applied successfully // In Windows, Prometheus metrics may be off at this point since we don't know how many endpoints had rules applied successfully. 
msg := fmt.Sprintf("failed to add policy: %s", err.Error()) metrics.SendErrorLogAndMetric(util.IptmID, "error: %s", msg) return npmerrors.Errorf(npmerrors.AddPolicy, false, msg) } for _, policy := range nonEmptyPolicies { // update Prometheus metrics on success if util.IsWindowsDP() { metrics.IncNumACLRulesBy((1 + policy.numACLRulesProducedInKernel()) * len(endpointList)) } else { metrics.IncNumACLRulesBy(policy.numACLRulesProducedInKernel()) } // add policy to cache pMgr.policyMap.cache[policy.PolicyKey] = policy } return nil } func (pMgr *PolicyManager) isFirstPolicy() bool { return len(pMgr.policyMap.cache) == 0 } func (pMgr *PolicyManager) RemovePolicy(policyKey string) error { policy, ok := pMgr.GetPolicy(policyKey) if !ok { return nil } if len(policy.ACLs) == 0 { klog.Infof("[DataPlane] No ACLs in policy %s to remove", policyKey) return nil } pMgr.policyMap.Lock() defer pMgr.policyMap.Unlock() // used for Prometheus metrics later numEndpointsBefore := len(policy.PodEndpoints) // Call actual dataplane function to apply changes err := pMgr.removePolicy(policy, nil) // currently we only have acl rule exec time for "adding" rules, so we skip recording here if err != nil { // NOTE: in Linux, Prometheus metrics may be off at this point since some ACL rules may have been applied successfully. // In Windows, Prometheus metrics may be off at this point since we don't know how many endpoints had rules applied successfully. 
msg := fmt.Sprintf("failed to remove policy: %s", err.Error()) metrics.SendErrorLogAndMetric(util.IptmID, "error: %s", msg) return npmerrors.Errorf(npmerrors.RemovePolicy, false, msg) } // update Prometheus metrics on success if util.IsWindowsDP() { numEndpointsRemoved := numEndpointsBefore - len(policy.PodEndpoints) metrics.DecNumACLRulesBy((1 + policy.numACLRulesProducedInKernel()) * numEndpointsRemoved) } else { metrics.DecNumACLRulesBy(policy.numACLRulesProducedInKernel()) } // remove policy from cache delete(pMgr.policyMap.cache, policyKey) return nil } // RemovePolicyForEndpoints is identical to RemovePolicy except it will not remove the policy from the cache. // This function is intended for Windows only. func (pMgr *PolicyManager) RemovePolicyForEndpoints(policyKey string, endpointList map[string]string) error { policy, ok := pMgr.GetPolicy(policyKey) if !ok { return nil } if len(policy.ACLs) == 0 { klog.Infof("[DataPlane] No ACLs in policy %s to remove for endpoints", policyKey) return nil } // Call actual dataplane function to apply changes err := pMgr.removePolicy(policy, endpointList) // currently we only have acl rule exec time for "adding" rules, so we skip recording here if err != nil { // NOTE: Prometheus metrics may be off at this point since we don't know how many endpoints had rules applied successfully. msg := fmt.Sprintf("failed to remove policy. endpoints: [%+v]. err: [%s]", endpointList, err.Error()) metrics.SendErrorLogAndMetric(util.IptmID, "error: %s", msg) return npmerrors.Errorf(npmerrors.RemovePolicy, false, msg) } // update Prometheus metrics on success metrics.DecNumACLRulesBy((1 + policy.numACLRulesProducedInKernel()) * len(endpointList)) return nil } func (pMgr *PolicyManager) isLastPolicy() bool { // if we change our code to delete more than one policy at once, we can specify numPoliciesToDelete as an argument numPoliciesToDelete := 1 return len(pMgr.policyMap.cache) == numPoliciesToDelete }