func()

in npm/pkg/controlplane/controllers/v1/podController.go [371:494]


// syncAddAndUpdatePod reconciles NPM's cached pod state and ipsets with newPodObj.
//
// It first makes sure the pod's namespace has an ipset and a namespace-cache
// entry, then looks the pod up in c.podMap by its namespace/name key:
//   - no cached pod: newPodObj is added through syncAddedPod and
//     metrics.CreateOp is returned;
//   - cached pod whose IP differs from newPodObj's: the cached pod is removed
//     through cleanUpDeletedPod, newPodObj is re-added, and metrics.UpdateOp
//     is returned;
//   - cached pod with the same IP: label ipsets and named-port ipsets are
//     updated in place and metrics.UpdateOp is returned.
//
// The returned OperationKind is also populated on error paths so the caller
// can tell which kind of operation failed.
func (c *PodController) syncAddAndUpdatePod(newPodObj *corev1.Pod) (metrics.OperationKind, error) {
	newPodObjNs := util.GetNSNameWithPrefix(newPodObj.Namespace)
	if err := c.ensurePodNamespaceIPSet(newPodObjNs); err != nil {
		// since the namespace doesn't exist, this must be a pod create event, so we'll return metrics.CreateOp
		return metrics.CreateOp, err
	}

	// NOTE(review): the key error is discarded here; if key generation ever failed, the pod
	// would presumably be looked up under an empty key and treated as new — confirm this is intended.
	podKey, _ := cache.MetaNamespaceKeyFunc(newPodObj)
	cachedNpmPod, exists := c.podMap[podKey]
	klog.Infof("[syncAddAndUpdatePod] updating Pod with key %s", podKey)
	// No cached npmPod exists, so start adding the pod to the cache.
	if !exists {
		return metrics.CreateOp, c.syncAddedPod(newPodObj)
	}
	// now we know this is an update event, and we'll return metrics.UpdateOp

	// Dealing with "updatePod" event - compare last applied states against current Pod states.
	// There are two possibilities for the cached npmPod and newPodObj:
	// #1 The same object with the same UID and the same key (namespace + name).
	// #2 Different objects with different UIDs but the same key (namespace + name),
	//    because some events for the old object were missed.

	// Case #2: the IP addresses of the cached npmPod and newPodObj are different.
	// NPM cleans up the existing references of the cached pod and its IP,
	// then re-adds the new pod object.
	if cachedNpmPod.PodIP != newPodObj.Status.PodIP {
		klog.Infof("Pod (Namespace:%s, Name:%s, newUid:%s), has cachedPodIp:%s which is different from PodIp:%s",
			newPodObj.Namespace, newPodObj.Name, string(newPodObj.UID), cachedNpmPod.PodIP, newPodObj.Status.PodIP)

		klog.Infof("Deleting cached Pod with key:%s first due to IP Mistmatch", podKey)
		if err := c.cleanUpDeletedPod(podKey); err != nil {
			return metrics.UpdateOp, err
		}

		klog.Infof("Adding back Pod with key:%s after IP Mistmatch", podKey)
		if err := c.syncAddedPod(newPodObj); err != nil {
			return metrics.UpdateOp, err
		}

		return metrics.UpdateOp, nil
	}

	// Case #1: the IP addresses of the cached npmPod and newPodObj are the same.
	// GetIPSetListCompareLabels yields the ipsets the pod has to be added to (from the new labels)
	// and the ipsets it has to be deleted from (from the cached labels); both are empty when the
	// labels did not change.
	addToIPSets, deleteFromIPSets := util.GetIPSetListCompareLabels(cachedNpmPod.Labels, newPodObj.Labels)

	// Delete the pod from the ipsets of labels it no longer carries.
	for _, podIPSetName := range deleteFromIPSets {
		klog.Infof("Deleting pod %s from ipset %s", cachedNpmPod.PodIP, podIPSetName)
		if err := c.ipsMgr.DeleteFromSet(podIPSetName, cachedNpmPod.PodIP, podKey); err != nil {
			return metrics.UpdateOp, fmt.Errorf("[syncAddAndUpdatePod] Error: failed to delete pod from label ipset with err: %w", err)
		}
		// {IMPORTANT} The compared list is ordered key first, then key+val. The cached label is only
		// dropped once the key+val ipset (non-empty value) has been worked on, i.e. after both
		// ipsets of that label were deleted successfully.
		removedLabelKey, removedLabelValue := util.GetLabelKVFromSet(podIPSetName)
		if removedLabelValue != "" {
			cachedNpmPod.RemoveLabelsWithKey(removedLabelKey)
		}
	}

	// Add the pod to the ipsets of its new labels.
	for _, addIPSetName := range addToIPSets {
		klog.Infof("Adding pod %s to ipset %s", newPodObj.Status.PodIP, addIPSetName)
		if err := c.ipsMgr.AddToSet(addIPSetName, newPodObj.Status.PodIP, util.IpsetNetHashFlag, podKey); err != nil {
			return metrics.UpdateOp, fmt.Errorf("[syncAddAndUpdatePod] Error: failed to add pod to label ipset with err: %w", err)
		}
		// {IMPORTANT} Same ordering assumption as above (key, then key+val). The label is appended to
		// the cached pod only after both ipsets for that key/value pair were added successfully.
		// (TODO) will need to remove this ordering dependency
		addedLabelKey, addedLabelValue := util.GetLabelKVFromSet(addIPSetName)
		if addedLabelValue != "" {
			cachedNpmPod.AppendLabels(map[string]string{addedLabelKey: addedLabelValue}, common.AppendToExistingLabels)
		}
	}
	// Once every label ipset operation succeeded, overwrite the cached labels with the new pod's
	// labels. This guards against the ordering assumption above leaving the cache in a wrong state.
	cachedNpmPod.AppendLabels(newPodObj.Labels, common.ClearExistingLabels)

	// (TODO): optimize named port addition and deletions.
	// Named ports are mostly static once configured in today's usage pattern,
	// so this is kept simple: delete all cached ones and re-add the new ones.
	newPodPorts := common.GetContainerPortList(newPodObj)
	if !reflect.DeepEqual(cachedNpmPod.ContainerPorts, newPodPorts) {
		// Delete the cached pod's named ports from their ipsets.
		if err := c.manageNamedPortIpsets(
			cachedNpmPod.ContainerPorts, podKey, cachedNpmPod.PodIP, deleteNamedPort); err != nil {
			return metrics.UpdateOp, fmt.Errorf("[syncAddAndUpdatePod] Error: failed to delete pod from named port ipset with err: %w", err)
		}
		// Since the named port ipset deletion succeeded, NPM can drop the cached container ports.
		cachedNpmPod.RemoveContainerPorts()

		// Add the new pod's named ports to their ipsets.
		if err := c.manageNamedPortIpsets(newPodPorts, podKey, newPodObj.Status.PodIP, addNamedPort); err != nil {
			return metrics.UpdateOp, fmt.Errorf("[syncAddAndUpdatePod] Error: failed to add pod to named port ipset with err: %w", err)
		}
		cachedNpmPod.AppendContainerPorts(newPodObj)
	}
	cachedNpmPod.UpdateNpmPodAttributes(newPodObj)

	return metrics.UpdateOp, nil
}

