in npm/pkg/controlplane/controllers/v1/podController.go [371:494]
// syncAddAndUpdatePod reconciles NPM's cached pod state and ipsets with newPodObj
// and reports which kind of operation was carried out.
//
// It first makes sure the pod's namespace is present in the shared namespace cache,
// creating the namespace ipset and adding it to the all-namespaces list when missing.
// After that one of three paths is taken:
//   - no pod is cached under the pod's key: the pod is added through syncAddedPod
//     and metrics.CreateOp is returned;
//   - the cached pod IP differs from newPodObj's IP: the cached pod is cleaned up,
//     newPodObj is added again, and metrics.UpdateOp is returned;
//   - otherwise the label ipsets and named-port ipsets are updated in place and
//     metrics.UpdateOp is returned.
//
// The operation kind is returned alongside any error so the caller can still tell
// which operation was being attempted.
func (c *PodController) syncAddAndUpdatePod(newPodObj *corev1.Pod) (metrics.OperationKind, error) {
	newPodObjNs := util.GetNSNameWithPrefix(newPodObj.Namespace)

	// The namespace bookkeeping lives in its own closure so the deferred Unlock
	// releases the lock on every exit path, including a panic in the ipset manager.
	ensureNamespace := func() error {
		// lock before using nsMap since nsMap is shared with namespace controller
		c.npmNamespaceCache.Lock()
		defer c.npmNamespaceCache.Unlock()

		if _, exists := c.npmNamespaceCache.NsMap[newPodObjNs]; exists {
			return nil
		}
		// Create ipset related to namespace which this pod belong to if it does not exist.
		if err := c.ipsMgr.CreateSet(newPodObjNs, []string{util.IpsetNetHashFlag}); err != nil {
			return fmt.Errorf("[syncAddAndUpdatePod] Error: failed to create ipset for namespace %s with err: %w", newPodObjNs, err)
		}
		if err := c.ipsMgr.AddToList(util.KubeAllNamespacesFlag, newPodObjNs); err != nil {
			return fmt.Errorf("[syncAddAndUpdatePod] Error: failed to add %s to all-namespace ipset list with err: %w", newPodObjNs, err)
		}
		// Add namespace object into NsMap cache only when two ipset operations are successful.
		c.npmNamespaceCache.NsMap[newPodObjNs] = common.NewNs(newPodObjNs)
		return nil
	}
	if err := ensureNamespace(); err != nil {
		// An error can only come from the "namespace not cached yet" branch, which is
		// treated as a pod create event, so metrics.CreateOp is returned.
		return metrics.CreateOp, err
	}

	// NOTE(review): the key error is dropped here; presumably the caller already
	// derived the same key successfully before calling in — confirm.
	podKey, _ := cache.MetaNamespaceKeyFunc(newPodObj)
	cachedNpmPod, exists := c.podMap[podKey]
	klog.Infof("[syncAddAndUpdatePod] updating Pod with key %s", podKey)

	// No cached npmPod exists. start adding the pod in a cache
	if !exists {
		if err := c.syncAddedPod(newPodObj); err != nil {
			return metrics.CreateOp, err
		}
		return metrics.CreateOp, nil
	}

	// now we know this is an update event, and we'll return metrics.UpdateOp
	// Dealing with "updatePod" event - Compare last applied states against current Pod states
	// There are two possibilities for npmPodObj and newPodObj
	// #1 case The same object with the same UID and the same key (namespace + name)
	// #2 case Different objects with different UID, but the same key (namespace + name) due to missing some events for the old object

	// Dealing with #2 pod update event, the IP addresses of cached npmPod and newPodObj are different
	// NPM should clean up existing references of cached pod obj and its IP.
	// then, re-add new pod obj.
	if cachedNpmPod.PodIP != newPodObj.Status.PodIP {
		klog.Infof("Pod (Namespace:%s, Name:%s, newUid:%s), has cachedPodIp:%s which is different from PodIp:%s",
			newPodObj.Namespace, newPodObj.Name, string(newPodObj.UID), cachedNpmPod.PodIP, newPodObj.Status.PodIP)

		klog.Infof("Deleting cached Pod with key:%s first due to IP Mistmatch", podKey)
		if err := c.cleanUpDeletedPod(podKey); err != nil {
			return metrics.UpdateOp, err
		}

		klog.Infof("Adding back Pod with key:%s after IP Mistmatch", podKey)
		if err := c.syncAddedPod(newPodObj); err != nil {
			return metrics.UpdateOp, err
		}
		return metrics.UpdateOp, nil
	}

	// Dealing with #1 pod update event, the IP addresses of cached npmPod and newPodObj are same
	// If no change in labels, then GetIPSetListCompareLabels will return empty list.
	// Otherwise it returns list of deleted PodIP from cached pod's labels and list of added PodIp from new pod's labels
	addToIPSets, deleteFromIPSets := util.GetIPSetListCompareLabels(cachedNpmPod.Labels, newPodObj.Labels)

	// Delete the pod from its label's ipset.
	for _, podIPSetName := range deleteFromIPSets {
		klog.Infof("Deleting pod %s from ipset %s", cachedNpmPod.PodIP, podIPSetName)
		if err := c.ipsMgr.DeleteFromSet(podIPSetName, cachedNpmPod.PodIP, podKey); err != nil {
			return metrics.UpdateOp, fmt.Errorf("[syncAddAndUpdatePod] Error: failed to delete pod from label ipset with err: %w", err)
		}
		// {IMPORTANT} The order of compared list will be key and then key+val. NPM should only update the cached
		// labels after both the key and the key+val ipsets are worked on, so the cache is touched only when the
		// set name carries a value. 0th index will be key and 1st index will be value of the label
		removedLabelKey, removedLabelValue := util.GetLabelKVFromSet(podIPSetName)
		if removedLabelValue != "" {
			cachedNpmPod.RemoveLabelsWithKey(removedLabelKey)
		}
	}

	// Add the pod to its label's ipset.
	for _, addIPSetName := range addToIPSets {
		klog.Infof("Adding pod %s to ipset %s", newPodObj.Status.PodIP, addIPSetName)
		if err := c.ipsMgr.AddToSet(addIPSetName, newPodObj.Status.PodIP, util.IpsetNetHashFlag, podKey); err != nil {
			return metrics.UpdateOp, fmt.Errorf("[syncAddAndUpdatePod] Error: failed to add pod to label ipset with err: %w", err)
		}
		// {IMPORTANT} Same as above order is assumed to be key and then key+val. NPM should only append to existing labels
		// only after both ipsets for a given label's key value pair are added successfully
		// (TODO) will need to remove this ordering dependency
		addedLabelKey, addedLabelValue := util.GetLabelKVFromSet(addIPSetName)
		if addedLabelValue != "" {
			cachedNpmPod.AppendLabels(map[string]string{addedLabelKey: addedLabelValue}, common.AppendToExistingLabels)
		}
	}

	// This will ensure after all labels are worked on to overwrite. This way will reduce any bugs introduced above
	// If due to ordering issue the above deleted and added labels are not correct,
	// this below appendLabels will help ensure correct state in cache for all successful ops.
	cachedNpmPod.AppendLabels(newPodObj.Labels, common.ClearExistingLabels)

	// (TODO): optimize named port addition and deletions.
	// named ports are mostly static once configured in todays usage pattern
	// so keeping this simple by deleting all and re-adding
	newPodPorts := common.GetContainerPortList(newPodObj)
	if !reflect.DeepEqual(cachedNpmPod.ContainerPorts, newPodPorts) {
		// Delete cached pod's named ports from its ipset.
		if err := c.manageNamedPortIpsets(
			cachedNpmPod.ContainerPorts, podKey, cachedNpmPod.PodIP, deleteNamedPort); err != nil {
			return metrics.UpdateOp, fmt.Errorf("[syncAddAndUpdatePod] Error: failed to delete pod from named port ipset with err: %w", err)
		}
		// Since portList ipset deletion is successful, NPM can remove cachedContainerPorts
		cachedNpmPod.RemoveContainerPorts()

		// Add new pod's named ports to its ipset.
		if err := c.manageNamedPortIpsets(newPodPorts, podKey, newPodObj.Status.PodIP, addNamedPort); err != nil {
			return metrics.UpdateOp, fmt.Errorf("[syncAddAndUpdatePod] Error: failed to add pod to named port ipset with err: %w", err)
		}
		cachedNpmPod.AppendContainerPorts(newPodObj)
	}

	cachedNpmPod.UpdateNpmPodAttributes(newPodObj)
	return metrics.UpdateOp, nil
}