npm/pkg/dataplane/ipsets/ipsetmanager.go (419 lines of code) (raw):
package ipsets
import (
"fmt"
"strings"
"sync"
"github.com/Azure/azure-container-networking/common"
"github.com/Azure/azure-container-networking/npm/metrics"
"github.com/Azure/azure-container-networking/npm/util"
npmerrors "github.com/Azure/azure-container-networking/npm/util/errors"
"k8s.io/klog"
)
// IPSetMode selects the policy the IPSetManager follows when deciding which
// cached ipsets should be present in the kernel.
type IPSetMode string
/*
IPSet Modes
- ApplyAllIPSets:
- all ipsets are added to the kernel
- ipsets are removed from the kernel when they are deleted from the cache
- creates empty ipsets
- adds empty/unreferenced ipsets to the toDelete cache periodically
- ApplyOnNeed:
- ipsets are added to the kernel when they are referenced by network policies or lists in the kernel
- ipsets are removed from the kernel when they no longer have a reference
- removes empty/unreferenced ipsets from the cache periodically
*/
const (
// ApplyAllIPSets marks every cached ipset for the kernel as soon as it is created.
ApplyAllIPSets IPSetMode = "all"
// ApplyOnNeed marks an ipset for the kernel only once something references it.
ApplyOnNeed IPSetMode = "on-need"
)
var (
// emptySetMetadata describes the empty hash set that createAndGetIPSet adds to
// namespace-label lists when IPSetManagerCfg.AddEmptySetToLists is enabled.
emptySetMetadata = &IPSetMetadata{
Name: "emptyhashset",
Type: EmptyHashSet,
}
// emptySetPrefixName is the prefixed name of the empty set; it is the key under
// which that set is stored in IPSetManager.setMap.
emptySetPrefixName = emptySetMetadata.GetPrefixName()
)
// IPSetManager keeps an in-memory cache of ipsets together with a dirty cache of
// pending changes that ApplyIPSets later hands to applyIPSets.
type IPSetManager struct {
// iMgrCfg holds the mode and flags this manager was constructed with.
iMgrCfg *IPSetManagerCfg
// emptySet is a direct reference to the empty ipset that should always be in the kernel.
// This set is used based on the AddEmptySetToLists flag.
// If emptySet is non-nil, it should be in the kernel or ready to be created in the dirtyCache.
// Its reference counts are currently unaccounted for and may be incorrect.
emptySet *IPSet
// setMap is the cache of known ipsets, keyed by prefixed ipset name.
setMap map[string]*IPSet
// dirtyCache records pending kernel creations, removals and member changes until it is
// cleared by a successful ApplyIPSets or by ResetIPSets.
dirtyCache dirtyCacheInterface
// ioShim is supplied by the caller of NewIPSetManager.
// NOTE(review): presumably consumed by the platform-specific apply/reset code — confirm.
ioShim *common.IOShim
// consecutiveApplyFailures is used in Linux to count the number of consecutive failures to apply ipsets
// if this count exceeds a threshold, we will panic
consecutiveApplyFailures int
// The embedded lock is acquired by the exported methods before they touch the caches;
// unexported helpers do not lock on their own.
sync.RWMutex
}
// IPSetManagerCfg carries the settings an IPSetManager is constructed with.
type IPSetManagerCfg struct {
// IPSetMode is either ApplyAllIPSets or ApplyOnNeed and controls when cached
// ipsets are marked for creation in / removal from the kernel.
IPSetMode IPSetMode
// NetworkName can be left empty or set to 'azure' or 'Calico' (case sensitive)
NetworkName string
// AddEmptySetToLists determines whether all lists should have an empty set as a member.
// This is necessary for HNS (Windows); otherwise, an allow ACL with a list condition
// allows all IPs if the list has no members.
AddEmptySetToLists bool
}
// NewIPSetManager returns an IPSetManager with an empty set cache and a fresh
// dirty cache, configured by iMgrCfg and using ioShim.
func NewIPSetManager(iMgrCfg *IPSetManagerCfg, ioShim *common.IOShim) *IPSetManager {
	mgr := new(IPSetManager)
	mgr.iMgrCfg = iMgrCfg
	mgr.ioShim = ioShim
	mgr.setMap = make(map[string]*IPSet)
	mgr.dirtyCache = newDirtyCache()
	// the empty set is created lazily by createAndGetIPSet when a list needs it
	mgr.emptySet = nil
	// set to 0 to avoid lint error for windows
	mgr.consecutiveApplyFailures = 0
	return mgr
}
// Reconcile soft-deletes every cached ipset that modifyCacheForCacheDeletion
// considers removable (empty/unreferenced sets).
// For ApplyAllIPSets mode, those sets are also added to the toDeleteCache.
// We can't delete from kernel immediately unless we lock iMgr during policy CRUD.
func (iMgr *IPSetManager) Reconcile() {
	iMgr.Lock()
	defer iMgr.Unlock()

	setCountBefore := len(iMgr.setMap)
	// Entries may be removed from setMap while ranging over it; Go permits this.
	for _, candidate := range iMgr.setMap {
		iMgr.modifyCacheForCacheDeletion(candidate, util.SoftDelete)
	}

	if removed := setCountBefore - len(iMgr.setMap); removed > 0 {
		// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
		// klog.Infof("[IPSetManager] removed %d empty/unreferenced ipsets, updating toDeleteCache to: %+v", removed, iMgr.dirtyCache.printDeleteCache())
	}
}
// ResetIPSets resets the ipset metrics, calls resetIPSets, and then drops the
// whole in-memory state (set cache, empty-set reference, dirty cache).
// The in-memory state is dropped even when resetIPSets fails; in that case the
// failure is logged and returned wrapped.
func (iMgr *IPSetManager) ResetIPSets() error {
	iMgr.Lock()
	defer iMgr.Unlock()

	metrics.ResetNumIPSets()
	metrics.ResetIPSetEntries()

	resetErr := iMgr.resetIPSets()

	// start over with an empty cache regardless of the outcome above
	iMgr.setMap = make(map[string]*IPSet)
	iMgr.emptySet = nil
	iMgr.clearDirtyCache()

	if resetErr == nil {
		return nil
	}
	metrics.SendErrorLogAndMetric(util.IpsmID, "error: failed to reset ipsetmanager: %s", resetErr.Error())
	return fmt.Errorf("error while resetting ipsetmanager: %w", resetErr)
}
// CreateIPSets makes sure a cached ipset exists for every entry of setMetadatas.
// Sets that are already cached are left untouched.
func (iMgr *IPSetManager) CreateIPSets(setMetadatas []*IPSetMetadata) {
	iMgr.Lock()
	defer iMgr.Unlock()

	for i := range setMetadatas {
		// only the side effect of creating the set matters here
		iMgr.createAndGetIPSet(setMetadatas[i])
	}
}
// createAndGetIPSet returns the cached ipset for setMetadata, creating and
// caching it first when it is missing. The caller must hold the lock.
//
// A newly created set is marked for kernel creation in ApplyAllIPSets mode.
// When AddEmptySetToLists is configured and the new set is a namespace-label
// list, the shared empty set is created on first use and added as a member.
func (iMgr *IPSetManager) createAndGetIPSet(setMetadata *IPSetMetadata) *IPSet {
	key := setMetadata.GetPrefixName()
	if cached, ok := iMgr.setMap[key]; ok {
		return cached
	}

	created := NewIPSet(setMetadata)
	iMgr.setMap[key] = created
	metrics.IncNumIPSets()
	if iMgr.iMgrCfg.IPSetMode == ApplyAllIPSets {
		iMgr.modifyCacheForKernelCreation(created)
	}

	// Only lists of type KeyLabelOfNamespace and KeyValueLabelOfNamespace receive the empty set.
	// The NestedLabelOfPod list ipset type is assumed to always have a member
	// (it is created specifically for network policy pod selectors).
	isNamespaceLabelList := created.Type == KeyLabelOfNamespace || created.Type == KeyValueLabelOfNamespace
	if !iMgr.iMgrCfg.AddEmptySetToLists || !isNamespaceLabelList {
		return created
	}

	if iMgr.emptySet == nil {
		// same creation steps as above, except the empty set is always marked for the kernel
		iMgr.emptySet = NewIPSet(emptySetMetadata)
		iMgr.setMap[emptySetPrefixName] = iMgr.emptySet
		metrics.IncNumIPSets()
		iMgr.modifyCacheForKernelCreation(iMgr.emptySet)
	}
	iMgr.addMemberToList(created, iMgr.emptySet)
	return created
}
// DeleteIPSet removes the ipset with the given prefixed name from the cache,
// subject to the checks modifyCacheForCacheDeletion applies for deleteOption.
// Unknown names are ignored.
func (iMgr *IPSetManager) DeleteIPSet(name string, deleteOption util.DeleteOption) {
	iMgr.Lock()
	defer iMgr.Unlock()

	if target, found := iMgr.setMap[name]; found {
		iMgr.modifyCacheForCacheDeletion(target, deleteOption)
	}
}
// GetIPSet returns the cached ipset stored under the given prefixed name,
// or nil when no such set is cached.
//
// This is a pure read, so it only takes the read lock (as GetAllIPSets does)
// and performs a single map lookup; a missing key yields the nil zero value.
func (iMgr *IPSetManager) GetIPSet(name string) *IPSet {
	iMgr.RLock()
	defer iMgr.RUnlock()
	return iMgr.setMap[name]
}
// AddReference creates the set if necessary and records referenceName as a
// reference of kind referenceType on it.
//
// It returns an error when a selector reference is requested for a set that
// cannot be a selector ipset.
// NOTE: a set that was newly created by this call stays in the cache even when
// an error is returned.
func (iMgr *IPSetManager) AddReference(setMetadata *IPSetMetadata, referenceName string, referenceType ReferenceType) error {
	iMgr.Lock()
	defer iMgr.Unlock()

	target := iMgr.createAndGetIPSet(setMetadata)
	if referenceType == SelectorType && !target.canSetBeSelectorIPSet() {
		reason := fmt.Sprintf("ipset %s is not a selector ipset it is of type %s", target.Name, target.Type.String())
		metrics.SendErrorLogAndMetric(util.IpsmID, "error: failed to add reference: %s", reason)
		return npmerrors.Errorf(npmerrors.AddSelectorReference, false, reason)
	}

	// must be sampled before the reference is added
	alreadyInKernel := iMgr.shouldBeInKernel(target)
	target.addReference(referenceName, referenceType)
	if alreadyInKernel {
		// for ApplyAllIPSets mode, the set either:
		// a) existed already and doesn't need to be added to toAddOrUpdateCache
		// b) was created in createAndGetIPSet, where it was added to toAddOrUpdateCache
		return nil
	}

	// The set now belongs in the kernel but did not before.
	// This path can only be taken for ApplyOnNeed mode.
	iMgr.modifyCacheForKernelCreation(target)
	// a HashSet has no member ipsets, so this loop does nothing for it
	for _, member := range target.MemberIPSets {
		iMgr.incKernelReferCountAndModifyCache(member)
	}
	return nil
}
// DeleteReference removes the reference referenceName of kind referenceType
// from the set stored under setName.
//
// It returns an error if the set doesn't exist (since a set should exist in
// the cache & kernel if it has a reference).
func (iMgr *IPSetManager) DeleteReference(setName, referenceName string, referenceType ReferenceType) error {
	iMgr.Lock()
	defer iMgr.Unlock()

	target, found := iMgr.setMap[setName]
	if !found {
		operation := npmerrors.DeleteSelectorReference
		if referenceType == NetPolType {
			operation = npmerrors.DeleteNetPolReference
		}
		reason := fmt.Sprintf("ipset %s does not exist", setName)
		metrics.SendErrorLogAndMetric(util.IpsmID, "error: failed to delete reference: %s", reason)
		return npmerrors.Errorf(operation, false, reason)
	}

	// sampled up front because the set may not be in the kernel if this reference doesn't exist
	inKernelBefore := iMgr.shouldBeInKernel(target)
	target.deleteReference(referenceName, referenceType)
	if !inKernelBefore || iMgr.shouldBeInKernel(target) {
		// nothing changed for the kernel; in ApplyAllIPSets mode the set must not become dirty
		return nil
	}

	// The set was in the kernel before and shouldn't be now.
	// This path can only be taken for ApplyOnNeed mode.
	iMgr.modifyCacheForKernelRemoval(target)
	// a HashSet has no member ipsets, so this loop does nothing for it
	for _, member := range target.MemberIPSets {
		iMgr.decKernelReferCountAndModifyCache(member)
	}
	return nil
}
// AddToSets adds ip, owned by podKey, to every hash set described by addToSets,
// creating missing sets on the way.
//
// It returns nil immediately when addToSets is empty. It returns an error when
// ip fails validateIPSetMemberIP, or when one of the sets is not a HashSet; in
// the latter case sets processed earlier in the slice have already been updated
// and any newly created set stays in the cache.
// If ip is already a member of a set, only its recorded pod key is replaced.
func (iMgr *IPSetManager) AddToSets(addToSets []*IPSetMetadata, ip, podKey string) error {
	if len(addToSets) == 0 {
		return nil
	}

	if !validateIPSetMemberIP(ip) {
		msg := fmt.Sprintf("error: failed to add to sets: invalid ip %s", ip)
		// msg embeds caller-supplied text, so it must be passed as an argument and
		// never as the format string itself
		metrics.SendErrorLogAndMetric(util.IpsmID, "%s", msg)
		return npmerrors.Errorf(npmerrors.AppendIPSet, true, msg)
	}

	iMgr.Lock()
	defer iMgr.Unlock()

	for _, metadata := range addToSets {
		// 1. check for errors and create a missing set
		prefixedName := metadata.GetPrefixName()
		// NOTE: any newly created IPSet will still be in the cache if an error is returned later
		set := iMgr.createAndGetIPSet(metadata)
		if set.Kind != HashSet {
			msg := fmt.Sprintf("ipset %s is not a hash set", prefixedName)
			metrics.SendErrorLogAndMetric(util.IpsmID, "error: failed to add to sets: %s", msg)
			return npmerrors.Errorf(npmerrors.AppendIPSet, false, msg)
		}

		// 2. add ip to the set (only dirtying the cache for a new member), and update the pod key
		if _, alreadyMember := set.IPPodKey[ip]; !alreadyMember {
			iMgr.modifyCacheForKernelMemberAdd(set, ip)
			metrics.AddEntryToIPSet(prefixedName)
		}
		set.IPPodKey[ip] = podKey
	}
	return nil
}
// RemoveFromSets removes ip from every hash set described by removeFromSets,
// provided the set still records podKey as the owner of ip.
//
// It returns nil immediately when removeFromSets is empty. Sets that are not
// cached, sets that do not contain ip, and sets where ip is owned by a
// different pod key (a stale delete) are skipped. It returns an error when ip
// fails validateIPSetMemberIP or when one of the sets is not a HashSet; in the
// latter case sets processed earlier in the slice have already been updated.
func (iMgr *IPSetManager) RemoveFromSets(removeFromSets []*IPSetMetadata, ip, podKey string) error {
	if len(removeFromSets) == 0 {
		return nil
	}

	if !validateIPSetMemberIP(ip) {
		// report this as a removal failure (DeleteIPSet), matching the other error of this function
		msg := fmt.Sprintf("error: failed to remove from sets: invalid ip %s", ip)
		// msg embeds caller-supplied text, so it must be passed as an argument and
		// never as the format string itself
		metrics.SendErrorLogAndMetric(util.IpsmID, "%s", msg)
		return npmerrors.Errorf(npmerrors.DeleteIPSet, true, msg)
	}

	iMgr.Lock()
	defer iMgr.Unlock()

	for _, metadata := range removeFromSets {
		// 1. check for errors (ignore missing sets)
		prefixedName := metadata.GetPrefixName()
		set, exists := iMgr.setMap[prefixedName]
		if !exists {
			continue
		}
		if set.Kind != HashSet {
			msg := fmt.Sprintf("ipset %s is not a hash set", prefixedName)
			metrics.SendErrorLogAndMetric(util.IpsmID, "error: failed to remove from sets: %s", msg)
			return npmerrors.Errorf(npmerrors.DeleteIPSet, false, msg)
		}

		// 2. remove ip from the set
		cachedPodKey, isMember := set.IPPodKey[ip]
		if !isMember {
			continue
		}
		// in case the IP belongs to a new Pod, then ignore this Delete call as this might be stale
		if cachedPodKey != podKey {
			// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
			// klog.Infof(
			// 	"[IPSetManager] DeleteFromSet: PodOwner has changed for Ip: %s, setName:%s, Old podKey: %s, new podKey: %s. Ignore the delete as this is stale update",
			// 	ip, prefixedName, cachedPodKey, podKey,
			// )
			continue
		}

		// the caller owns the IP: mark the removal for the kernel and drop the ownership record
		iMgr.modifyCacheForKernelMemberDelete(set, ip)
		delete(set.IPPodKey, ip)
		metrics.RemoveEntryFromIPSet(prefixedName)
	}
	return nil
}
// AddToLists makes every set described by setMetadatas a member of every list
// described by listMetadatas, creating missing sets and lists on the way.
//
// It returns nil immediately when either slice is empty. It returns an error
// when a prospective member is not a HashSet or a target is not a ListSet.
// NOTE: sets/lists created before an error is hit stay in the cache, and lists
// processed before the failing one keep their new members.
func (iMgr *IPSetManager) AddToLists(listMetadatas, setMetadatas []*IPSetMetadata) error {
	if len(listMetadatas) == 0 || len(setMetadatas) == 0 {
		return nil
	}

	iMgr.Lock()
	defer iMgr.Unlock()

	// 1. check for errors in members and create any missing sets
	for _, candidate := range setMetadatas {
		member := iMgr.createAndGetIPSet(candidate)
		// Nested IPSets are only supported for windows
		// Check if we want to actually use that support
		if member.Kind == HashSet {
			continue
		}
		reason := fmt.Sprintf("ipset %s is not a hash set and nested list sets are not supported", member.Name)
		metrics.SendErrorLogAndMetric(util.IpsmID, "error: failed to add to lists: %s", reason)
		return npmerrors.Errorf(npmerrors.AppendIPSet, false, reason)
	}

	for _, listMetadata := range listMetadatas {
		// 2. create the list if it's missing and check for list errors
		list := iMgr.createAndGetIPSet(listMetadata)
		if list.Kind != ListSet {
			reason := fmt.Sprintf("ipset %s is not a list set", list.Name)
			metrics.SendErrorLogAndMetric(util.IpsmID, "error: failed to add to lists: %s", reason)
			return npmerrors.Errorf(npmerrors.AppendIPSet, false, reason)
		}

		// 3. add all members to the list
		for _, candidate := range setMetadatas {
			key := candidate.GetPrefixName()
			switch {
			case key == "":
				metrics.SendErrorLogAndMetric(util.IpsmID, "[AddToLists] warning: adding empty member name to list %s", list.Name)
				continue
			case list.hasMember(key):
				// Already a member. The member can't be the list itself either,
				// since step 1 asserted that every member is a HashSet.
				continue
			}

			member := iMgr.setMap[key]
			iMgr.addMemberToList(list, member)
			if iMgr.shouldBeInKernel(list) {
				iMgr.incKernelReferCountAndModifyCache(member)
			}
		}
	}
	return nil
}
// addMemberToList makes member a member of list in the cache. The caller must hold the lock.
// It does not check whether member already belongs to list.
func (iMgr *IPSetManager) addMemberToList(list, member *IPSet) {
// the dirty cache tracks the member by its hashed name, and only if list should be in the kernel
iMgr.modifyCacheForKernelMemberAdd(list, member.HashedName)
// the list's member map is keyed by the member's Name
list.MemberIPSets[member.Name] = member
member.incIPSetReferCount()
metrics.AddEntryToIPSet(list.Name)
}
// RemoveFromList removes every set described by setMetadatas from the list
// described by listMetadata.
//
// It returns nil immediately when setMetadatas is empty or the list is not
// cached. Members that are not cached or do not belong to the list are skipped,
// and so is the empty set when AddEmptySetToLists is configured. It returns an
// error when the target is not a ListSet or a cached member is not a HashSet;
// members processed before the failing one have already been removed.
func (iMgr *IPSetManager) RemoveFromList(listMetadata *IPSetMetadata, setMetadatas []*IPSetMetadata) error {
	if len(setMetadatas) == 0 {
		return nil
	}

	iMgr.Lock()
	defer iMgr.Unlock()

	// 1. check for errors (ignore missing sets)
	listName := listMetadata.GetPrefixName()
	list, found := iMgr.setMap[listName]
	switch {
	case !found:
		return nil
	case list.Kind != ListSet:
		reason := fmt.Sprintf("ipset %s is not a list set", listName)
		metrics.SendErrorLogAndMetric(util.IpsmID, "error: failed to remove from list: %s", reason)
		return npmerrors.Errorf(npmerrors.DeleteIPSet, false, reason)
	}

	for _, candidate := range setMetadatas {
		key := candidate.GetPrefixName()
		if key == "" {
			metrics.SendErrorLogAndMetric(util.IpsmID, "[RemoveFromList] warning: tried to remove empty member name from list %s", list.Name)
			continue
		}
		if iMgr.iMgrCfg.AddEmptySetToLists && key == emptySetPrefixName {
			metrics.SendErrorLogAndMetric(util.IpsmID, "[RemoveFromList] warning: tried to remove empty set from list %s", list.Name)
			continue
		}

		member, known := iMgr.setMap[key]
		if !known {
			continue
		}
		// Nested IPSets are only supported for windows
		// Check if we want to actually use that support
		if member.Kind != HashSet {
			reason := fmt.Sprintf("ipset %s is not a hash set and nested list sets are not supported", key)
			metrics.SendErrorLogAndMetric(util.IpsmID, "error: failed to remove from list: %s", reason)
			return npmerrors.Errorf(npmerrors.DeleteIPSet, false, reason)
		}

		// 2. remove member from the list
		if !list.hasMember(key) {
			continue
		}
		iMgr.modifyCacheForKernelMemberDelete(list, member.HashedName)
		delete(list.MemberIPSets, key)
		member.decIPSetReferCount()
		metrics.RemoveEntryFromIPSet(list.Name)
		if iMgr.shouldBeInKernel(list) {
			iMgr.decKernelReferCountAndModifyCache(member)
		}
	}
	return nil
}
// ApplyIPSets hands the pending changes recorded in the dirty cache to
// applyIPSets and clears the dirty cache when that succeeds.
//
// It returns nil without doing anything when the dirty cache holds no set to
// add/update or delete. When applyIPSets fails, the error is logged and
// returned unchanged, and the dirty cache is kept.
func (iMgr *IPSetManager) ApplyIPSets() error {
	iMgr.Lock()
	defer iMgr.Unlock()

	if iMgr.dirtyCache.numSetsToAddOrUpdate() == 0 && iMgr.dirtyCache.numSetsToDelete() == 0 {
		// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
		// klog.Info("[IPSetManager] No IPSets to apply")
		return nil
	}

	// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
	// klog.Infof(
	// 	"[IPSetManager] dirty caches. toAddUpdateCache: %s, toDeleteCache: %s",
	// 	iMgr.dirtyCache.printAddOrUpdateCache(), iMgr.dirtyCache.printDeleteCache(),
	// )
	iMgr.sanitizeDirtyCache()

	// Call the appropriate apply ipsets
	execTimer := metrics.StartNewTimer()
	defer metrics.RecordIPSetExecTime(execTimer) // record execution time regardless of failure

	if applyErr := iMgr.applyIPSets(); applyErr != nil {
		metrics.SendErrorLogAndMetric(util.IpsmID, "error: failed to apply ipsets: %s", applyErr.Error())
		return applyErr
	}

	iMgr.clearDirtyCache()
	// TODO could also set the number of ipsets in NPM (not necessarily in kernel) here using len(iMgr.setMap)
	return nil
}
// GetAllIPSets returns a freshly built map from each cached ipset's hashed
// name to its Name.
func (iMgr *IPSetManager) GetAllIPSets() map[string]string {
	iMgr.RLock()
	defer iMgr.RUnlock()

	hashedToName := make(map[string]string, len(iMgr.setMap))
	for _, cached := range iMgr.setMap {
		hashedToName[cached.HashedName] = cached.Name
	}
	return hashedToName
}
// exists reports whether an ipset is cached under the given prefixed name.
// The caller must hold the lock.
func (iMgr *IPSetManager) exists(name string) bool {
	_, found := iMgr.setMap[name]
	return found
}
// modifyCacheForCacheDeletion removes set from the cache when the check that
// belongs to deleteOption allows it. The empty set is never removed.
// In ApplyAllIPSets mode the set is additionally marked for kernel removal.
//
// the metric for number of ipsets in the kernel will be lower than in reality until the next applyIPSet call
func (iMgr *IPSetManager) modifyCacheForCacheDeletion(set *IPSet, deleteOption util.DeleteOption) {
	if set == iMgr.emptySet {
		return
	}

	var deletable bool
	if deleteOption == util.ForceDelete {
		// If force delete, then check if Set is used by other set or network policy
		// else delete the set even if it has members
		deletable = set.canBeForceDeleted()
	} else {
		deletable = set.canBeDeleted(iMgr.emptySet)
	}
	if !deletable {
		return
	}

	delete(iMgr.setMap, set.Name)
	metrics.DeleteIPSet(set.Name)
	// if mode is ApplyOnNeed, the set will not be in the kernel (or will be in the delete cache already) since there are no references
	if iMgr.iMgrCfg.IPSetMode == ApplyAllIPSets {
		iMgr.modifyCacheForKernelRemoval(set)
	}
}
// modifyCacheForKernelCreation records in the dirty cache that set is to be created.
// The caller must hold the lock.
func (iMgr *IPSetManager) modifyCacheForKernelCreation(set *IPSet) {
iMgr.dirtyCache.create(set)
/*
TODO kernel-based prometheus metrics
metrics.IncNumKernelIPSets()
numEntries := len(set.MemberIPsets) OR len(set.IPPodKey)
metrics.SetNumEntriesForKernelIPSet(setName, numEntries)
*/
}
// incKernelReferCountAndModifyCache increments member's kernel reference count
// and marks member for kernel creation if it did not already belong in the
// kernel before the increment. The caller must hold the lock.
func (iMgr *IPSetManager) incKernelReferCountAndModifyCache(member *IPSet) {
	// must be sampled before the count changes
	alreadyInKernel := iMgr.shouldBeInKernel(member)
	member.incKernelReferCount()
	if alreadyInKernel {
		return
	}
	iMgr.modifyCacheForKernelCreation(member)
}
// shouldBeInKernel reports whether set belongs in the kernel: either the set
// itself says so, or the manager runs in ApplyAllIPSets mode, or set is the
// manager's empty set.
func (iMgr *IPSetManager) shouldBeInKernel(set *IPSet) bool {
	switch {
	case set.shouldBeInKernel():
		return true
	case iMgr.iMgrCfg.IPSetMode == ApplyAllIPSets:
		return true
	default:
		return set == iMgr.emptySet
	}
}
// modifyCacheForKernelRemoval records in the dirty cache that set is to be destroyed.
// The caller must hold the lock.
func (iMgr *IPSetManager) modifyCacheForKernelRemoval(set *IPSet) {
iMgr.dirtyCache.destroy(set)
/*
TODO kernel-based prometheus metrics
metrics.DecNumKernelIPSets()
numEntries := len(set.MemberIPsets) OR len(set.IPPodKey)
metrics.RemoveAllEntriesFromKernelIPSet(setName)
*/
}
// decKernelReferCountAndModifyCache decrements member's kernel reference count
// and marks member for kernel removal if it no longer belongs in the kernel
// afterwards. The caller must hold the lock.
func (iMgr *IPSetManager) decKernelReferCountAndModifyCache(member *IPSet) {
	member.decKernelReferCount()
	if iMgr.shouldBeInKernel(member) {
		return
	}
	iMgr.modifyCacheForKernelRemoval(member)
}
// modifyCacheForKernelMemberAdd records in the dirty cache that member was
// added to set. Nothing is recorded when set does not belong in the kernel.
func (iMgr *IPSetManager) modifyCacheForKernelMemberAdd(set *IPSet, member string) {
	if !iMgr.shouldBeInKernel(set) {
		return
	}
	iMgr.dirtyCache.addMember(set, member)
}
// modifyCacheForKernelMemberDelete records in the dirty cache that member was
// removed from set. Nothing is recorded when set does not belong in the kernel.
func (iMgr *IPSetManager) modifyCacheForKernelMemberDelete(set *IPSet, member string) {
	if !iMgr.shouldBeInKernel(set) {
		return
	}
	iMgr.dirtyCache.deleteMember(set, member)
}
// sanitizeDirtyCache checks whether any set marked for deletion is also marked
// for add/update. Each such set is logged, and one error metric is sent if at
// least one was found. The dirty cache itself is not modified here.
func (iMgr *IPSetManager) sanitizeDirtyCache() {
	conflicts := 0
	for name := range iMgr.dirtyCache.setsToDelete() {
		if !iMgr.dirtyCache.isSetToAddOrUpdate(name) {
			continue
		}
		klog.Errorf("[IPSetManager] Unexpected state in dirty cache %s set is part of both update and delete caches", name)
		conflicts++
	}

	if conflicts == 0 {
		return
	}
	metrics.SendErrorLogAndMetric(util.IpsmID, "error: some dirty cache sets are part of both update and delete caches")
}
// clearDirtyCache resets the dirty cache, discarding whatever pending changes it holds.
// It is called after a successful apply and when the manager is reset.
func (iMgr *IPSetManager) clearDirtyCache() {
iMgr.dirtyCache.reset()
}
// validateIPSetMemberIP reports whether the address part of a hash set member
// is accepted by util.IsIPV4.
//
// possible formats
// 192.168.0.1
// 192.168.0.1,tcp:25227
// 192.168.0.1 nomatch
// 192.168.0.0/24
// 192.168.0.0/24,tcp:25227
// 192.168.0.0/24 nomatch
// always guaranteed to have ip, not guaranteed to have port + protocol
func validateIPSetMemberIP(ip string) bool {
	// the address is everything before the first ',' or ' ' (whichever comes first)
	address := ip
	if end := strings.IndexAny(ip, ", "); end >= 0 {
		address = ip[:end]
	}
	return util.IsIPV4(address)
}