// ensurePodNamespaceIPSet makes sure the namespace cache has an entry for nsName.
//
// When the entry is missing it creates the namespace ipset, adds it to the
// all-namespaces ipset list and only then stores a new namespace object in the
// cache. When the entry already exists it does nothing.
//
// The namespace cache lock is held for the whole check-and-create sequence and
// is released with defer, so it is released on every return path (including a
// panic in one of the ipset calls).
func (c *PodController) ensurePodNamespaceIPSet(nsName string) error {
	// lock before using NsMap since NsMap is shared with namespace controller
	c.npmNamespaceCache.Lock()
	defer c.npmNamespaceCache.Unlock()

	if _, exists := c.npmNamespaceCache.NsMap[nsName]; exists {
		return nil
	}

	// Create ipset related to the namespace which this pod belongs to.
	if err := c.ipsMgr.CreateSet(nsName, []string{util.IpsetNetHashFlag}); err != nil {
		return fmt.Errorf("[syncAddAndUpdatePod] Error: failed to create ipset for namespace %s with err: %w", nsName, err)
	}

	if err := c.ipsMgr.AddToList(util.KubeAllNamespacesFlag, nsName); err != nil {
		return fmt.Errorf("[syncAddAndUpdatePod] Error: failed to add %s to all-namespace ipset list with err: %w", nsName, err)
	}

	// Add namespace object into NsMap cache only when both ipset operations are successful.
	c.npmNamespaceCache.NsMap[nsName] = common.NewNs(nsName)
	return nil
}