npm/pkg/dataplane/dataplane_windows.go:

package dataplane

import (
	"encoding/json"
	"fmt"
	"strings"
	"time"

	"github.com/Azure/azure-container-networking/npm/metrics"
	"github.com/Azure/azure-container-networking/npm/pkg/dataplane/policies"
	"github.com/Azure/azure-container-networking/npm/util"
	npmerrors "github.com/Azure/azure-container-networking/npm/util/errors"
	"github.com/Microsoft/hcsshim/hcn"
	"github.com/pkg/errors"
	"k8s.io/klog"
)

const (
	maxNoNetRetryCount int = 240 // max wait time 240*5 == 20 mins
	maxNoNetSleepTime  int = 5   // in seconds

	// used for lints
	hcnSchemaMajorVersion = 2
	hcnSchemaMinorVersion = 0
)

var errPolicyModeUnsupported = errors.New("only IPSet policy mode is supported")

// initializeDataPlane will help gather network and endpoint details
func (dp *DataPlane) initializeDataPlane() error {
	klog.Infof("[DataPlane] Initializing dataplane for windows")

	if dp.PolicyMode == "" {
		dp.PolicyMode = policies.IPSetPolicyMode
	}

	if dp.PolicyMode != policies.IPSetPolicyMode {
		return errPolicyModeUnsupported
	}

	err := dp.getNetworkInfo()
	if err != nil {
		return npmerrors.SimpleErrorWrapper("failed to get network info", err)
	}

	// Initialize the endpoint query used to filter healthy endpoints (vNICs) of Windows pods
	dp.endpointQuery.query = hcn.HostComputeQuery{
		SchemaVersion: hcn.SchemaVersion{
			Major: hcnSchemaMajorVersion,
			Minor: hcnSchemaMinorVersion,
		},
		Flags: hcn.HostComputeQueryFlagsNone,
	}

	// Initialize the endpoint query used to filter healthy endpoints (vNICs) of Windows pods on an L1VH node
	dp.endpointQueryAttachedState.query = hcn.HostComputeQuery{
		SchemaVersion: hcn.SchemaVersion{
			Major: hcnSchemaMajorVersion,
			Minor: hcnSchemaMinorVersion,
		},
		Flags: hcn.HostComputeQueryFlagsNone,
	}

	// Filter out any endpoints that are not in the "AttachedSharing" state. All running Windows pods with networking must be in this state.
	filterMap := map[string]uint16{"State": hcnEndpointStateAttachedSharing}
	filter, err := json.Marshal(filterMap)
	if err != nil {
		return errors.Wrap(err, "failed to marshal endpoint filter map for attachedsharing state")
	}
	dp.endpointQuery.query.Filter = string(filter)

	// Filter out any endpoints that are not in the "Attached" state. All running Windows pods on L1VH with networking must be in this state.
	filterMapAttached := map[string]uint16{"State": hcnEndpointStateAttached}
	filterAttached, err := json.Marshal(filterMapAttached)
	if err != nil {
		return errors.Wrap(err, "failed to marshal endpoint filter map for attached state")
	}
	dp.endpointQueryAttachedState.query.Filter = string(filterAttached)

	// reset the endpoint cache so that netpol references are removed for all endpoints while refreshing pod endpoints
	// no need to lock endpointCache at boot up
	dp.endpointCache.cache = make(map[string]*npmEndpoint)

	return nil
}

func (dp *DataPlane) getNetworkInfo() error {
	retryNumber := 0
	ticker := time.NewTicker(time.Second * time.Duration(maxNoNetSleepTime))
	defer ticker.Stop()

	var err error
	for ; true; <-ticker.C {
		err = dp.setNetworkIDByName(dp.NetworkName)
		if err == nil || !isNetworkNotFoundErr(err) {
			return err
		}
		retryNumber++
		if retryNumber >= maxNoNetRetryCount {
			break
		}
		klog.Infof("[DataPlane Windows] Network with name %s not found. Retrying in %d seconds. Current retry number: %d, max retries: %d",
			dp.NetworkName,
			maxNoNetSleepTime,
			retryNumber,
			maxNoNetRetryCount,
		)
	}

	return fmt.Errorf("failed to get network info after %d retries with err %w", maxNoNetRetryCount, err)
}

func (dp *DataPlane) bootupDataPlane() error {
	// initialize the DP so the pod endpoints will get updated.
	if err := dp.initializeDataPlane(); err != nil {
		return npmerrors.SimpleErrorWrapper("failed to initialize dataplane", err)
	}

	allEndpoints, err := dp.getLocalPodEndpoints()
	if err != nil {
		return err
	}

	// TODO once we make endpoint refreshing smarter, it would be most efficient to use allEndpoints to refreshPodEndpoints here.
	// But currently, we call refreshPodEndpoints for every Pod event, so this optimization wouldn't do anything for now.
	// There's also no need to refreshPodEndpoints at bootup since we don't know of any Pods at this point, and the endpoint cache is only needed for known Pods.

	epIDs := make([]string, len(allEndpoints))
	for k, e := range allEndpoints {
		epIDs[k] = e.Id
	}

	// It is important to keep this order: clean up ACLs before ipsets. Otherwise we won't be able to delete ipsets referenced by ACLs.
	if err := dp.policyMgr.Bootup(epIDs); err != nil {
		return npmerrors.ErrorWrapper(npmerrors.BootupDataplane, false, "failed to reset policy dataplane", err)
	}
	if err := dp.ipsetMgr.ResetIPSets(); err != nil {
		return npmerrors.ErrorWrapper(npmerrors.BootupDataplane, false, "failed to reset ipsets dataplane", err)
	}
	return nil
}

func (dp *DataPlane) shouldUpdatePod() bool {
	return true
}

// updatePod has two responsibilities in windows:
// 1. Calls into the dataplane and updates the endpoint references of this pod.
// 2. Checks for existing applicable network policies and applies them on the endpoint.
// Assumption: a Pod won't take up its previously used IP when restarting (see https://stackoverflow.com/questions/52362514/when-will-the-kubernetes-pod-ip-change)
func (dp *DataPlane) updatePod(pod *updateNPMPod) error {
	klog.Infof("[DataPlane] updatePod called. podKey: %s", pod.PodKey)

	if len(pod.IPSetsToAdd) == 0 && len(pod.IPSetsToRemove) == 0 {
		// nothing to do
		return nil
	}

	// lock the endpoint cache while we read/modify the endpoint with the pod's IP
	dp.endpointCache.Lock()
	defer dp.endpointCache.Unlock()

	// Check if the pod is already present in the cache
	endpoint, ok := dp.endpointCache.cache[pod.PodIP]
	if !ok {
		// ignore this err and the pod endpoint will be deleted in ApplyDP
		// if the endpoint is not found, it means the pod is not part of this node or the pod got deleted.
		klog.Warningf("[DataPlane] ignoring pod update since there is no corresponding endpoint. IP: %s. podKey: %s", pod.PodIP, pod.PodKey)
		return nil
	}

	if endpoint.podKey == unspecifiedPodKey {
		// while refreshing pod endpoints, newly discovered endpoints are given an unspecified pod key
		klog.Infof("[DataPlane] associating pod with endpoint. podKey: %s. endpoint: %+v", pod.PodKey, endpoint)
		endpoint.podKey = pod.PodKey
	} else if pod.PodKey == endpoint.previousIncorrectPodKey {
		klog.Infof("[DataPlane] ignoring pod update since this pod was previously and incorrectly assigned to this endpoint. endpoint: %+v", endpoint)
		return nil
	} else if pod.PodKey != endpoint.podKey {
		// solves issue 1729
		klog.Infof("[DataPlane] pod key has changed. will reset endpoint acls and skip looking for ipsets to remove. new podKey: %s. previous endpoint: %+v", pod.PodKey, endpoint)

		if err := dp.policyMgr.ResetEndpoint(endpoint.id); err != nil {
			return fmt.Errorf("failed to reset endpoint for pod with incorrect pod key. new podKey: %s. previous endpoint: %+v. err: %w", pod.PodKey, endpoint, err)
		}

		// mark this after a successful reset. If marked before the reset, we would not retry on failure
		endpoint.previousIncorrectPodKey = endpoint.podKey
		endpoint.podKey = pod.PodKey

		// all ACLs were removed, so in case there were ipsets to remove, there's no need to look for policies to delete
		pod.IPSetsToRemove = nil

		if dp.NetworkName == util.CalicoNetworkName {
			klog.Infof("adding back base ACLs for calico CNI endpoint after resetting ACLs. endpoint: %+v", endpoint)
			dp.policyMgr.AddBaseACLsForCalicoCNI(endpoint.id)
		}
	}

	// for every ipset we're removing from the endpoint, remove from the endpoint any policy that requires the set
	for _, setName := range pod.IPSetsToRemove {
		/*
			Scenarios:
			1. There's a chance a policy is/was just removed, but the ipset's selector hasn't been updated yet.
			   We may try to remove the policy again here, which is ok.
			2. If a policy is added to the ipset's selector after getting the selector (meaning dp.AddPolicy() was called),
			   we won't try to remove the policy, which is fine since the policy must've never existed on the endpoint.
			3. If a policy is added to the ipset's selector in a dp.AddPolicy() thread AFTER getting the selector here,
			   then the ensuing policyMgr.AddPolicy() call will wait for this function to release the endpointCache lock.
			4. If a policy is added to the ipset's selector in a dp.AddPolicy() thread BEFORE getting the selector here,
			   there could be a race between policyMgr.RemovePolicy() here and policyMgr.AddPolicy() there.
		*/
		selectorReference, err := dp.ipsetMgr.GetSelectorReferencesBySet(setName)
		if err != nil {
			// ignore this set since it may have been deleted in the background reconcile thread
			klog.Infof("[DataPlane] ignoring pod update for ipset to remove since the set does not exist. pod: %+v. set: %s", pod, setName)
			continue
		}

		for policyKey := range selectorReference {
			// Now check if any of these network policies are applied on this endpoint.
			// If yes, then proceed to delete the network policy.
			if _, ok := endpoint.netPolReference[policyKey]; ok {
				// Delete the network policy
				endpointList := map[string]string{
					endpoint.ip: endpoint.id,
				}
				err := dp.policyMgr.RemovePolicyForEndpoints(policyKey, endpointList)
				if err != nil {
					return err
				}
				delete(endpoint.netPolReference, policyKey)
			}
		}
	}

	// for every ipset we're adding to the endpoint, consider adding to the endpoint every policy that the set touches
	// add a policy if:
	// 1. it's not already there
	// 2. the pod IP is part of every set that the policy requires (every set in the pod selector)
	toAddPolicies := make(map[string]struct{})
	for _, setName := range pod.IPSetsToAdd {
		/*
			Scenarios:
			1. If a policy is added to the ipset's selector after getting the selector (meaning dp.AddPolicy() was called),
			   we will miss adding the policy here, but will add the policy to all endpoints in that other thread,
			   which has to wait on the endpointCache lock when calling getEndpointsToApplyPolicies().
			2. We may add the policy here and in the dp.AddPolicy() thread if the policy is added to the ipset's selector
			   before that other thread calls policyMgr.AddPolicy(), which is ok.
			3. FIXME: If a policy is/was just removed, but the ipset's selector hasn't been updated yet, we may try to add the policy again here...
		*/
		selectorReference, err := dp.ipsetMgr.GetSelectorReferencesBySet(setName)
		if err != nil {
			// ignore this set since it may have been deleted in the background reconcile thread
			klog.Infof("[DataPlane] ignoring pod update for ipset to add since the set does not exist. pod: %+v. set: %s", pod, setName)
			continue
		}

		for policyKey := range selectorReference {
			if _, ok := endpoint.netPolReference[policyKey]; ok {
				continue
			}

			policy, ok := dp.policyMgr.GetPolicy(policyKey)
			if !ok {
				klog.Infof("[DataPlane] while updating pod, policy is referenced but does not exist. pod: [%s], policy: [%s], set [%s]", pod.PodKey, policyKey, setName)
				continue
			}

			selectorIPSets := dp.getSelectorIPSets(policy)
			ok, err := dp.ipsetMgr.DoesIPSatisfySelectorIPSets(pod.PodIP, pod.PodKey, selectorIPSets)
			if err != nil {
				return fmt.Errorf("[DataPlane] error getting IPs satisfying selector ipsets: %w", err)
			}
			if !ok {
				continue
			}

			toAddPolicies[policyKey] = struct{}{}
		}
	}

	if len(toAddPolicies) == 0 {
		return nil
	}

	successfulPolicies, err := dp.policyMgr.AddAllPolicies(toAddPolicies, endpoint.id, endpoint.ip)
	for policyKey := range successfulPolicies {
		endpoint.netPolReference[policyKey] = struct{}{}
	}
	if err != nil {
		return fmt.Errorf("failed to add all policies while updating pod. endpoint: %+v. policies: %+v. err: %w", endpoint, toAddPolicies, err)
	}

	klog.Infof("[DataPlane] updatedPod complete. podKey: %s. endpoint: %+v", pod.PodKey, endpoint)
	return nil
}

func (dp *DataPlane) getSelectorIPSets(policy *policies.NPMNetworkPolicy) map[string]struct{} {
	selectorIpSets := make(map[string]struct{})
	for _, ipset := range policy.PodSelectorIPSets {
		selectorIpSets[ipset.Metadata.GetPrefixName()] = struct{}{}
	}
	klog.Infof("policy %s has policy selector: %+v", policy.PolicyKey, selectorIpSets)
	return selectorIpSets
}

func (dp *DataPlane) getEndpointsToApplyPolicies(netPols []*policies.NPMNetworkPolicy) (map[string]string, error) {
	if len(netPols) != 1 {
		return nil, ErrIncorrectNumberOfNetPols
	}

	netPol := netPols[0]
	selectorIPSets := dp.getSelectorIPSets(netPol)
	netpolSelectorIPs, err := dp.ipsetMgr.GetIPsFromSelectorIPSets(selectorIPSets)
	if err != nil {
		return nil, err
	}

	// lock the endpoint cache while we read/modify the endpoints with IPs in the policy's pod selector
	dp.endpointCache.Lock()
	defer dp.endpointCache.Unlock()

	endpointList := make(map[string]string)
	for ip, podKey := range netpolSelectorIPs {
		endpoint, ok := dp.endpointCache.cache[ip]
		if !ok {
			klog.Infof("[DataPlane] ignoring selector IP since it was not found in the endpoint cache and might not be in the HNS network. ip: %s. podKey: %s", ip, podKey)
			continue
		}

		if endpoint.podKey != podKey {
			// in case the pod controller hasn't updated the dp yet that the IP's pod owner has changed
			klog.Infof("[DataPlane] ignoring selector IP since the endpoint is assigned to a different podKey. ip: %s. podKey: %s. endpoint: %+v", ip, podKey, endpoint)
			continue
		}

		endpointList[ip] = endpoint.id
		endpoint.netPolReference[netPol.PolicyKey] = struct{}{}
	}

	return endpointList, nil
}

func (dp *DataPlane) getLocalPodEndpoints() ([]*hcn.HostComputeEndpoint, error) {
	klog.Info("getting local endpoints")

	// Get endpoints in state: Attached
	timer := metrics.StartNewTimer()
	endpointsAttached, err := dp.ioShim.Hns.ListEndpointsQuery(dp.endpointQueryAttachedState.query)
	metrics.RecordListEndpointsLatency(timer)
	if err != nil {
		metrics.IncListEndpointsFailures()
		return nil, errors.Wrap(err, "failed to get local pod endpoints in state: attached")
	}

	// Get endpoints in state: AttachedSharing
	timer = metrics.StartNewTimer()
	endpoints, err := dp.ioShim.Hns.ListEndpointsQuery(dp.endpointQuery.query)
	metrics.RecordListEndpointsLatency(timer)
	if err != nil {
		metrics.IncListEndpointsFailures()
		return nil, errors.Wrap(err, "failed to get local pod endpoints in state: attachedSharing")
	}

	// Merge endpoints and endpointsAttached, keeping each endpoint ID once
	endpoints = GetUniqueEndpoints(endpoints, endpointsAttached)

	epPointers := make([]*hcn.HostComputeEndpoint, 0, len(endpoints))
	for k := range endpoints {
		epPointers = append(epPointers, &endpoints[k])
	}
	return epPointers, nil
}

func GetUniqueEndpoints(endpoints, endpointsAttached []hcn.HostComputeEndpoint) []hcn.HostComputeEndpoint {
	// Store the IDs of the endpoints list in a map for quick lookup
	idMap := make(map[string]struct{}, len(endpoints))
	for i := 0; i < len(endpoints); i++ {
		ep := endpoints[i]
		idMap[ep.Id] = struct{}{}
	}

	// Append endpoints from the endpointsAttached list that are not already in the map
	for i := 0; i < len(endpointsAttached); i++ {
		ep := endpointsAttached[i]
		if _, ok := idMap[ep.Id]; !ok {
			endpoints = append(endpoints, ep)
		}
	}

	return endpoints
}

// refreshPodEndpoints will refresh all the pod endpoints and create empty netpol references for new endpoints
/*
	Key Assumption: a new pod event (w/ IP) cannot come before HNS knows (and can tell us) about the endpoint.
	From NPM logs, it seems that endpoints are updated far earlier (several seconds) before the pod event comes in.

	What we learn from refreshing endpoints:
	- an old endpoint doesn't exist anymore
	- a new endpoint has come up

	Why not refresh when adding a netpol to all required pods?
	- It's ok if we try to apply on an endpoint that doesn't exist anymore.
	- We won't know the pod associated with a new endpoint even if we refresh.

	Why can we refresh only once before updating all pods in the updatePodCache (see ApplyDataplane)?
	- Again, it's ok if we try to apply on a non-existent endpoint.
	- We won't miss the endpoint (see the assumption). At the time the pod event came in (when AddToSets/RemoveFromSets were called), HNS already knew about the endpoint.
*/
func (dp *DataPlane) refreshPodEndpoints() error {
	endpoints, err := dp.getLocalPodEndpoints()
	if err != nil {
		return err
	}

	// lock the endpoint cache while we reconcile with HNS goal state
	dp.endpointCache.Lock()
	defer dp.endpointCache.Unlock()

	existingIPs := make(map[string]struct{})
	for _, endpoint := range endpoints {
		if len(endpoint.IpConfigurations) == 0 {
			klog.Infof("Endpoint ID %s has no IPAddresses", endpoint.Id)
			continue
		}

		ip := endpoint.IpConfigurations[0].IpAddress
		if ip == "" {
			klog.Infof("Endpoint ID %s has empty IPAddress field", endpoint.Id)
			continue
		}

		existingIPs[ip] = struct{}{}

		oldNPMEP, ok := dp.endpointCache.cache[ip]
		if !ok {
			// add the endpoint to the cache if it's not already there
			npmEP := newNPMEndpoint(endpoint)
			dp.endpointCache.cache[ip] = npmEP
			// NOTE: TSGs rely on this log line
			klog.Infof("updating endpoint cache to include %s: %+v", npmEP.ip, npmEP)

			if dp.NetworkName == util.CalicoNetworkName {
				// NOTE 1: connectivity may be broken for an endpoint until this method is called
				// NOTE 2: if NPM restarted, technically we could call into HNS to add the base ACLs even if they already exist on the Endpoint.
				// It doesn't seem worthwhile to account for these edge cases since using the calico network is currently intended just for testing
				klog.Infof("adding base ACLs for calico CNI endpoint. IP: %s. ID: %s", ip, npmEP.id)
				dp.policyMgr.AddBaseACLsForCalicoCNI(npmEP.id)
			}
		} else if oldNPMEP.id != endpoint.Id {
			// multiple endpoints can have the same IP address, but there should be one endpoint ID per pod
			// throw away old endpoints that have the same IP as a current endpoint (the old endpoint is getting deleted)
			// we don't have to worry about cleaning up network policies on endpoints that are getting deleted
			npmEP := newNPMEndpoint(endpoint)
			klog.Infof("[DataPlane] updating endpoint cache for IP with a new endpoint. old endpoint: %+v. new endpoint: %+v", oldNPMEP, npmEP)
			dp.endpointCache.cache[ip] = npmEP

			if dp.NetworkName == util.CalicoNetworkName {
				// NOTE 1: connectivity may be broken for an endpoint until this method is called
				// NOTE 2: if NPM restarted, technically we could call into HNS to add the base ACLs even if they already exist on the Endpoint.
				// It doesn't seem worthwhile to account for these edge cases since using the calico network is currently intended just for testing
				klog.Infof("adding base ACLs for calico CNI endpoint. IP: %s. ID: %s", ip, npmEP.id)
				dp.policyMgr.AddBaseACLsForCalicoCNI(npmEP.id)
			}
		}
	}

	// garbage collection for the endpoint cache
	for ip, ep := range dp.endpointCache.cache {
		if _, ok := existingIPs[ip]; !ok {
			klog.Infof("[DataPlane] deleting endpoint from cache. endpoint: %+v", ep)
			delete(dp.endpointCache.cache, ip)
		}
	}

	return nil
}

func (dp *DataPlane) setNetworkIDByName(networkName string) error {
	// Get the network ID
	timer := metrics.StartNewTimer()
	network, err := dp.ioShim.Hns.GetNetworkByName(networkName)
	metrics.RecordGetNetworkLatency(timer)
	if err != nil {
		metrics.IncGetNetworkFailures()
		return err
	}

	dp.networkID = network.Id
	return nil
}

func isNetworkNotFoundErr(err error) bool {
	return strings.Contains(err.Error(), fmt.Sprintf("Network name %q not found", util.AzureNetworkName)) ||
		strings.Contains(err.Error(), fmt.Sprintf("Network name %q not found", util.CalicoNetworkName))
}
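For reference, a minimal sketch of how the exported GetUniqueEndpoints helper merges the results of the two HNS endpoint queries (Attached and AttachedSharing) by endpoint ID. The snippet is illustrative only: the endpoint IDs are made up, the import path is inferred from the file's location in the repository, and it assumes a Windows build, since both this file and the hcn package are platform-specific.

package main

import (
	"fmt"

	"github.com/Azure/azure-container-networking/npm/pkg/dataplane"
	"github.com/Microsoft/hcsshim/hcn"
)

func main() {
	// Hypothetical results of the AttachedSharing-state query.
	endpoints := []hcn.HostComputeEndpoint{{Id: "ep-1"}, {Id: "ep-2"}}
	// Hypothetical results of the Attached-state query; ep-2 appears in both lists.
	endpointsAttached := []hcn.HostComputeEndpoint{{Id: "ep-2"}, {Id: "ep-3"}}

	// The duplicate ep-2 is kept only once; ep-3 is appended to the first list.
	merged := dataplane.GetUniqueEndpoints(endpoints, endpointsAttached)
	for _, ep := range merged {
		fmt.Println(ep.Id) // prints ep-1, ep-2, ep-3
	}
}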