npm/pkg/dataplane/dpshim/dpshim.go (514 lines of code) (raw):

package dpshim import ( "bytes" "errors" "fmt" "sync" "time" "github.com/Azure/azure-container-networking/npm/pkg/controlplane" "github.com/Azure/azure-container-networking/npm/pkg/dataplane" "github.com/Azure/azure-container-networking/npm/pkg/dataplane/ipsets" "github.com/Azure/azure-container-networking/npm/pkg/dataplane/policies" "github.com/Azure/azure-container-networking/npm/pkg/protos" "github.com/Azure/azure-container-networking/npm/util" npmerrors "github.com/Azure/azure-container-networking/npm/util/errors" "k8s.io/klog" ) const cleanEmptySetsInHrs = 24 var ErrChannelUnset = errors.New("channel must be set") // (TODO) DPShim has commonalities with IPSetManager, we should consider refactoring // to have a common interface for both. type DPShim struct { OutChannel chan *protos.Events stopChannel <-chan struct{} setCache map[string]*controlplane.ControllerIPSets policyCache map[string]*policies.NPMNetworkPolicy dirtyCache *dirtyCache mu *sync.Mutex } func NewDPSim(stopChannel <-chan struct{}) (*DPShim, error) { return &DPShim{ OutChannel: make(chan *protos.Events), setCache: make(map[string]*controlplane.ControllerIPSets), policyCache: make(map[string]*policies.NPMNetworkPolicy), stopChannel: stopChannel, dirtyCache: newDirtyCache(), mu: &sync.Mutex{}, }, nil } func (dp *DPShim) BootupDataplane() error { return nil } func (dp *DPShim) FinishBootupPhase() { // No-op } // HydrateClients is used in DPShim to hydrate a restarted Daemon Client func (dp *DPShim) HydrateClients() (*protos.Events, error) { dp.lock() defer dp.unlock() if len(dp.setCache) == 0 && len(dp.policyCache) == 0 { klog.Infof("HydrateClients: No local cache objects to hydrate daemon client") return nil, nil } goalStates := make(map[string]*protos.GoalState) toApplySets, err := dp.hydrateSetCache() if err != nil { return nil, err } if toApplySets != nil { goalStates[controlplane.IpsetApply] = toApplySets } toApplyPolicies, err := dp.hydratePolicyCache() if err != nil { return nil, err } if toApplyPolicies != nil { goalStates[controlplane.PolicyApply] = toApplyPolicies } if len(goalStates) == 0 { klog.Info("HydrateClients: No changes to apply") return nil, nil } return &protos.Events{ EventType: protos.Events_Hydration, Payload: goalStates, }, nil } func (dp *DPShim) RunPeriodicTasks() { // Here Run periodic task to check if any sets with empty references are present and delete them dp.deleteUnusedSets(dp.stopChannel) } // GetIPSet is a no-op in DPShim since DPShim does not deal with IPSet object func (dp *DPShim) GetIPSet(setName string) *ipsets.IPSet { return nil } func (dp *DPShim) getCachedIPSet(setName string) *controlplane.ControllerIPSets { return dp.setCache[setName] } func (dp *DPShim) setExists(setName string) bool { _, ok := dp.setCache[setName] return ok } func (dp *DPShim) CreateIPSets(setMetadatas []*ipsets.IPSetMetadata) { dp.lock() defer dp.unlock() for _, set := range setMetadatas { dp.createIPSet(set) } } func (dp *DPShim) createIPSet(set *ipsets.IPSetMetadata) { setName := set.GetPrefixName() if dp.setExists(setName) { return } dp.setCache[setName] = controlplane.NewControllerIPSets(set) dp.dirtyCache.modifyAddorUpdateSets(setName) } func (dp *DPShim) DeleteIPSet(setMetadata *ipsets.IPSetMetadata, _ util.DeleteOption) { dp.lock() defer dp.unlock() dp.deleteIPSet(setMetadata) } func (dp *DPShim) deleteIPSet(setMetadata *ipsets.IPSetMetadata) { setName := setMetadata.GetPrefixName() klog.Infof("deleteIPSet: cleaning up %s", setName) set, ok := dp.setCache[setName] if !ok { return } if set.HasReferences() { klog.Infof("deleteIPSet: ignore delete since set: %s has references", setName) return } delete(dp.setCache, setName) dp.dirtyCache.modifyDeleteSets(setName) } func (dp *DPShim) AddToSets(setMetadatas []*ipsets.IPSetMetadata, podMetadata *dataplane.PodMetadata) error { if len(setMetadatas) == 0 { return nil } dp.lock() defer dp.unlock() for _, set := range setMetadatas { klog.Infof("AddToSets: Adding pod IP: %s, Key: %s, to set %s", podMetadata.PodIP, podMetadata.PodKey, set.GetPrefixName()) prefixedSetName := set.GetPrefixName() if !dp.setExists(prefixedSetName) { dp.createIPSet(set) } set := dp.setCache[prefixedSetName] if set.IPSetMetadata.GetSetKind() != ipsets.HashSet { return npmerrors.Errorf(npmerrors.AppendIPSet, false, fmt.Sprintf("ipset %s is not a hash set", prefixedSetName)) } cachedPodMetadata, ok := set.IPPodMetadata[podMetadata.PodIP] if ok && cachedPodMetadata.PodKey == podMetadata.PodKey { continue } set.IPPodMetadata[podMetadata.PodIP] = podMetadata dp.dirtyCache.modifyAddorUpdateSets(prefixedSetName) } return nil } func (dp *DPShim) RemoveFromSets(setMetadatas []*ipsets.IPSetMetadata, podMetadata *dataplane.PodMetadata) error { if len(setMetadatas) == 0 { return nil } dp.lock() defer dp.unlock() for _, set := range setMetadatas { klog.Infof("RemoveFromSets: removing pod ip: %s, podkey: %s, from set %s ", podMetadata.PodIP, podMetadata.PodKey, set.GetPrefixName()) prefixedSetName := set.GetPrefixName() if !dp.setExists(prefixedSetName) { continue } set := dp.setCache[prefixedSetName] if set.IPSetMetadata.GetSetKind() != ipsets.HashSet { return npmerrors.Errorf(npmerrors.AppendIPSet, false, fmt.Sprintf("RemoveFromSets, ipset %s is not a hash set", prefixedSetName)) } // in case the IP belongs to a new Pod, then ignore this Delete call as this might be stale cachedPod, exists := set.IPPodMetadata[podMetadata.PodIP] if !exists { continue } if cachedPod.PodKey != podMetadata.PodKey { klog.Infof("DeleteFromSet: PodOwner has changed for Ip: %s, setName:%s, Old podKey: %s, new podKey: %s. Ignore the delete as this is stale update", cachedPod.PodIP, prefixedSetName, cachedPod.PodKey, podMetadata.PodKey) continue } // update the IP ownership with podkey delete(set.IPPodMetadata, podMetadata.PodIP) dp.dirtyCache.modifyAddorUpdateSets(prefixedSetName) } return nil } func (dp *DPShim) AddToLists(listMetadatas, setMetadatas []*ipsets.IPSetMetadata) error { if len(listMetadatas) == 0 || len(setMetadatas) == 0 { return nil } dp.lock() defer dp.unlock() for _, setMetadata := range setMetadatas { setName := setMetadata.GetPrefixName() if !dp.setExists(setName) { dp.createIPSet(setMetadata) } set := dp.setCache[setName] if set.IPSetMetadata.GetSetKind() != ipsets.HashSet { return npmerrors.Errorf(npmerrors.AppendIPSet, false, fmt.Sprintf("ipset %s is not a hash set and nested list sets are not supported", setName)) } } for _, listMetadata := range listMetadatas { listName := listMetadata.GetPrefixName() if !dp.setExists(listName) { dp.createIPSet(listMetadata) } list := dp.setCache[listName] if list.IPSetMetadata.GetSetKind() != ipsets.ListSet { return npmerrors.Errorf(npmerrors.AppendIPSet, false, fmt.Sprintf("ipset %s is not a list set", listName)) } modified := false for _, setMetadata := range setMetadatas { setName := setMetadata.GetPrefixName() if _, ok := list.MemberIPSets[setName]; ok { continue } set := dp.setCache[setName] list.MemberIPSets[setName] = set.IPSetMetadata set.AddReference(listName, controlplane.ListReference) dp.dirtyCache.modifyAddorUpdateSets(setName) modified = true } if modified { dp.dirtyCache.modifyAddorUpdateSets(listName) } } return nil } func (dp *DPShim) RemoveFromList(listMetadata *ipsets.IPSetMetadata, setMetadatas []*ipsets.IPSetMetadata) error { if len(setMetadatas) == 0 || listMetadata == nil { return nil } dp.lock() defer dp.unlock() listName := listMetadata.GetPrefixName() list, exists := dp.setCache[listName] if !exists { return nil } if list.IPSetMetadata.GetSetKind() != ipsets.ListSet { return npmerrors.Errorf(npmerrors.DeleteIPSet, false, fmt.Sprintf("ipset %s is not a list set", listName)) } modified := false for _, setMetadata := range setMetadatas { setName := setMetadata.GetPrefixName() if !dp.setExists(setName) { continue } set := dp.setCache[setName] if set.IPSetMetadata.GetSetKind() != ipsets.HashSet { if modified { dp.dirtyCache.modifyAddorUpdateSets(listName) } return npmerrors.Errorf(npmerrors.AppendIPSet, false, fmt.Sprintf("ipset %s is not a hash set and nested list sets are not supported", setName)) } if _, ok := list.MemberIPSets[setName]; !ok { continue } delete(list.MemberIPSets, setName) set.DeleteReference(listName, controlplane.ListReference) modified = true } if modified { dp.dirtyCache.modifyAddorUpdateSets(listName) } return nil } func (dp *DPShim) AddPolicy(networkpolicies *policies.NPMNetworkPolicy) error { var err error // apply dataplane after syncing defer func() { dperr := dp.ApplyDataPlane() if dperr != nil { err = fmt.Errorf("failed with error %w, apply failed with %v", err, dperr) } }() // Here refers work in LIFO, DP gets unlocked first, then ApplyDataPlane will acquire lock again dp.lock() defer dp.unlock() if dp.policyExists(networkpolicies.PolicyKey) { return nil } policies.NormalizePolicy(networkpolicies) if vErr := policies.ValidatePolicy(networkpolicies); vErr != nil { return npmerrors.Errorf(npmerrors.AddPolicy, false, fmt.Sprintf("couldn't add malformed policy: %s", vErr.Error())) } dp.policyCache[networkpolicies.PolicyKey] = networkpolicies dp.dirtyCache.modifyAddorUpdatePolicies(networkpolicies.PolicyKey) return err } func (dp *DPShim) RemovePolicy(policyKey string) error { var err error // apply dataplane after syncing defer func() { dperr := dp.ApplyDataPlane() if dperr != nil { err = fmt.Errorf("failed with error %w, apply failed with %v", err, dperr) } }() // Here refers work in LIFO, DP gets unlocked first, then ApplyDataPlane will acquire lock again dp.lock() defer dp.unlock() // keeping err different so we can catch the defer func err delete(dp.policyCache, policyKey) dp.dirtyCache.modifyDeletePolicies(policyKey) return err } func (dp *DPShim) UpdatePolicy(networkpolicies *policies.NPMNetworkPolicy) error { var err error // apply dataplane after syncing defer func() { dperr := dp.ApplyDataPlane() if dperr != nil { err = fmt.Errorf("failed with error %w, apply failed with %v", err, dperr) } }() // Here refers work in LIFO, DP gets unlocked first, then ApplyDataPlane will acquire lock again dp.lock() defer dp.unlock() // For simplicity, we will not be adding references of netpols to ipsets. // DP in daemon will take care of tracking the references. dp.policyCache[networkpolicies.PolicyKey] = networkpolicies dp.dirtyCache.modifyAddorUpdatePolicies(networkpolicies.PolicyKey) return err } func (dp *DPShim) ApplyDataPlane() error { dp.lock() defer dp.unlock() // check dirty cache contents if !dp.dirtyCache.hasContents() { klog.Info("ApplyDataPlane: No changes to apply") return nil } dp.dirtyCache.printContents() goalStates := make(map[string]*protos.GoalState) toApplySets, err := dp.processIPSetsApply() if err != nil { return err } if toApplySets != nil { goalStates[controlplane.IpsetApply] = toApplySets } toDeleteSets, err := dp.processIPSetsDelete() if err != nil { return err } if toDeleteSets != nil { goalStates[controlplane.IpsetRemove] = toDeleteSets } toApplyPolicies, err := dp.processPoliciesApply() if err != nil { return err } if toApplyPolicies != nil { goalStates[controlplane.PolicyApply] = toApplyPolicies } toDeletePolicies, err := dp.processPoliciesRemove() if err != nil { return err } if toDeletePolicies != nil { goalStates[controlplane.PolicyRemove] = toDeletePolicies } if len(goalStates) == 0 { klog.Info("ApplyDataPlane: No changes to apply") return nil } go func() { dp.OutChannel <- &protos.Events{ EventType: protos.Events_GoalState, Payload: goalStates, } }() dp.dirtyCache.clearCache() return nil } func (dp *DPShim) GetAllIPSets() map[string]string { return nil } func (dp *DPShim) GetAllPolicies() []string { return nil } func (dp *DPShim) lock() { dp.mu.Lock() } func (dp *DPShim) unlock() { dp.mu.Unlock() } func (dp *DPShim) policyExists(policyKey string) bool { _, ok := dp.policyCache[policyKey] return ok } func (dp *DPShim) processIPSetsApply() (*protos.GoalState, error) { if len(dp.dirtyCache.toAddorUpdateSets) == 0 { return nil, nil } toApplySets := make([]*controlplane.ControllerIPSets, len(dp.dirtyCache.toAddorUpdateSets)) idx := 0 for setName := range dp.dirtyCache.toAddorUpdateSets { set := dp.getCachedIPSet(setName) if set == nil { klog.Errorf("processIPSetsApply: set %s not found", setName) return nil, npmerrors.Errorf(npmerrors.AppendIPSet, false, fmt.Sprintf("ipset %s not found", setName)) } toApplySets[idx] = set idx++ } payload, err := controlplane.EncodeControllerIPSets(toApplySets) if err != nil { klog.Errorf("processIPSetsApply: failed to encode sets %v", err) return nil, npmerrors.ErrorWrapper(npmerrors.AppendIPSet, false, "processIPSetsApply: failed to encode sets", err) } return getGoalStateFromBuffer(payload), nil } func (dp *DPShim) processIPSetsDelete() (*protos.GoalState, error) { if len(dp.dirtyCache.toDeleteSets) == 0 { return nil, nil } toDeleteSets := make([]string, len(dp.dirtyCache.toDeleteSets)) idx := 0 for setName := range dp.dirtyCache.toDeleteSets { toDeleteSets[idx] = setName idx++ } payload, err := controlplane.EncodeStrings(toDeleteSets) if err != nil { klog.Errorf("processIPSetsDelete: failed to encode sets %v", err) return nil, npmerrors.ErrorWrapper(npmerrors.DeleteIPSet, false, "processIPSetsDelete: failed to encode sets", err) } return getGoalStateFromBuffer(payload), nil } func (dp *DPShim) processPoliciesApply() (*protos.GoalState, error) { if len(dp.dirtyCache.toAddorUpdatePolicies) == 0 { return nil, nil } toApplyPolicies := make([]*policies.NPMNetworkPolicy, len(dp.dirtyCache.toAddorUpdatePolicies)) idx := 0 for policyKey := range dp.dirtyCache.toAddorUpdatePolicies { if !dp.policyExists(policyKey) { return nil, npmerrors.Errorf(npmerrors.AddPolicy, false, fmt.Sprintf("policy %s not found", policyKey)) } policy := dp.policyCache[policyKey] toApplyPolicies[idx] = policy idx++ } payload, err := controlplane.EncodeNPMNetworkPolicies(toApplyPolicies) if err != nil { klog.Errorf("processPoliciesApply: failed to encode policies %v", err) return nil, npmerrors.ErrorWrapper(npmerrors.AddPolicy, false, "processPoliciesApply: failed to encode sets", err) } return getGoalStateFromBuffer(payload), nil } func (dp *DPShim) processPoliciesRemove() (*protos.GoalState, error) { if len(dp.dirtyCache.toDeletePolicies) == 0 { return nil, nil } toDeletePolicies := make([]string, len(dp.dirtyCache.toDeletePolicies)) idx := 0 for policyKey := range dp.dirtyCache.toDeletePolicies { toDeletePolicies[idx] = policyKey idx++ } payload, err := controlplane.EncodeStrings(toDeletePolicies) if err != nil { klog.Errorf("processPoliciesRemove: failed to encode policies %v", err) return nil, npmerrors.ErrorWrapper(npmerrors.RemovePolicy, false, "processPoliciesRemove: failed to encode sets", err) } return getGoalStateFromBuffer(payload), nil } func (dp *DPShim) hydrateSetCache() (*protos.GoalState, error) { if len(dp.setCache) == 0 { return nil, nil } toApplySets := make([]*controlplane.ControllerIPSets, len(dp.setCache)) idx := 0 for _, set := range dp.setCache { toApplySets[idx] = set idx++ } payload, err := controlplane.EncodeControllerIPSets(toApplySets) if err != nil { klog.Errorf("processIPSetsApply: failed to encode sets %v", err) return nil, npmerrors.ErrorWrapper(npmerrors.AppendIPSet, false, "processIPSetsApply: failed to encode sets", err) } return getGoalStateFromBuffer(payload), nil } func (dp *DPShim) hydratePolicyCache() (*protos.GoalState, error) { if len(dp.policyCache) == 0 { return nil, nil } toApplyPolicies := make([]*policies.NPMNetworkPolicy, len(dp.policyCache)) idx := 0 for _, policy := range dp.policyCache { toApplyPolicies[idx] = policy idx++ } payload, err := controlplane.EncodeNPMNetworkPolicies(toApplyPolicies) if err != nil { klog.Errorf("processPoliciesApply: failed to encode policies %v", err) return nil, npmerrors.ErrorWrapper(npmerrors.AddPolicy, false, "processPoliciesApply: failed to encode sets", err) } return getGoalStateFromBuffer(payload), nil } func (dp *DPShim) deleteUnusedSets(stopChannel <-chan struct{}) { go func() { ticker := time.NewTicker(time.Hour * time.Duration(cleanEmptySetsInHrs)) defer ticker.Stop() for { select { case <-stopChannel: return case <-ticker.C: klog.Info("deleteUnusedSets: cleaning up unused sets") dp.checkSetReferences() err := dp.ApplyDataPlane() if err != nil { klog.Errorf("deleteUnusedSets: failed to apply dataplane %v", err) } } } }() } func (dp *DPShim) checkSetReferences() { for _, set := range dp.setCache { if !set.CanDelete() { continue } dp.deleteIPSet(set.IPSetMetadata) } } func getGoalStateFromBuffer(payload *bytes.Buffer) *protos.GoalState { return &protos.GoalState{ Data: payload.Bytes(), } }