in npm/pkg/controlplane/controllers/v2/podController.go [419:557]
// syncAddAndUpdatePod reconciles the dataplane and the pod cache against the current
// state of newPodObj, handling both pod-create and pod-update events. It returns the
// kind of operation it performed (metrics.CreateOp or metrics.UpdateOp) so the caller
// can record metrics, plus any error from the underlying ipset operations.
//
// Ordering matters throughout this function: cache state (c.podMap entries, cached
// labels, cached container ports) is only mutated AFTER the corresponding dataplane
// call succeeds, so that a mid-function failure leaves the cache describing exactly
// what was actually applied and a retry can resume from that point.
func (c *PodController) syncAddAndUpdatePod(newPodObj *corev1.Pod) (metrics.OperationKind, error) {
	var err error
	podKey, _ := cache.MetaNamespaceKeyFunc(newPodObj)

	// lock before using nsMap since nsMap is shared with namespace controller
	c.npmNamespaceCache.Lock()
	if _, exists := c.npmNamespaceCache.NsMap[newPodObj.Namespace]; !exists {
		// Create ipset related to namespace which this pod belong to if it does not exist.
		toBeAdded := []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(newPodObj.Namespace, ipsets.Namespace)}
		if err = c.dp.AddToLists([]*ipsets.IPSetMetadata{kubeAllNamespaces}, toBeAdded); err != nil {
			// NOTE: manual unlock on this early return; the success path unlocks below.
			c.npmNamespaceCache.Unlock()
			// since the namespace doesn't exist, this must be a pod create event, so we'll return metrics.CreateOp
			return metrics.CreateOp, fmt.Errorf("[syncAddAndUpdatePod] Error: failed to add %s to all-namespace ipset list with err: %w", newPodObj.Namespace, err)
		}

		// Add namespace object into NsMap cache only after the ipset operation succeeds,
		// so the cache never claims a namespace whose ipset list entry was not created.
		npmNs := common.NewNs(newPodObj.Namespace)
		c.npmNamespaceCache.NsMap[newPodObj.Namespace] = npmNs
	}
	c.npmNamespaceCache.Unlock()

	cachedNpmPod, exists := c.podMap[podKey]
	// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
	// klog.Infof("[syncAddAndUpdatePod] updating Pod with key %s", podKey)

	// No cached npmPod exists. start adding the pod in a cache
	if !exists {
		return metrics.CreateOp, c.syncAddedPod(newPodObj)
	}
	// now we know this is an update event, and we'll return metrics.UpdateOp

	// Dealing with "updatePod" event - Compare last applied states against current Pod states
	// There are two possibilities for npmPodObj and newPodObj
	// #1 case The same object with the same UID and the same key (namespace + name)
	// #2 case Different objects with different UID, but the same key (namespace + name) due to missing some events for the old object

	// Dealing with #2 pod update event, the IP addresses of cached npmPod and newPodObj are different
	// NPM should clean up existing references of cached pod obj and its IP.
	// then, re-add new pod obj.
	if cachedNpmPod.PodIP != newPodObj.Status.PodIP {
		// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
		// klog.Infof("Pod (Namespace:%s, Name:%s, newUid:%s), has cachedPodIp:%s which is different from PodIp:%s",
		// 	newPodObj.Namespace, newPodObj.Name, string(newPodObj.UID), cachedNpmPod.PodIP, newPodObj.Status.PodIP)

		// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
		// klog.Infof("Deleting cached Pod with key:%s first due to IP Mistmatch", podKey)
		if er := c.cleanUpDeletedPod(podKey); er != nil {
			return metrics.UpdateOp, er
		}

		// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
		// klog.Infof("Adding back Pod with key:%s after IP Mistmatch", podKey)
		return metrics.UpdateOp, c.syncAddedPod(newPodObj)
	}

	// Dealing with #1 pod update event, the IP addresses of cached npmPod and newPodObj are same
	// If no change in labels, then GetIPSetListCompareLabels will return empty list.
	// Otherwise it returns list of deleted PodIP from cached pod's labels and list of added PodIp from new pod's labels
	addToIPSets, deleteFromIPSets := util.GetIPSetListCompareLabels(cachedNpmPod.Labels, newPodObj.Labels)

	newPodMetadata := dataplane.NewPodMetadata(podKey, newPodObj.Status.PodIP, newPodObj.Spec.NodeName)
	// should have newPodMetadata == cachedPodMetadata since from branch above, we have cachedNpmPod.PodIP == newPodObj.Status.PodIP
	cachedPodMetadata := dataplane.NewPodMetadata(podKey, cachedNpmPod.PodIP, newPodMetadata.NodeName)
	// Delete the pod from its label's ipset.
	for _, removeIPSetName := range deleteFromIPSets {
		// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
		// klog.Infof("Deleting pod %s (ip : %s) from ipset %s", podKey, cachedNpmPod.PodIP, removeIPSetName)

		// A "key=value" set name maps to KeyValueLabelOfPod; a bare "key" name to KeyLabelOfPod.
		var toRemoveSet *ipsets.IPSetMetadata
		if util.IsKeyValueLabelSetName(removeIPSetName) {
			toRemoveSet = ipsets.NewIPSetMetadata(removeIPSetName, ipsets.KeyValueLabelOfPod)
		} else {
			toRemoveSet = ipsets.NewIPSetMetadata(removeIPSetName, ipsets.KeyLabelOfPod)
		}
		if err = c.dp.RemoveFromSets([]*ipsets.IPSetMetadata{toRemoveSet}, cachedPodMetadata); err != nil {
			return metrics.UpdateOp, fmt.Errorf("[syncAddAndUpdatePod] Error: failed to delete pod from label ipset with err: %w", err)
		}
		// {IMPORTANT} The order of compared list will be key and then key+val. NPM should only append after both key
		// key + val ipsets are worked on. 0th index will be key and 1st index will be value of the label
		removedLabelKey, removedLabelValue := util.GetLabelKVFromSet(removeIPSetName)
		// Only drop the label from the cache once the key+value set (value != "") has been
		// removed, i.e. after both the key set and the key+value set were handled.
		if removedLabelValue != "" {
			cachedNpmPod.RemoveLabelsWithKey(removedLabelKey)
		}
	}

	// Add the pod to its label's ipset.
	for _, addIPSetName := range addToIPSets {
		// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
		// klog.Infof("Creating ipset %s if it doesn't already exist", addIPSetName)

		// Same name-shape dispatch as the deletion loop above.
		var toAddSet *ipsets.IPSetMetadata
		if util.IsKeyValueLabelSetName(addIPSetName) {
			toAddSet = ipsets.NewIPSetMetadata(addIPSetName, ipsets.KeyValueLabelOfPod)
		} else {
			toAddSet = ipsets.NewIPSetMetadata(addIPSetName, ipsets.KeyLabelOfPod)
		}

		// TODO: Refactor non-error/warning klogs with Zap and set the following logs to "debug" level
		// klog.Infof("Adding pod %s (ip : %s) to ipset %s", podKey, newPodObj.Status.PodIP, addIPSetName)
		if err = c.dp.AddToSets([]*ipsets.IPSetMetadata{toAddSet}, newPodMetadata); err != nil {
			return metrics.UpdateOp, fmt.Errorf("[syncAddAndUpdatePod] Error: failed to add pod to label ipset with err: %w", err)
		}
		// {IMPORTANT} Same as above order is assumed to be key and then key+val. NPM should only append to existing labels
		// only after both ipsets for a given label's key value pair are added successfully
		// (TODO) will need to remove this ordering dependency
		addedLabelKey, addedLabelValue := util.GetLabelKVFromSet(addIPSetName)
		if addedLabelValue != "" {
			cachedNpmPod.AppendLabels(map[string]string{addedLabelKey: addedLabelValue}, common.AppendToExistingLabels)
		}
	}
	// This will ensure after all labels are worked on to overwrite. This way will reduce any bugs introduced above
	// If due to ordering issue the above deleted and added labels are not correct,
	// this below appendLabels will help ensure correct state in cache for all successful ops.
	cachedNpmPod.AppendLabels(newPodObj.Labels, common.ClearExistingLabels)

	// (TODO): optimize named port addition and deletions.
	// named ports are mostly static once configured in todays usage pattern
	// so keeping this simple by deleting all and re-adding
	newPodPorts := common.GetContainerPortList(newPodObj)
	if !reflect.DeepEqual(cachedNpmPod.ContainerPorts, newPodPorts) {
		// Delete cached pod's named ports from its ipset.
		if err = c.manageNamedPortIpsets(
			cachedNpmPod.ContainerPorts, podKey, cachedNpmPod.PodIP, "", deleteNamedPort); err != nil {
			return metrics.UpdateOp, fmt.Errorf("[syncAddAndUpdatePod] Error: failed to delete pod from named port ipset with err: %w", err)
		}
		// Since portList ipset deletion is successful, NPM can remove cachedContainerPorts
		cachedNpmPod.RemoveContainerPorts()

		// Add new pod's named ports from its ipset.
		if err = c.manageNamedPortIpsets(newPodPorts, podKey, newPodObj.Status.PodIP, newPodObj.Spec.NodeName, addNamedPort); err != nil {
			return metrics.UpdateOp, fmt.Errorf("[syncAddAndUpdatePod] Error: failed to add pod to named port ipset with err: %w", err)
		}
		cachedNpmPod.AppendContainerPorts(newPodObj)
	}
	// Record the remaining pod attributes (e.g. phase/resource version — TODO confirm
	// exact fields against common.NpmPod) now that all dataplane ops succeeded.
	cachedNpmPod.UpdateNpmPodAttributes(newPodObj)
	return metrics.UpdateOp, nil
}