npm/ipsm/ipsm.go (486 lines of code) (raw):

// Package ipsm focus on ip set operation // Copyright 2018 Microsoft. All rights reserved. // MIT License package ipsm import ( "encoding/json" "fmt" "os/exec" "regexp" "strings" "sync" "syscall" "github.com/Azure/azure-container-networking/log" "github.com/Azure/azure-container-networking/npm/metrics" "github.com/Azure/azure-container-networking/npm/util" "github.com/pkg/errors" utilexec "k8s.io/utils/exec" ) // ReferCountOperation is used to indicate whether ipset refer count should be increased or decreased. type ReferCountOperation bool const ( IncrementOp ReferCountOperation = true DecrementOp ReferCountOperation = false ) type ipsEntry struct { operationFlag string name string set string spec []string } // IpsetManager stores ipset states. // Hold lock only exposed methods are called to avoid race condition from all controllers type IpsetManager struct { exec utilexec.Interface listMap map[string]*Ipset // tracks all set lists. setMap map[string]*Ipset // label -> []ip sync.RWMutex } // Ipset represents one ipset entry. type Ipset struct { name string elements map[string]string // key = ip, value: context associated to the ip like podKey referCount int } func (ipset *Ipset) incReferCount() { ipset.referCount++ } func (ipset *Ipset) decReferCount() { ipset.referCount-- } // NewIpset creates a new instance for Ipset object. func newIpset(setName string) *Ipset { return &Ipset{ name: setName, elements: make(map[string]string), referCount: 0, } } // NewIpsetManager creates a new instance for IpsetManager object. func NewIpsetManager(exec utilexec.Interface) *IpsetManager { return &IpsetManager{ exec: exec, listMap: make(map[string]*Ipset), setMap: make(map[string]*Ipset), } } func (ipsMgr *IpsetManager) GetListMapRaw() ([]byte, error) { ipsMgr.RLock() defer ipsMgr.RUnlock() listMap := make(map[string]string, len(ipsMgr.listMap)) for k := range ipsMgr.setMap { hashedName := util.GetHashedName(k) listMap[hashedName] = k } b, err := json.Marshal(listMap) if err != nil { return nil, errors.Wrapf(err, "failed to marshal list map") } return b, nil } func (ipsMgr *IpsetManager) GetSetMapRaw() ([]byte, error) { ipsMgr.RLock() defer ipsMgr.RUnlock() setMap := make(map[string]string, len(ipsMgr.setMap)) for k := range ipsMgr.setMap { hashedName := util.GetHashedName(k) setMap[hashedName] = k } b, err := json.Marshal(setMap) if err != nil { return nil, errors.Wrapf(err, "failed to marshal list map") } return b, nil } // Exists checks if an element exists in setMap/listMap. func (ipsMgr *IpsetManager) exists(listName string, setName string, kind string) bool { m := ipsMgr.setMap if kind == util.IpsetSetListFlag { m = ipsMgr.listMap } if _, exists := m[listName]; !exists { return false } if _, exists := m[listName].elements[setName]; !exists { return false } return true } // IpSetReferIncOrDec checks if an element exists in setMap/listMap and then increases or decreases this referCount. func (ipsMgr *IpsetManager) IpSetReferIncOrDec(ipsetName string, kind string, countOperation ReferCountOperation) { m := ipsMgr.setMap if kind == util.IpsetSetListFlag { m = ipsMgr.listMap } switch countOperation { case IncrementOp: m[ipsetName].incReferCount() case DecrementOp: m[ipsetName].decReferCount() } } // SetExists checks if an ipset exists, and returns the type func (ipsMgr *IpsetManager) setExists(setName string) (bool, string) { _, exists := ipsMgr.setMap[setName] if exists { return exists, util.IpsetSetGenericFlag } _, exists = ipsMgr.listMap[setName] if exists { return exists, util.IpsetSetListFlag } return exists, "" } func isNsSet(setName string) bool { return !strings.Contains(setName, "-") && !strings.Contains(setName, ":") } // DeleteList removes an ipset list. func (ipsMgr *IpsetManager) DeleteList(listName string) error { ipsMgr.Lock() defer ipsMgr.Unlock() return ipsMgr.deleteList(listName) } // DeleteList removes an ipset list. func (ipsMgr *IpsetManager) deleteList(listName string) error { entry := &ipsEntry{ operationFlag: util.IpsetDestroyFlag, set: util.GetHashedName(listName), } if ipsMgr.listMap[listName].referCount > 0 { ipsMgr.IpSetReferIncOrDec(listName, util.IpsetSetListFlag, DecrementOp) return nil } if errCode, err := ipsMgr.run(entry); err != nil { if errCode == 1 { return nil } metrics.SendErrorLogAndMetric(util.IpsmID, "Error: failed to delete ipset %s %+v", listName, entry) return err } delete(ipsMgr.listMap, listName) metrics.DeleteIPSet(listName) return nil } // Run execute an ipset command to update ipset. func (ipsMgr *IpsetManager) run(entry *ipsEntry) (int, error) { cmdName := util.Ipset cmdArgs := append([]string{entry.operationFlag, util.IpsetExistFlag, entry.set}, entry.spec...) cmdArgs = util.DropEmptyFields(cmdArgs) log.Logf("Executing ipset command %s %v", cmdName, cmdArgs) cmd := ipsMgr.exec.Command(cmdName, cmdArgs...) output, err := cmd.CombinedOutput() if result, isExitError := err.(utilexec.ExitError); isExitError { exitCode := result.ExitStatus() errfmt := fmt.Errorf("error running command: [%s %v] Stderr: [%w, %s]", cmdName, strings.Join(cmdArgs, " "), err, strings.TrimSuffix(string(output), "\n")) if exitCode > 0 { metrics.SendErrorLogAndMetric(util.IpsmID, errfmt.Error()) } return exitCode, errfmt } return 0, nil } // CreateListNoLock is identical to CreateList except it does not lock the ipsMgr. func (ipsMgr *IpsetManager) CreateListNoLock(listName string) error { if _, exists := ipsMgr.listMap[listName]; exists { return nil } entry := &ipsEntry{ name: listName, operationFlag: util.IpsetCreationFlag, set: util.GetHashedName(listName), spec: []string{util.IpsetSetListFlag}, } log.Logf("Creating List: %+v", entry) timer := metrics.StartNewTimer() errCode, err := ipsMgr.run(entry) metrics.RecordIPSetExecTime(timer) // record execution time regardless of failure if err != nil && errCode != 1 { metrics.SendErrorLogAndMetric(util.IpsmID, "Error: failed to create ipset list %s.", listName) return err } if err == nil { metrics.IncNumIPSets() } ipsMgr.listMap[listName] = newIpset(listName) return nil } // CreateSetNoLock is identical to CreateSet except it does not lock the ipsMgr. func (ipsMgr *IpsetManager) CreateSetNoLock(setName string, spec []string) error { // This timer measures execution time to run this function regardless of success or failure cases prometheusTimer := metrics.StartNewTimer() if _, exists := ipsMgr.setMap[setName]; exists { return nil } defer metrics.RecordIPSetExecTime(prometheusTimer) entry := &ipsEntry{ name: setName, operationFlag: util.IpsetCreationFlag, // Use hashed string for set name to avoid string length limit of ipset. set: util.GetHashedName(setName), spec: spec, } log.Logf("Creating Set: %+v", entry) // (TODO): need to differentiate errCode handler // since errCode can be one in case of "set with the same name already exists" // and "maximal number of sets reached, cannot create more." // It may have more situations with errCode==1. errCode, err := ipsMgr.run(entry) if err != nil && errCode != 1 { metrics.SendErrorLogAndMetric(util.IpsmID, "Error: failed to create ipset.") return err } if err == nil { metrics.IncNumIPSets() } ipsMgr.setMap[setName] = newIpset(setName) return nil } func (ipsMgr *IpsetManager) deleteSet(setName string) error { if _, exists := ipsMgr.setMap[setName]; !exists { metrics.SendErrorLogAndMetric(util.IpsmID, "ipset with name %s not found", setName) return nil } entry := &ipsEntry{ operationFlag: util.IpsetDestroyFlag, set: util.GetHashedName(setName), } if errCode, err := ipsMgr.run(entry); err != nil { if errCode == 1 { return nil } metrics.SendErrorLogAndMetric(util.IpsmID, "Error: failed to delete ipset %s. Entry: %+v", setName, entry) return err } delete(ipsMgr.setMap, setName) metrics.DeleteIPSet(setName) return nil } // CreateList creates an ipset list. npm maintains one setlist per namespace label. func (ipsMgr *IpsetManager) CreateList(listName string) error { ipsMgr.Lock() defer ipsMgr.Unlock() return ipsMgr.CreateListNoLock(listName) } // AddToList inserts an ipset to an ipset list. func (ipsMgr *IpsetManager) AddToList(listName string, setName string) error { ipsMgr.Lock() defer ipsMgr.Unlock() return ipsMgr.AddToListNoLock(listName, setName) } // AddToListNoLock is identical to AddToList except it does not lock the ipsMgr. func (ipsMgr *IpsetManager) AddToListNoLock(listName, setName string) error { if listName == setName { return nil } // Check if list being added exists in the listMap, if it exists we don't care about the set type exists, _ := ipsMgr.setExists(setName) // if set does not exist, then return because the ipset call will fail due to set not existing if !exists { return fmt.Errorf("Set [%s] does not exist when attempting to add to list [%s]", setName, listName) } // Check if the list that is being added to exists exists, listtype := ipsMgr.setExists(listName) // Make sure that set returned is of list type, otherwise return because we can't add a set to a non setlist type if exists && listtype != util.IpsetSetListFlag { return fmt.Errorf("Failed to add set [%s] to list [%s], but list is of type [%s]", setName, listName, listtype) } else if !exists { // if the list doesn't exist, create it if err := ipsMgr.CreateListNoLock(listName); err != nil { return err } } // check if set already exists in the list if ipsMgr.exists(listName, setName, util.IpsetSetListFlag) { return nil } entry := &ipsEntry{ operationFlag: util.IpsetAppendFlag, set: util.GetHashedName(listName), spec: []string{util.GetHashedName(setName)}, } // add set to list errCode, err := ipsMgr.run(entry) if err != nil && errCode != 1 { return fmt.Errorf("Error: failed to create ipset rules. rule: %+v, error: %v", entry, err) } if err == nil { metrics.AddEntryToIPSet(listName) } ipsMgr.listMap[listName].elements[setName] = "" return nil } // DeleteFromList removes an ipset to an ipset list. func (ipsMgr *IpsetManager) DeleteFromList(listName string, setName string) error { ipsMgr.Lock() defer ipsMgr.Unlock() // Check if list being added exists in the listMap, if it exists we don't care about the set type exists, _ := ipsMgr.setExists(setName) // if set does not exist, then return because the ipset call will fail due to set not existing // TODO make sure these are info and not errors, use NPmErr if !exists { metrics.SendErrorLogAndMetric(util.IpsmID, "Set [%s] does not exist when attempting to delete from list [%s]", setName, listName) return nil } // Check if list being added exists in the listMap, if it exists we don't care about the set type exists, listtype := ipsMgr.setExists(listName) // if set does not exist, then return because the ipset call will fail due to set not existing if !exists { metrics.SendErrorLogAndMetric(util.IpsmID, "Set [%s] does not exist when attempting to add to list [%s]", setName, listName) return nil } if listtype != util.IpsetSetListFlag { metrics.SendErrorLogAndMetric(util.IpsmID, "Set [%s] is of the wrong type when attempting to delete list [%s], actual type [%s]", setName, listName, listtype) return nil } if _, exists := ipsMgr.listMap[listName]; !exists { metrics.SendErrorLogAndMetric(util.IpsmID, "ipset list with name %s not found", listName) return nil } hashedListName, hashedSetName := util.GetHashedName(listName), util.GetHashedName(setName) entry := &ipsEntry{ operationFlag: util.IpsetDeletionFlag, set: hashedListName, spec: []string{hashedSetName}, } if _, err := ipsMgr.run(entry); err != nil { metrics.SendErrorLogAndMetric(util.IpsmID, "Error: failed to delete ipset entry. %+v", entry) return err } // Now cleanup the cache. Do nothing if the specified key doesn't exist. delete(ipsMgr.listMap[listName].elements, setName) metrics.RemoveEntryFromIPSet(listName) if len(ipsMgr.listMap[listName].elements) == 0 { if err := ipsMgr.deleteList(listName); err != nil { metrics.SendErrorLogAndMetric(util.IpsmID, "Error: failed to delete ipset list %s.", listName) return err } } return nil } // CreateSet creates an ipset. func (ipsMgr *IpsetManager) CreateSet(setName string, spec []string) error { ipsMgr.Lock() defer ipsMgr.Unlock() return ipsMgr.CreateSetNoLock(setName, spec) } // DeleteSet removes a set from ipset. func (ipsMgr *IpsetManager) DeleteSet(setName string) error { ipsMgr.Lock() defer ipsMgr.Unlock() return ipsMgr.deleteSet(setName) } // AddToSet inserts an ip to an entry in setMap, and creates/updates the corresponding ipset. func (ipsMgr *IpsetManager) AddToSet(setName, ip, spec, podKey string) error { ipsMgr.Lock() defer ipsMgr.Unlock() return ipsMgr.AddToSetNoLock(setName, ip, spec, podKey) } // AddToSetNoLock is identical to AddToSet except it does not lock the ipsMgr. func (ipsMgr *IpsetManager) AddToSetNoLock(setName, ip, spec, podKey string) error { if ipsMgr.exists(setName, ip, spec) { // make sure we have updated the podKey in case it gets changed cachedPodKey := ipsMgr.setMap[setName].elements[ip] if cachedPodKey != podKey { log.Logf("AddToSet: PodOwner has changed for Ip: %s, setName:%s, Old podKey: %s, new podKey: %s. Replace context with new PodOwner.", ip, setName, cachedPodKey, podKey) ipsMgr.setMap[setName].elements[ip] = podKey } return nil } // possible formats // 192.168.0.1 // 192.168.0.1,tcp:25227 // todo: handle ip and port with protocol, plus just ip // always guaranteed to have ip, not guaranteed to have port + protocol ipDetails := strings.Split(ip, ",") if len(ipDetails) > 0 && ipDetails[0] == "" { return fmt.Errorf("Failed to add IP to set [%s], the ip to be added was empty, spec: %+v", setName, spec) } // check if the set exists, ignore the type of the set being added if it exists since the only purpose is to see if it's created or not exists, _ := ipsMgr.setExists(setName) if !exists { if err := ipsMgr.CreateSetNoLock(setName, []string{spec}); err != nil { return err } } var resultSpec []string if strings.Contains(ip, util.IpsetNomatch) { ip = strings.Trim(ip, util.IpsetNomatch) resultSpec = []string{ip, util.IpsetNomatch} } else { resultSpec = []string{ip} } entry := &ipsEntry{ operationFlag: util.IpsetAppendFlag, set: util.GetHashedName(setName), spec: resultSpec, } // todo: check err handling besides error code, corrupt state possible here errCode, err := ipsMgr.run(entry) if err != nil && errCode != 1 { metrics.SendErrorLogAndMetric(util.IpsmID, "Error: failed to create ipset rules. %+v", entry) return err } if err == nil { metrics.AddEntryToIPSet(setName) } // Stores the podKey as the context for this ip. ipsMgr.setMap[setName].elements[ip] = podKey return nil } // DeleteFromSet removes an ip from an entry in setMap, and delete/update the corresponding ipset. func (ipsMgr *IpsetManager) DeleteFromSet(setName, ip, podKey string) error { ipsMgr.Lock() defer ipsMgr.Unlock() ipSet, exists := ipsMgr.setMap[setName] if !exists { log.Logf("ipset with name %s not found", setName) return nil } // possible formats // 192.168.0.1 // 192.168.0.1,tcp:25227 // todo: handle ip and port with protocol, plus just ip // always guaranteed to have ip, not guaranteed to have port + protocol ipDetails := strings.Split(ip, ",") if len(ipDetails) > 0 && ipDetails[0] == "" { return fmt.Errorf("Failed to add IP to set [%s], the ip to be added was empty", setName) } if _, exists := ipsMgr.setMap[setName].elements[ip]; exists { // in case the IP belongs to a new Pod, then ignore this Delete call as this might be stale cachedPodKey := ipSet.elements[ip] if cachedPodKey != podKey { log.Logf("DeleteFromSet: PodOwner has changed for Ip: %s, setName:%s, Old podKey: %s, new podKey: %s. Ignore the delete as this is stale update", ip, setName, cachedPodKey, podKey) return nil } } // TODO optimize to not run this command in case cache has already been updated. entry := &ipsEntry{ operationFlag: util.IpsetDeletionFlag, set: util.GetHashedName(setName), spec: []string{ip}, } if errCode, err := ipsMgr.run(entry); err != nil { if errCode == 1 { return nil } metrics.SendErrorLogAndMetric(util.IpsmID, "Error: failed to delete ipset entry: [%+v] err: [%v]", entry, err) return err } // Now cleanup the cache delete(ipsMgr.setMap[setName].elements, ip) metrics.RemoveEntryFromIPSet(setName) if len(ipsMgr.setMap[setName].elements) == 0 { if err := ipsMgr.deleteSet(setName); err != nil { return err } } return nil } // DestroyNpmIpsets destroys only ipsets created by NPM func (ipsMgr *IpsetManager) DestroyNpmIpsets() error { log.Logf("Azure-NPM creating, cleaning existing Azure NPM IPSets") ipsMgr.Lock() defer ipsMgr.Unlock() cmdName := util.Ipset cmdArgs := util.IPsetCheckListFlag reply, err := ipsMgr.exec.Command(cmdName, cmdArgs).CombinedOutput() if msg, failed := err.(*exec.ExitError); failed { errCode := msg.Sys().(syscall.WaitStatus).ExitStatus() if errCode > 0 { metrics.SendErrorLogAndMetric(util.IpsmID, "{DestroyNpmIpsets} Error: There was an error running command: [%s] Stderr: [%v, %s]", cmdName, err, strings.TrimSuffix(string(msg.Stderr), "\n")) } return err } if len(reply) == 0 { // this would occur if there were ever 0 ipsets metrics.SendErrorLogAndMetric(util.IpsmID, "{DestroyNpmIpsets} Received empty string from ipset list while destroying azure-npm ipsets") return nil } re := regexp.MustCompile("Name: (" + util.AzureNpmPrefix + "\\d+)") ipsetRegexSlice := re.FindAllSubmatch(reply, -1) if len(ipsetRegexSlice) == 0 { log.Logf("No Azure-NPM IPsets are found in the Node.") return nil } ipsetLists := make([]string, 0) for _, matchedItem := range ipsetRegexSlice { if len(matchedItem) == 2 { itemString := string(matchedItem[1]) if strings.Contains(itemString, util.AzureNpmFlag) { ipsetLists = append(ipsetLists, itemString) } } } if len(ipsetLists) == 0 { return nil } destroyFailureCount := 0 for _, ipsetName := range ipsetLists { flushEntry := &ipsEntry{ operationFlag: util.IpsetFlushFlag, set: ipsetName, } _, err := ipsMgr.run(flushEntry) if err != nil { metrics.SendErrorLogAndMetric(util.IpsmID, "{DestroyNpmIpsets} Error: failed to flush ipset %s", ipsetName) } else { metrics.RemoveAllEntriesFromIPSet(ipsetName) } } for _, ipsetName := range ipsetLists { deleteEntry := &ipsEntry{ operationFlag: util.IpsetDestroyFlag, set: ipsetName, } _, err := ipsMgr.run(deleteEntry) if err != nil { destroyFailureCount++ metrics.SendErrorLogAndMetric(util.IpsmID, "{DestroyNpmIpsets} Error: failed to destroy ipset %s", ipsetName) } } // After this function, NumIPSets should be 0 or the number of NPM IPSets that existed and failed to be destroyed. // When NPM restarts, Prometheus metrics will initialize at 0, but NPM IPSets may exist. if metrics.NumIPSetsIsPositive() { // in this case, we should have originalNumIPSets == len(ipsetLists) metrics.SetNumIPSets(destroyFailureCount) } else { metrics.ResetNumIPSets() } // NOTE: in v2, we reset metrics blindly, regardless of errors // So v2 would underestimate the number of ipsets/entries if there are destroy failures. // In v1 we remove entries for ipsets we flush. // We may miss removing entries if some sets are in the prometheus metric but not in the kernel. return nil }