func()

in npm/pkg/dataplane/dataplane_windows.go [149:300]


// updatePod reconciles the network policies applied to the endpoint that owns
// pod.PodIP with the pod's pending ipset changes.
//
// Steps, all performed while holding the endpointCache lock:
//  1. Return early if the pod has no ipsets to add or remove, or if no endpoint
//     is cached for the pod's IP.
//  2. Reconcile the endpoint's pod key with pod.PodKey. If the key changed, the
//     endpoint's ACLs are reset and pod.IPSetsToRemove is cleared (mutating the
//     argument), since there is nothing left to remove.
//  3. For every ipset being removed, remove from the endpoint each policy that
//     references the set in its selector and is recorded in
//     endpoint.netPolReference.
//  4. For every ipset being added, collect each policy that references the set,
//     is not yet recorded on the endpoint, and whose selector ipsets are all
//     satisfied by the pod's IP; then add the collected policies in one call.
//
// Returns nil when there was nothing to do or everything succeeded, and a
// non-nil error if resetting the endpoint, removing a policy, evaluating a
// selector, or adding the policies failed. Policies that were added
// successfully are recorded on the endpoint even when AddAllPolicies also
// reports an error.
func (dp *DataPlane) updatePod(pod *updateNPMPod) error {
	klog.Infof("[DataPlane] updatePod called. podKey: %s", pod.PodKey)
	if len(pod.IPSetsToAdd) == 0 && len(pod.IPSetsToRemove) == 0 {
		// nothing to do
		return nil
	}

	// lock the endpoint cache while we read/modify the endpoint with the pod's IP
	dp.endpointCache.Lock()
	defer dp.endpointCache.Unlock()

	// Check if pod is already present in cache
	endpoint, ok := dp.endpointCache.cache[pod.PodIP]
	if !ok {
		// ignore this err and pod endpoint will be deleted in ApplyDP
		// if the endpoint is not found, it means the pod is not part of this node or pod got deleted.
		klog.Warningf("[DataPlane] ignoring pod update since there is no corresponding endpoint. IP: %s. podKey: %s", pod.PodIP, pod.PodKey)
		return nil
	}

	// Cases are evaluated in order; at most one applies. If none applies, the
	// endpoint is already associated with this pod key.
	switch {
	case endpoint.podKey == unspecifiedPodKey:
		// while refreshing pod endpoints, newly discovered endpoints are given an unspecified pod key
		klog.Infof("[DataPlane] associating pod with endpoint. podKey: %s. endpoint: %+v", pod.PodKey, endpoint)
		endpoint.podKey = pod.PodKey

	case pod.PodKey == endpoint.previousIncorrectPodKey:
		klog.Infof("[DataPlane] ignoring pod update since this pod was previously and incorrectly assigned to this endpoint. endpoint: %+v", endpoint)
		return nil

	case pod.PodKey != endpoint.podKey:
		// solves issue 1729
		klog.Infof("[DataPlane] pod key has changed. will reset endpoint acls and skip looking ipsets to remove. new podKey: %s. previous endpoint: %+v", pod.PodKey, endpoint)
		if err := dp.policyMgr.ResetEndpoint(endpoint.id); err != nil {
			return fmt.Errorf("failed to reset endpoint for pod with incorrect pod key. new podKey: %s. previous endpoint: %+v. err: %w", pod.PodKey, endpoint, err)
		}

		// mark this after successful reset. If before reset, we would not retry on failure
		endpoint.previousIncorrectPodKey = endpoint.podKey
		endpoint.podKey = pod.PodKey

		// all ACLs were removed, so in case there were ipsets to remove, there's no need to look for policies to delete
		pod.IPSetsToRemove = nil

		// NOTE(review): endpoint.netPolReference is not cleared here. If ResetEndpoint removes
		// the policies recorded in it, the add loop below would skip re-adding them — confirm
		// whether the reference map is reset elsewhere.

		if dp.NetworkName == util.CalicoNetworkName {
			klog.Infof("adding back base ACLs for calico CNI endpoint after resetting ACLs. endpoint: %+v", endpoint)
			dp.policyMgr.AddBaseACLsForCalicoCNI(endpoint.id)
		}
	}

	// for every ipset we're removing from the endpoint, remove from the endpoint any policy that requires the set
	for _, setName := range pod.IPSetsToRemove {
		/*
			Scenarios:
			1. There's a chance a policy is/was just removed, but the ipset's selector hasn't been updated yet.
			   We may try to remove the policy again here, which is ok.

			2. If a policy is added to the ipset's selector after getting the selector (meaning dp.AddPolicy() was called),
			   we won't try to remove the policy, which is fine since the policy must've never existed on the endpoint.

			3. If a policy is added to the ipset's selector in a dp.AddPolicy() thread AFTER getting the selector here,
			   then the ensuing policyMgr.AddPolicy() call will wait for this function to release the endpointCache lock.

			4. If a policy is added to the ipset's selector in a dp.AddPolicy() thread BEFORE getting the selector here,
			   there could be a race between policyMgr.RemovePolicy() here and policyMgr.AddPolicy() there.
		*/
		selectorReference, err := dp.ipsetMgr.GetSelectorReferencesBySet(setName)
		if err != nil {
			// ignore this set since it may have been deleted in the background reconcile thread
			klog.Infof("[DataPlane] ignoring pod update for ipset to remove since the set does not exist. pod: %+v. set: %s", pod, setName)
			continue
		}

		for policyKey := range selectorReference {
			// Only policies recorded as applied on this endpoint need to be removed.
			if _, applied := endpoint.netPolReference[policyKey]; !applied {
				continue
			}

			// Delete the network policy from just this endpoint.
			endpointList := map[string]string{
				endpoint.ip: endpoint.id,
			}
			if err := dp.policyMgr.RemovePolicyForEndpoints(policyKey, endpointList); err != nil {
				return err
			}
			delete(endpoint.netPolReference, policyKey)
		}
	}

	// for every ipset we're adding to the endpoint, consider adding to the endpoint every policy that the set touches
	// add policy if:
	// 1. it's not already there
	// 2. the pod IP is part of every set that the policy requires (every set in the pod selector)
	toAddPolicies := make(map[string]struct{})
	for _, setName := range pod.IPSetsToAdd {
		/*
			Scenarios:
			1. If a policy is added to the ipset's selector after getting the selector (meaning dp.AddPolicy() was called),
			   we will miss adding the policy here, but will add the policy to all endpoints in that other thread, which has
			   to wait on the endpointCache lock when calling getEndpointsToApplyPolicies().

			2. We may add the policy here and in the dp.AddPolicy() thread if the policy is added to the ipset's selector before
			   that other thread calls policyMgr.AddPolicy(), which is ok.

			3. FIXME: If a policy is/was just removed, but the ipset's selector hasn't been updated yet,
			   we may try to add the policy again here...
		*/
		selectorReference, err := dp.ipsetMgr.GetSelectorReferencesBySet(setName)
		if err != nil {
			// ignore this set since it may have been deleted in the background reconcile thread
			klog.Infof("[DataPlane] ignoring pod update for ipset to add since the set does not exist. pod: %+v. set: %s", pod, setName)
			continue
		}

		for policyKey := range selectorReference {
			// skip policies already recorded on the endpoint
			if _, applied := endpoint.netPolReference[policyKey]; applied {
				continue
			}

			policy, found := dp.policyMgr.GetPolicy(policyKey)
			if !found {
				klog.Infof("[DataPlane] while updating pod, policy is referenced but does not exist. pod: [%s], policy: [%s], set [%s]", pod.PodKey, policyKey, setName)
				continue
			}

			// the policy applies only if the pod's IP is in every ipset of the policy's selector
			selectorIPSets := dp.getSelectorIPSets(policy)
			satisfies, err := dp.ipsetMgr.DoesIPSatisfySelectorIPSets(pod.PodIP, pod.PodKey, selectorIPSets)
			if err != nil {
				return fmt.Errorf("[DataPlane] error getting IPs satisfying selector ipsets: %w", err)
			}
			if !satisfies {
				continue
			}

			toAddPolicies[policyKey] = struct{}{}
		}
	}

	if len(toAddPolicies) == 0 {
		return nil
	}

	successfulPolicies, err := dp.policyMgr.AddAllPolicies(toAddPolicies, endpoint.id, endpoint.ip)
	// record whatever was applied before checking err, so partial success is tracked
	for policyKey := range successfulPolicies {
		endpoint.netPolReference[policyKey] = struct{}{}
	}
	if err != nil {
		return fmt.Errorf("failed to add all policies while updating pod. endpoint: %+v. policies: %+v. err: %w", endpoint, toAddPolicies, err)
	}

	klog.Infof("[DataPlane] updatedPod complete. podKey: %s. endpoint: %+v", pod.PodKey, endpoint)

	return nil
}