npm/pkg/controlplane/controllers/v2/podController.go
// Copyright 2018 Microsoft. All rights reserved.
// MIT License
package controllers
import (
"encoding/json"
"fmt"
"reflect"
"sync"
"time"
"github.com/Azure/azure-container-networking/npm/metrics"
"github.com/Azure/azure-container-networking/npm/pkg/controlplane/controllers/common"
"github.com/Azure/azure-container-networking/npm/pkg/dataplane"
"github.com/Azure/azure-container-networking/npm/pkg/dataplane/ipsets"
"github.com/Azure/azure-container-networking/npm/util"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
coreinformer "k8s.io/client-go/informers/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
)
// NamedPortOperation decides the operation (e.g., delete or add) for a named port ipset in manageNamedPortIpsets
type NamedPortOperation string
const (
deleteNamedPort NamedPortOperation = "del"
addNamedPort NamedPortOperation = "add"
addEvent string = "ADD"
updateEvent string = "UPDATE"
)
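// kubeAllNamespaces is the metadata for the all-namespaces ipset list, which tracks every namespace ipset.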
var kubeAllNamespaces = &ipsets.IPSetMetadata{Name: util.KubeAllNamespacesFlag, Type: ipsets.KeyLabelOfNamespace}
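// PodController watches Pod events from the informer and reconciles NPM's pod cache (podMap) and the dataplane ipsets accordingly.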
type PodController struct {
podLister corelisters.PodLister
workqueue workqueue.RateLimitingInterface
dp dataplane.GenericDataplane
podMap map[string]*common.NpmPod // Key is <nsname>/<podname>
sync.RWMutex
npmNamespaceCache *NpmNamespaceCache
}
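// NewPodController creates a PodController backed by the given informer and dataplane, and registers its add/update/delete event handlers on the pod informer.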
func NewPodController(podInformer coreinformer.PodInformer, dp dataplane.GenericDataplane, npmNamespaceCache *NpmNamespaceCache) *PodController {
podController := &PodController{
podLister: podInformer.Lister(),
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Pods"),
dp: dp,
podMap: make(map[string]*common.NpmPod),
npmNamespaceCache: npmNamespaceCache,
}
podInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: podController.addPod,
UpdateFunc: podController.updatePod,
DeleteFunc: podController.deletePod,
},
)
return podController
}
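// MarshalJSON returns the JSON encoding of the controller's podMap, holding the lock for a consistent snapshot.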
func (c *PodController) MarshalJSON() ([]byte, error) {
c.Lock()
defer c.Unlock()
podMapRaw, err := json.Marshal(c.podMap)
if err != nil {
return nil, errors.Errorf("failed to marshal podMap due to %v", err)
}
return podMapRaw, nil
}
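// LengthOfPodMap returns the number of pods currently stored in the cache.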
func (c *PodController) LengthOfPodMap() int {
return len(c.podMap)
}
// needSync determines whether the event needs to be handled, filtering out events that do not require processing.
// It returns the pod key and whether the event should be enqueued.
func (c *PodController) needSync(eventType string, obj interface{}) (string, bool) {
needSync := false
var key string
podObj, ok := obj.(*corev1.Pod)
if !ok {
metrics.SendErrorLogAndMetric(util.PodID, "ADD Pod: Received unexpected object type: %v", obj)
return key, needSync
}
// Skip Pods without a valid IP, except for UPDATE events on Running Pods, which should still be enqueued.
if !hasValidPodIP(podObj) && (eventType == addEvent || podObj.Status.Phase != corev1.PodRunning) {
return key, needSync
}
if isHostNetworkPod(podObj) {
return key, needSync
}
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
utilruntime.HandleError(err)
metrics.SendErrorLogAndMetric(util.PodID, "[POD %s EVENT] Error: podKey is empty for %s pod in %s with UID %s",
eventType, podObj.Name, util.GetNSNameWithPrefix(podObj.Namespace), podObj.UID)
return key, needSync
}
needSync = true
return key, needSync
}
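// addPod is the informer AddFunc. It enqueues the pod key (<namespace>/<name>) unless the event can be ignored
// (e.g., host-network pods or pods in a completely terminated state).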
func (c *PodController) addPod(obj interface{}) {
key, needSync := c.needSync(addEvent, obj)
if !needSync {
return
}
podObj, _ := obj.(*corev1.Pod)
// Check whether this pod needs to be queued.
// If the pod is in a completely terminated state, it is not enqueued to avoid unnecessary computation.
if isCompletePod(podObj) {
return
}
c.workqueue.Add(key)
}
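// updatePod is the informer UpdateFunc. It ignores periodic resync events (identical ResourceVersions) and enqueues the pod key otherwise.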
func (c *PodController) updatePod(old, newp interface{}) {
key, needSync := c.needSync(updateEvent, newp)
if !needSync {
return
}
// needSync already validated the type assertion for newPod.
newPod, _ := newp.(*corev1.Pod)
oldPod, ok := old.(*corev1.Pod)
if ok {
if oldPod.ResourceVersion == newPod.ResourceVersion {
// Periodic resync will send update events for all known pods.
// Two different versions of the same pod will always have different RVs.
return
}
}
c.workqueue.Add(key)
}
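// deletePod is the informer DeleteFunc. It unwraps DeletedFinalStateUnknown tombstones if needed and enqueues
// the pod key so its ipset references can be cleaned up.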
func (c *PodController) deletePod(obj interface{}) {
podObj, ok := obj.(*corev1.Pod)
// DeleteFunc gets the final state of the resource (if it is known).
// Otherwise, it gets an object of type DeletedFinalStateUnknown.
// This can happen if the watch is closed and misses the delete event and
// the controller doesn't notice the deletion until the subsequent re-list
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
metrics.SendErrorLogAndMetric(util.PodID, "[POD DELETE EVENT] Pod: Received unexpected object type: %v", obj)
return
}
if podObj, ok = tombstone.Obj.(*corev1.Pod); !ok {
metrics.SendErrorLogAndMetric(util.PodID, "[POD DELETE EVENT] Pod: Received unexpected object type (error decoding object tombstone, invalid type): %v", obj)
return
}
}
// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
// klog.Infof("[POD DELETE EVENT] for %s in %s", podObj.Name, podObj.Namespace)
if isHostNetworkPod(podObj) {
return
}
var err error
var key string
if key, err = cache.MetaNamespaceKeyFunc(podObj); err != nil {
utilruntime.HandleError(err)
metrics.SendErrorLogAndMetric(util.PodID, "[POD DELETE EVENT] Error: podKey is empty for %s pod in %s with UID %s",
podObj.ObjectMeta.Name, util.GetNSNameWithPrefix(podObj.Namespace), podObj.UID)
return
}
c.workqueue.Add(key)
}
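// Run starts a pod worker that drains the workqueue and blocks until stopCh is closed.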
func (c *PodController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer c.workqueue.ShutDown()
// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
// klog.Infof("Starting Pod worker")
go wait.Until(c.runWorker, time.Second, stopCh)
// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
// klog.Info("Started Pod workers")
<-stopCh
// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
// klog.Info("Shutting down Pod workers")
}
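// runWorker processes workqueue items until the workqueue shuts down.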
func (c *PodController) runWorker() {
for c.processNextWorkItem() {
}
}
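// processNextWorkItem pops one key from the workqueue, syncs it via syncPod, and requeues the key with rate
// limiting if the sync fails. It returns false only when the workqueue is shutting down.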
func (c *PodController) processNextWorkItem() bool {
obj, shutdown := c.workqueue.Get()
if shutdown {
return false
}
err := func(obj interface{}) error {
defer c.workqueue.Done(obj)
var key string
var ok bool
if key, ok = obj.(string); !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c.workqueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue got %#v, err %w", obj, errWorkqueueFormatting))
return nil
}
// Run the syncPod, passing it the namespace/name string of the
// Pod resource to be synced.
if err := c.syncPod(key); err != nil {
// Put the item back on the workqueue to handle any transient errors.
c.workqueue.AddRateLimited(key)
metrics.SendErrorLogAndMetric(util.PodID, "[podController processNextWorkItem] Error: failed to syncPod %s. Requeuing with err: %v", key, err)
return fmt.Errorf("error syncing '%s': %w, requeuing", key, err)
}
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
c.workqueue.Forget(obj)
// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
// klog.Infof("Successfully synced '%s'", key)
return nil
}(obj)
if err != nil {
utilruntime.HandleError(err)
return true
}
return true
}
// syncPod compares the actual state with the desired, and attempts to converge the two.
func (c *PodController) syncPod(key string) error {
// timer for recording execution times
timer := metrics.StartNewTimer()
// Convert the namespace/name string into a distinct namespace and name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to split meta namespace key %s with err %w", key, err))
return nil //nolint HandleError is used instead of returning error to caller
}
// Get the Pod resource with this namespace/name
pod, err := c.podLister.Pods(namespace).Get(name)
// apply dataplane and record exec time after syncing
operationKind := metrics.NoOp
defer func() {
if err != nil {
klog.Infof("[syncPod] failed to sync pod, but will apply any changes to the dataplane. err: %s", err.Error())
}
dperr := c.dp.ApplyDataPlane()
// can't record this in another deferred func since deferred funcs are processed in LIFO order
metrics.RecordControllerPodExecTime(timer, operationKind, err != nil && dperr != nil)
if dperr != nil {
klog.Errorf("failed to apply dataplane changes while syncing pod. err: %s", dperr.Error())
metrics.SendErrorLogAndMetric(util.PodID, "[syncPod] failed to apply dataplane changes while syncing pod. err: %s", dperr.Error())
// Seems like setting err below does nothing.
// The return value of syncPod is fixed before this deferred func is called,
// so modifications to err here do nothing.
// As a result, the controller will not requeue if there is an error applying the dataplane.
// However, a subsequent controller event should Apply Dataplane soon after.
if err == nil {
err = fmt.Errorf("failed to apply dataplane changes while syncing pod. err: %w", dperr)
} else {
err = fmt.Errorf("failed to sync pod and apply dataplane changes. sync err: [%w], apply err: [%v]", err, dperr)
}
}
}()
c.Lock()
defer c.Unlock()
if err != nil {
if apierrors.IsNotFound(err) {
klog.Infof("pod %s not found, may be it is deleted", key)
if _, ok := c.podMap[key]; ok {
// record time to delete pod if it exists (can't call within cleanUpDeletedPod because this can be called by a pod update)
operationKind = metrics.DeleteOp
}
// cleanUpDeletedPod checks whether the pod exists in the cache; if it does, it proceeds with deletion.
// If it does not exist, the event is a no-op.
err = c.cleanUpDeletedPod(key)
if err != nil {
// need to retry this cleaning-up process
return fmt.Errorf("error: %w when pod is not found", err)
}
return err
}
return err
}
// If this pod is completely in a terminated state (which means the pod was gracefully shut down),
// NPM starts cleaning up the last-applied states even on update events.
// This proactive clean-up helps avoid stale pod objects in case a delete event is missed.
if isCompletePod(pod) {
if _, ok := c.podMap[key]; ok {
// record time to delete pod if it exists (can't call within cleanUpDeletedPod because this can be called by a pod update)
operationKind = metrics.DeleteOp
}
if err = c.cleanUpDeletedPod(key); err != nil {
return fmt.Errorf("error: %w when when pod is in completed state", err)
}
return nil
}
cachedNpmPod, npmPodExists := c.podMap[key]
if npmPodExists {
// If the pod's state does not differ from the last-applied state stored in cachedNpmPod,
// the podController does not need to reconcile this update.
// In this updatePod event, newPod was updated with state which the PodController does not need to reconcile.
if cachedNpmPod.NoUpdate(pod) {
return nil
}
}
operationKind, err = c.syncAddAndUpdatePod(pod)
if err != nil {
return fmt.Errorf("failed to sync pod due to %w", err)
}
return nil
}
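// syncAddedPod adds the pod's IP to its namespace ipset, its label ipsets, and its named port ipsets,
// and caches the pod as an NpmPod in podMap.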
func (c *PodController) syncAddedPod(podObj *corev1.Pod) error {
// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
// klog.Infof("POD CREATING: [%s/%s/%s/%s/%+v/%s]", string(podObj.GetUID()), podObj.Namespace,
// podObj.Name, podObj.Spec.NodeName, podObj.Labels, podObj.Status.PodIP)
if !util.IsIPV4(podObj.Status.PodIP) {
msg := fmt.Sprintf("[syncAddedPod] warning: ADD POD [%s/%s/%s/%+v] ignored as the PodIP is not valid ipv4 address. ip: [%s]", podObj.Namespace,
podObj.Name, podObj.Spec.NodeName, podObj.Labels, podObj.Status.PodIP)
metrics.SendLog(util.PodID, msg, metrics.PrintLog)
// return nil so that we don't requeue.
// Wait until an update event comes from the API Server with a valid IP (e.g., when the IP is currently empty).
// There may be latency in receiving the update event versus retrying on our own,
// but this prevents us from retrying indefinitely for pods stuck in Running state with no IP as seen in AKS Windows Server '22.
return nil
}
var err error
podKey, _ := cache.MetaNamespaceKeyFunc(podObj)
podMetadata := dataplane.NewPodMetadata(podKey, podObj.Status.PodIP, podObj.Spec.NodeName)
namespaceSet := []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(podObj.Namespace, ipsets.Namespace)}
// Add the pod ip information into namespace's ipset.
// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
// klog.Infof("Adding pod %s (ip : %s) to ipset %s", podKey, podObj.Status.PodIP, podObj.Namespace)
if err = c.dp.AddToSets(namespaceSet, podMetadata); err != nil {
return fmt.Errorf("[syncAddedPod] Error: failed to add pod to namespace ipset with err: %w", err)
}
// Create npmPod and add it to the podMap
npmPodObj := common.NewNpmPod(podObj)
c.podMap[podKey] = npmPodObj
metrics.AddPod()
// Get lists of podLabelKey and podLabelKey + podLabelValue, and then start adding them to ipsets.
for labelKey, labelVal := range podObj.Labels {
labelKeyValue := util.GetIpSetFromLabelKV(labelKey, labelVal)
targetSetKey := ipsets.NewIPSetMetadata(labelKey, ipsets.KeyLabelOfPod)
targetSetKeyValue := ipsets.NewIPSetMetadata(labelKeyValue, ipsets.KeyValueLabelOfPod)
allSets := []*ipsets.IPSetMetadata{targetSetKey, targetSetKeyValue}
// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
// klog.Infof("Creating ipsets %+v and %+v if they do not exist", targetSetKey, targetSetKeyValue)
// klog.Infof("Adding pod %s (ip : %s) to ipset %s and %s", podKey, npmPodObj.PodIP, labelKey, labelKeyValue)
if err = c.dp.AddToSets(allSets, podMetadata); err != nil {
return fmt.Errorf("[syncAddedPod] Error: failed to add pod to label ipset with err: %w", err)
}
npmPodObj.AppendLabels(map[string]string{labelKey: labelVal}, common.AppendToExistingLabels)
}
// Add the pod's named ports to their ipsets.
// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
// klog.Infof("Adding named port ipsets")
containerPorts := common.GetContainerPortList(podObj)
if err = c.manageNamedPortIpsets(containerPorts, podKey, npmPodObj.PodIP, podObj.Spec.NodeName, addNamedPort); err != nil {
return fmt.Errorf("[syncAddedPod] Error: failed to add pod to named port ipset with err: %w", err)
}
npmPodObj.AppendContainerPorts(podObj)
return nil
}
// syncAddAndUpdatePod reconciles a pod add or update by syncing the pod's IP into its namespace, label, and named port ipsets.
func (c *PodController) syncAddAndUpdatePod(newPodObj *corev1.Pod) (metrics.OperationKind, error) {
var err error
podKey, _ := cache.MetaNamespaceKeyFunc(newPodObj)
// lock before using nsMap since nsMap is shared with namespace controller
c.npmNamespaceCache.Lock()
if _, exists := c.npmNamespaceCache.NsMap[newPodObj.Namespace]; !exists {
// Create the ipset for the namespace this pod belongs to if it does not exist.
toBeAdded := []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(newPodObj.Namespace, ipsets.Namespace)}
if err = c.dp.AddToLists([]*ipsets.IPSetMetadata{kubeAllNamespaces}, toBeAdded); err != nil {
c.npmNamespaceCache.Unlock()
// since the namespace doesn't exist, this must be a pod create event, so we'll return metrics.CreateOp
return metrics.CreateOp, fmt.Errorf("[syncAddAndUpdatePod] Error: failed to add %s to all-namespace ipset list with err: %w", newPodObj.Namespace, err)
}
// Add the namespace object into the NsMap cache only after the ipset operation succeeds.
npmNs := common.NewNs(newPodObj.Namespace)
c.npmNamespaceCache.NsMap[newPodObj.Namespace] = npmNs
}
c.npmNamespaceCache.Unlock()
cachedNpmPod, exists := c.podMap[podKey]
// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
// klog.Infof("[syncAddAndUpdatePod] updating Pod with key %s", podKey)
// No cached npmPod exists, so start adding the pod to the cache.
if !exists {
return metrics.CreateOp, c.syncAddedPod(newPodObj)
}
// Now we know this is an update event, and we'll return metrics.UpdateOp.
// Dealing with the "updatePod" event - compare the last-applied state against the current Pod state.
// There are two possibilities for cachedNpmPod and newPodObj:
// Case #1: the same object with the same UID and the same key (namespace + name).
// Case #2: different objects with different UIDs but the same key (namespace + name), due to missed events for the old object.
// In the #2 pod update event, the IP addresses of the cached npmPod and newPodObj are different,
// so NPM should clean up the existing references to the cached pod object and its IP,
// then re-add the new pod object.
if cachedNpmPod.PodIP != newPodObj.Status.PodIP {
// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
// klog.Infof("Pod (Namespace:%s, Name:%s, newUid:%s), has cachedPodIp:%s which is different from PodIp:%s",
// newPodObj.Namespace, newPodObj.Name, string(newPodObj.UID), cachedNpmPod.PodIP, newPodObj.Status.PodIP)
// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
// klog.Infof("Deleting cached Pod with key:%s first due to IP Mistmatch", podKey)
if er := c.cleanUpDeletedPod(podKey); er != nil {
return metrics.UpdateOp, er
}
// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
// klog.Infof("Adding back Pod with key:%s after IP Mistmatch", podKey)
return metrics.UpdateOp, c.syncAddedPod(newPodObj)
}
// Dealing with the #1 pod update event, the IP addresses of the cached npmPod and newPodObj are the same.
// If there is no change in labels, GetIPSetListCompareLabels will return empty lists.
// Otherwise it returns the list of ipsets the cached PodIP should be deleted from (labels removed from the cached pod)
// and the list of ipsets the new PodIP should be added to (labels added to the new pod).
addToIPSets, deleteFromIPSets := util.GetIPSetListCompareLabels(cachedNpmPod.Labels, newPodObj.Labels)
newPodMetadata := dataplane.NewPodMetadata(podKey, newPodObj.Status.PodIP, newPodObj.Spec.NodeName)
// should have newPodMetadata == cachedPodMetadata since from branch above, we have cachedNpmPod.PodIP == newPodObj.Status.PodIP
cachedPodMetadata := dataplane.NewPodMetadata(podKey, cachedNpmPod.PodIP, newPodMetadata.NodeName)
// Delete the pod from its label's ipset.
for _, removeIPSetName := range deleteFromIPSets {
// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
// klog.Infof("Deleting pod %s (ip : %s) from ipset %s", podKey, cachedNpmPod.PodIP, removeIPSetName)
var toRemoveSet *ipsets.IPSetMetadata
if util.IsKeyValueLabelSetName(removeIPSetName) {
toRemoveSet = ipsets.NewIPSetMetadata(removeIPSetName, ipsets.KeyValueLabelOfPod)
} else {
toRemoveSet = ipsets.NewIPSetMetadata(removeIPSetName, ipsets.KeyLabelOfPod)
}
if err = c.dp.RemoveFromSets([]*ipsets.IPSetMetadata{toRemoveSet}, cachedPodMetadata); err != nil {
return metrics.UpdateOp, fmt.Errorf("[syncAddAndUpdatePod] Error: failed to delete pod from label ipset with err: %w", err)
}
// {IMPORTANT} The order of the compared list will be key and then key+val. NPM should only update the cache after both the
// key and key+val ipsets are worked on. The 0th index will be the key and the 1st index will be the value of the label.
removedLabelKey, removedLabelValue := util.GetLabelKVFromSet(removeIPSetName)
if removedLabelValue != "" {
cachedNpmPod.RemoveLabelsWithKey(removedLabelKey)
}
}
// Add the pod to its label's ipset.
for _, addIPSetName := range addToIPSets {
// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
// klog.Infof("Creating ipset %s if it doesn't already exist", addIPSetName)
var toAddSet *ipsets.IPSetMetadata
if util.IsKeyValueLabelSetName(addIPSetName) {
toAddSet = ipsets.NewIPSetMetadata(addIPSetName, ipsets.KeyValueLabelOfPod)
} else {
toAddSet = ipsets.NewIPSetMetadata(addIPSetName, ipsets.KeyLabelOfPod)
}
// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
// klog.Infof("Adding pod %s (ip : %s) to ipset %s", podKey, newPodObj.Status.PodIP, addIPSetName)
if err = c.dp.AddToSets([]*ipsets.IPSetMetadata{toAddSet}, newPodMetadata); err != nil {
return metrics.UpdateOp, fmt.Errorf("[syncAddAndUpdatePod] Error: failed to add pod to label ipset with err: %w", err)
}
// {IMPORTANT} Same as above, the order is assumed to be key and then key+val. NPM should append to existing labels
// only after both ipsets for a given label's key-value pair are added successfully.
// (TODO) will need to remove this ordering dependency
addedLabelKey, addedLabelValue := util.GetLabelKVFromSet(addIPSetName)
if addedLabelValue != "" {
cachedNpmPod.AppendLabels(map[string]string{addedLabelKey: addedLabelValue}, common.AppendToExistingLabels)
}
}
// Overwrite the cached labels once all labels have been worked on. This reduces the impact of any bugs introduced above:
// if, due to an ordering issue, the labels deleted and added above are not correct,
// this AppendLabels call helps ensure the correct state in the cache for all successful operations.
cachedNpmPod.AppendLabels(newPodObj.Labels, common.ClearExistingLabels)
// (TODO): optimize named port additions and deletions.
// Named ports are mostly static once configured in today's usage pattern,
// so keep this simple by deleting all and re-adding.
newPodPorts := common.GetContainerPortList(newPodObj)
if !reflect.DeepEqual(cachedNpmPod.ContainerPorts, newPodPorts) {
// Delete cached pod's named ports from its ipset.
if err = c.manageNamedPortIpsets(
cachedNpmPod.ContainerPorts, podKey, cachedNpmPod.PodIP, "", deleteNamedPort); err != nil {
return metrics.UpdateOp, fmt.Errorf("[syncAddAndUpdatePod] Error: failed to delete pod from named port ipset with err: %w", err)
}
// Since portList ipset deletion is successful, NPM can remove cachedContainerPorts
cachedNpmPod.RemoveContainerPorts()
// Add the new pod's named ports to their ipsets.
if err = c.manageNamedPortIpsets(newPodPorts, podKey, newPodObj.Status.PodIP, newPodObj.Spec.NodeName, addNamedPort); err != nil {
return metrics.UpdateOp, fmt.Errorf("[syncAddAndUpdatePod] Error: failed to add pod to named port ipset with err: %w", err)
}
cachedNpmPod.AppendContainerPorts(newPodObj)
}
cachedNpmPod.UpdateNpmPodAttributes(newPodObj)
return metrics.UpdateOp, nil
}
// cleanUpDeletedPod cleans up all ipset associated with this pod
func (c *PodController) cleanUpDeletedPod(cachedNpmPodKey string) error {
// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
// klog.Infof("[cleanUpDeletedPod] deleting Pod with key %s", cachedNpmPodKey)
// If cached npmPod does not exist, return nil
cachedNpmPod, exist := c.podMap[cachedNpmPodKey]
if !exist {
return nil
}
var err error
cachedPodMetadata := dataplane.NewPodMetadata(cachedNpmPodKey, cachedNpmPod.PodIP, "")
// Delete the pod from its namespace's ipset.
// note: an empty NodeName means the dataplane will not perform an update pod call
if err = c.dp.RemoveFromSets(
[]*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(cachedNpmPod.Namespace, ipsets.Namespace)},
cachedPodMetadata); err != nil {
return fmt.Errorf("[cleanUpDeletedPod] Error: failed to delete pod from namespace ipset with err: %w", err)
}
// Get lists of podLabelKey and podLabelKey + podLabelValue, and then start deleting them from ipsets.
for labelKey, labelVal := range cachedNpmPod.Labels {
labelKeyValue := util.GetIpSetFromLabelKV(labelKey, labelVal)
// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
// klog.Infof("Deleting pod %s (ip : %s) from ipsets %s and %s", cachedNpmPodKey, cachedNpmPod.PodIP, labelKey, labelKeyValue)
if err = c.dp.RemoveFromSets(
[]*ipsets.IPSetMetadata{
ipsets.NewIPSetMetadata(labelKey, ipsets.KeyLabelOfPod),
ipsets.NewIPSetMetadata(labelKeyValue, ipsets.KeyValueLabelOfPod),
},
cachedPodMetadata); err != nil {
return fmt.Errorf("[cleanUpDeletedPod] Error: failed to delete pod from label ipset with err: %w", err)
}
cachedNpmPod.RemoveLabelsWithKey(labelKey)
}
// Delete the pod's named ports from their ipsets by passing deleteNamedPort to manageNamedPortIpsets.
if err = c.manageNamedPortIpsets(
cachedNpmPod.ContainerPorts, cachedNpmPodKey, cachedNpmPod.PodIP, "", deleteNamedPort); err != nil {
return fmt.Errorf("[cleanUpDeletedPod] Error: failed to delete pod from named port ipset with err: %w", err)
}
metrics.RemovePod()
delete(c.podMap, cachedNpmPodKey)
return nil
}
// manageNamedPortIpsets helps with adding or deleting Pod namedPort IPsets.
func (c *PodController) manageNamedPortIpsets(portList []corev1.ContainerPort, podKey,
podIP, nodeName string, namedPortOperation NamedPortOperation) error {
if util.IsWindowsDP() {
// NOTE: if we support namedport operations, need to be careful of implications of including the node name in the pod metadata below
// since we say the node name is "" in cleanUpDeletedPod
klog.Warningf("Windows Dataplane does not support NamedPort operations. Operation: %s portList is %+v", namedPortOperation, portList)
return nil
}
for _, port := range portList {
// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
// klog.Infof("port is %+v", port)
if port.Name == "" {
continue
}
// K8s guarantees port.Protocol has "TCP", "UDP", or "SCTP" if the field exists.
var protocol string
if len(port.Protocol) != 0 {
// without adding ":" after protocol, ipset complains.
protocol = fmt.Sprintf("%s:", port.Protocol)
}
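// The resulting entry has the form "<podIP>,<protocol>:<containerPort>", e.g. "10.240.0.38,TCP:8080" (example values for illustration).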
namedPortIpsetEntry := fmt.Sprintf("%s,%s%d", podIP, protocol, port.ContainerPort)
// when the node name passed to NewPodMetadata is empty, UpdatePod is ignored
podMetadata := dataplane.NewPodMetadata(podKey, namedPortIpsetEntry, nodeName)
switch namedPortOperation {
case deleteNamedPort:
if err := c.dp.RemoveFromSets([]*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(port.Name, ipsets.NamedPorts)}, podMetadata); err != nil {
return fmt.Errorf("failed to remove from set when deleting named port with err %w", err)
}
case addNamedPort:
if err := c.dp.AddToSets([]*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(port.Name, ipsets.NamedPorts)}, podMetadata); err != nil {
return fmt.Errorf("failed to add to set when deleting named port with err %w", err)
}
}
}
return nil
}
// isCompletePod evaluates whether this pod is completely in a terminated state,
// which means the pod has been gracefully shut down.
func isCompletePod(podObj *corev1.Pod) bool {
// DeletionTimestamp and DeletionGracePeriodSeconds in the pod are not nil,
// which means the pod is expected to be deleted, and
// the DeletionGracePeriodSeconds value is zero, which means the pod was gracefully terminated.
if podObj.DeletionTimestamp != nil && podObj.DeletionGracePeriodSeconds != nil && *podObj.DeletionGracePeriodSeconds == 0 {
return true
}
// K8s categorizes Succeeded and Failed pods as terminated pods and will not restart them,
// so NPM will ignore adding these pods.
// TODO(jungukcho): what are the values of DeletionTimestamp and podObj.DeletionGracePeriodSeconds
// in either below status?
if podObj.Status.Phase == corev1.PodSucceeded || podObj.Status.Phase == corev1.PodFailed {
return true
}
return false
}
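// hasValidPodIP reports whether the pod has been assigned a pod IP.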
func hasValidPodIP(podObj *corev1.Pod) bool {
return len(podObj.Status.PodIP) > 0
}
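// isHostNetworkPod reports whether the pod uses the host network; such pods are not tracked by NPM.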
func isHostNetworkPod(podObj *corev1.Pod) bool {
return podObj.Spec.HostNetwork
}