func()

in pkg/proxy/ipvs/proxier.go [895:1487]


func (proxier *Proxier) syncProxyRules() {
	proxier.mu.Lock()
	defer proxier.mu.Unlock()

	// don't sync rules till we've received services and endpoints
	if !proxier.isInitialized() {
		klog.V(2).InfoS("Not syncing ipvs rules until Services and Endpoints have been received from master")
		return
	}

	// its safe to set initialSync to false as it acts as a flag for startup actions
	// and the mutex is held.
	defer func() {
		proxier.initialSync = false
	}()

	// Keep track of how long syncs take.
	start := time.Now()
	defer func() {
		metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
		klog.V(4).InfoS("syncProxyRules complete", "elapsed", time.Since(start))
	}()

	// We assume that if this was called, we really want to sync them,
	// even if nothing changed in the meantime. In other words, callers are
	// responsible for detecting no-op changes and not calling this function.
	serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges)
	endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)

	klog.V(3).InfoS("Syncing ipvs proxier rules")

	proxier.serviceNoLocalEndpointsInternal = sets.New[string]()
	proxier.serviceNoLocalEndpointsExternal = sets.New[string]()

	proxier.lbNoNodeAccessIPPortProtocolEntries = make([]*utilipset.Entry, 0)

	// Begin install iptables

	// Reset all buffers used later.
	// This is to avoid memory reallocations and thus improve performance.
	proxier.natChains.Reset()
	proxier.natRules.Reset()
	proxier.filterChains.Reset()
	proxier.filterRules.Reset()

	// Write table headers.
	proxier.filterChains.Write("*filter")
	proxier.natChains.Write("*nat")

	proxier.createAndLinkKubeChain()

	// make sure dummy interface exists in the system where ipvs Proxier will bind service address on it
	_, err := proxier.netlinkHandle.EnsureDummyDevice(defaultDummyDevice)
	if err != nil {
		klog.ErrorS(err, "Failed to create dummy interface", "interface", defaultDummyDevice)
		return
	}

	// make sure ip sets exists in the system.
	for _, set := range proxier.ipsetList {
		if err := ensureIPSet(set); err != nil {
			return
		}
		set.resetEntries()
	}

	// activeIPVSServices represents IPVS service successfully created in this round of sync
	activeIPVSServices := sets.New[string]()
	// activeBindAddrs Represents addresses we want on the defaultDummyDevice after this round of sync
	activeBindAddrs := sets.New[string]()
	// alreadyBoundAddrs Represents addresses currently assigned to the dummy interface
	alreadyBoundAddrs, err := proxier.netlinkHandle.GetLocalAddresses(defaultDummyDevice)
	if err != nil {
		klog.ErrorS(err, "Error listing addresses binded to dummy interface")
	}
	// nodeAddressSet All addresses *except* those on the dummy interface
	nodeAddressSet, err := proxier.netlinkHandle.GetAllLocalAddressesExcept(defaultDummyDevice)
	if err != nil {
		klog.ErrorS(err, "Error listing node addresses")
	}

	hasNodePort := false
	for _, svc := range proxier.svcPortMap {
		svcInfo, ok := svc.(*servicePortInfo)
		if ok && svcInfo.NodePort() != 0 {
			hasNodePort = true
			break
		}
	}

	// List of node IP addresses to be used as IPVS services if nodePort is set. This
	// can be reused for all nodePort services.
	var nodeIPs []net.IP
	if hasNodePort {
		if proxier.nodePortAddresses.MatchAll() {
			for _, ipStr := range nodeAddressSet.UnsortedList() {
				nodeIPs = append(nodeIPs, netutils.ParseIPSloppy(ipStr))
			}
		} else {
			allNodeIPs, err := proxier.nodePortAddresses.GetNodeIPs(proxier.networkInterfacer)
			if err != nil {
				klog.ErrorS(err, "Failed to get node IP address matching nodeport cidr")
			} else {
				for _, ip := range allNodeIPs {
					if !ip.IsLoopback() {
						nodeIPs = append(nodeIPs, ip)
					}
				}
			}
		}
	}

	// Build IPVS rules for each service.
	for svcPortName, svcPort := range proxier.svcPortMap {
		svcInfo, ok := svcPort.(*servicePortInfo)
		if !ok {
			klog.ErrorS(nil, "Failed to cast serviceInfo", "servicePortName", svcPortName)
			continue
		}

		protocol := strings.ToLower(string(svcInfo.Protocol()))
		// Precompute svcNameString; with many services the many calls
		// to ServicePortName.String() show up in CPU profiles.
		svcPortNameString := svcPortName.String()

		// Handle traffic that loops back to the originator with SNAT.
		for _, e := range proxier.endpointsMap[svcPortName] {
			ep, ok := e.(*proxy.BaseEndpointInfo)
			if !ok {
				klog.ErrorS(nil, "Failed to cast BaseEndpointInfo", "endpoint", e)
				continue
			}
			if !ep.IsLocal() {
				continue
			}
			epIP := ep.IP()
			epPort := ep.Port()
			// Error parsing this endpoint has been logged. Skip to next endpoint.
			if epIP == "" || epPort == 0 {
				continue
			}
			entry := &utilipset.Entry{
				IP:       epIP,
				Port:     epPort,
				Protocol: protocol,
				IP2:      epIP,
				SetType:  utilipset.HashIPPortIP,
			}
			if valid := proxier.ipsetList[kubeLoopBackIPSet].validateEntry(entry); !valid {
				klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoopBackIPSet].Name)
				continue
			}
			proxier.ipsetList[kubeLoopBackIPSet].activeEntries.Insert(entry.String())
		}

		// Capture the clusterIP.
		// ipset call
		entry := &utilipset.Entry{
			IP:       svcInfo.ClusterIP().String(),
			Port:     svcInfo.Port(),
			Protocol: protocol,
			SetType:  utilipset.HashIPPort,
		}
		// add service Cluster IP:Port to kubeServiceAccess ip set for the purpose of solving hairpin.
		// proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String())
		if valid := proxier.ipsetList[kubeClusterIPSet].validateEntry(entry); !valid {
			klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeClusterIPSet].Name)
			continue
		}
		proxier.ipsetList[kubeClusterIPSet].activeEntries.Insert(entry.String())
		// ipvs call
		serv := &utilipvs.VirtualServer{
			Address:   svcInfo.ClusterIP(),
			Port:      uint16(svcInfo.Port()),
			Protocol:  string(svcInfo.Protocol()),
			Scheduler: proxier.ipvsScheduler,
		}
		// Set session affinity flag and timeout for IPVS service
		if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
			serv.Flags |= utilipvs.FlagPersistent
			serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
		}
		// Set the source hash flag needed for the distribution method "mh"
		if proxier.ipvsScheduler == "mh" {
			serv.Flags |= utilipvs.FlagSourceHash
		}
		// We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService()
		if err := proxier.syncService(svcPortNameString, serv, true, alreadyBoundAddrs); err == nil {
			activeIPVSServices.Insert(serv.String())
			activeBindAddrs.Insert(serv.Address.String())
			// ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP
			// So we still need clusterIP rules in onlyNodeLocalEndpoints mode.
			internalNodeLocal := false
			if svcInfo.InternalPolicyLocal() {
				internalNodeLocal = true
			}
			if err := proxier.syncEndpoint(svcPortName, internalNodeLocal, serv); err != nil {
				klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv)
			}
		} else {
			klog.ErrorS(err, "Failed to sync service", "servicePortName", svcPortName, "virtualServer", serv)
		}

		// Capture externalIPs.
		for _, externalIP := range svcInfo.ExternalIPStrings() {
			// ipset call
			entry := &utilipset.Entry{
				IP:       externalIP,
				Port:     svcInfo.Port(),
				Protocol: protocol,
				SetType:  utilipset.HashIPPort,
			}

			if svcInfo.ExternalPolicyLocal() {
				if valid := proxier.ipsetList[kubeExternalIPLocalSet].validateEntry(entry); !valid {
					klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeExternalIPLocalSet].Name)
					continue
				}
				proxier.ipsetList[kubeExternalIPLocalSet].activeEntries.Insert(entry.String())
			} else {
				// We have to SNAT packets to external IPs.
				if valid := proxier.ipsetList[kubeExternalIPSet].validateEntry(entry); !valid {
					klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeExternalIPSet].Name)
					continue
				}
				proxier.ipsetList[kubeExternalIPSet].activeEntries.Insert(entry.String())
			}

			// ipvs call
			serv := &utilipvs.VirtualServer{
				Address:   netutils.ParseIPSloppy(externalIP),
				Port:      uint16(svcInfo.Port()),
				Protocol:  string(svcInfo.Protocol()),
				Scheduler: proxier.ipvsScheduler,
			}
			if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
				serv.Flags |= utilipvs.FlagPersistent
				serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
			}
			// Set the source hash flag needed for the distribution method "mh"
			if proxier.ipvsScheduler == "mh" {
				serv.Flags |= utilipvs.FlagSourceHash
			}
			// We must not add the address to the dummy device if it exist on another interface
			shouldBind := !nodeAddressSet.Has(serv.Address.String())
			if err := proxier.syncService(svcPortNameString, serv, shouldBind, alreadyBoundAddrs); err == nil {
				activeIPVSServices.Insert(serv.String())
				if shouldBind {
					activeBindAddrs.Insert(serv.Address.String())
				}
				if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil {
					klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv)
				}
			} else {
				klog.ErrorS(err, "Failed to sync service", "servicePortName", svcPortName, "virtualServer", serv)
			}
		}

		// Capture load-balancer ingress.
		for _, ingress := range svcInfo.LoadBalancerVIPStrings() {
			// ipset call
			entry = &utilipset.Entry{
				IP:       ingress,
				Port:     svcInfo.Port(),
				Protocol: protocol,
				SetType:  utilipset.HashIPPort,
			}
			// add service load balancer ingressIP:Port to kubeServiceAccess ip set for the purpose of solving hairpin.
			// proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String())
			// If we are proxying globally, we need to masquerade in case we cross nodes.
			// If we are proxying only locally, we can retain the source IP.
			if valid := proxier.ipsetList[kubeLoadBalancerSet].validateEntry(entry); !valid {
				klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoadBalancerSet].Name)
				continue
			}
			proxier.ipsetList[kubeLoadBalancerSet].activeEntries.Insert(entry.String())
			// insert loadbalancer entry to lbIngressLocalSet if service externaltrafficpolicy=local
			if svcInfo.ExternalPolicyLocal() {
				if valid := proxier.ipsetList[kubeLoadBalancerLocalSet].validateEntry(entry); !valid {
					klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoadBalancerLocalSet].Name)
					continue
				}
				proxier.ipsetList[kubeLoadBalancerLocalSet].activeEntries.Insert(entry.String())
			}
			if len(svcInfo.LoadBalancerSourceRanges()) != 0 {
				// The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field.
				// This currently works for loadbalancers that preserves source ips.
				// For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply.
				if valid := proxier.ipsetList[kubeLoadBalancerFWSet].validateEntry(entry); !valid {
					klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoadBalancerFWSet].Name)
					continue
				}
				proxier.ipsetList[kubeLoadBalancerFWSet].activeEntries.Insert(entry.String())
				allowFromNode := false
				for _, src := range svcInfo.LoadBalancerSourceRanges() {
					// ipset call
					entry = &utilipset.Entry{
						IP:       ingress,
						Port:     svcInfo.Port(),
						Protocol: protocol,
						Net:      src,
						SetType:  utilipset.HashIPPortNet,
					}
					// enumerate all white list source cidr
					if valid := proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].validateEntry(entry); !valid {
						klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].Name)
						continue
					}
					proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].activeEntries.Insert(entry.String())

					// ignore error because it has been validated
					_, cidr, _ := netutils.ParseCIDRSloppy(src)
					if cidr.Contains(proxier.nodeIP) {
						allowFromNode = true
					}
				}
				// generally, ip route rule was added to intercept request to loadbalancer vip from the
				// loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly.
				// Need to add the following rule to allow request on host.
				if allowFromNode {
					entry = &utilipset.Entry{
						IP:       ingress,
						Port:     svcInfo.Port(),
						Protocol: protocol,
						IP2:      ingress,
						SetType:  utilipset.HashIPPortIP,
					}
					// enumerate all white list source ip
					if valid := proxier.ipsetList[kubeLoadBalancerSourceIPSet].validateEntry(entry); !valid {
						klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoadBalancerSourceIPSet].Name)
						continue
					}
					proxier.ipsetList[kubeLoadBalancerSourceIPSet].activeEntries.Insert(entry.String())
				} else {
					// since nodeIP is not covered in any of SourceRange we need to explicitly block the lbIP access from k8s nodes.
					proxier.lbNoNodeAccessIPPortProtocolEntries = append(proxier.lbNoNodeAccessIPPortProtocolEntries, entry)

				}
			}
			// ipvs call
			serv := &utilipvs.VirtualServer{
				Address:   netutils.ParseIPSloppy(ingress),
				Port:      uint16(svcInfo.Port()),
				Protocol:  string(svcInfo.Protocol()),
				Scheduler: proxier.ipvsScheduler,
			}
			if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
				serv.Flags |= utilipvs.FlagPersistent
				serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
			}
			// Set the source hash flag needed for the distribution method "mh"
			if proxier.ipvsScheduler == "mh" {
				serv.Flags |= utilipvs.FlagSourceHash
			}
			// We must not add the address to the dummy device if it exist on another interface
			shouldBind := !nodeAddressSet.Has(serv.Address.String())
			if err := proxier.syncService(svcPortNameString, serv, shouldBind, alreadyBoundAddrs); err == nil {
				activeIPVSServices.Insert(serv.String())
				if shouldBind {
					activeBindAddrs.Insert(serv.Address.String())
				}
				if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil {
					klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv)
				}
			} else {
				klog.ErrorS(err, "Failed to sync service", "servicePortName", svcPortName, "virtualServer", serv)
			}
		}

		if svcInfo.NodePort() != 0 {
			if len(nodeIPs) == 0 {
				// Skip nodePort configuration since an error occurred when
				// computing nodeAddresses or nodeIPs.
				continue
			}

			// Nodeports need SNAT, unless they're local.
			// ipset call

			var (
				nodePortSet *IPSet
				entries     []*utilipset.Entry
			)

			switch protocol {
			case utilipset.ProtocolTCP:
				nodePortSet = proxier.ipsetList[kubeNodePortSetTCP]
				entries = []*utilipset.Entry{{
					// No need to provide ip info
					Port:     svcInfo.NodePort(),
					Protocol: protocol,
					SetType:  utilipset.BitmapPort,
				}}
			case utilipset.ProtocolUDP:
				nodePortSet = proxier.ipsetList[kubeNodePortSetUDP]
				entries = []*utilipset.Entry{{
					// No need to provide ip info
					Port:     svcInfo.NodePort(),
					Protocol: protocol,
					SetType:  utilipset.BitmapPort,
				}}
			case utilipset.ProtocolSCTP:
				nodePortSet = proxier.ipsetList[kubeNodePortSetSCTP]
				// Since hash ip:port is used for SCTP, all the nodeIPs to be used in the SCTP ipset entries.
				entries = []*utilipset.Entry{}
				for _, nodeIP := range nodeIPs {
					entries = append(entries, &utilipset.Entry{
						IP:       nodeIP.String(),
						Port:     svcInfo.NodePort(),
						Protocol: protocol,
						SetType:  utilipset.HashIPPort,
					})
				}
			default:
				// It should never hit
				klog.ErrorS(nil, "Unsupported protocol type", "protocol", protocol)
			}
			if nodePortSet != nil {
				entryInvalidErr := false
				for _, entry := range entries {
					if valid := nodePortSet.validateEntry(entry); !valid {
						klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", nodePortSet.Name)
						entryInvalidErr = true
						break
					}
					nodePortSet.activeEntries.Insert(entry.String())
				}
				if entryInvalidErr {
					continue
				}
			}

			// Add externaltrafficpolicy=local type nodeport entry
			if svcInfo.ExternalPolicyLocal() {
				var nodePortLocalSet *IPSet
				switch protocol {
				case utilipset.ProtocolTCP:
					nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetTCP]
				case utilipset.ProtocolUDP:
					nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetUDP]
				case utilipset.ProtocolSCTP:
					nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetSCTP]
				default:
					// It should never hit
					klog.ErrorS(nil, "Unsupported protocol type", "protocol", protocol)
				}
				if nodePortLocalSet != nil {
					entryInvalidErr := false
					for _, entry := range entries {
						if valid := nodePortLocalSet.validateEntry(entry); !valid {
							klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", nodePortLocalSet.Name)
							entryInvalidErr = true
							break
						}
						nodePortLocalSet.activeEntries.Insert(entry.String())
					}
					if entryInvalidErr {
						continue
					}
				}
			}

			// Build ipvs kernel routes for each node ip address
			for _, nodeIP := range nodeIPs {
				// ipvs call
				serv := &utilipvs.VirtualServer{
					Address:   nodeIP,
					Port:      uint16(svcInfo.NodePort()),
					Protocol:  string(svcInfo.Protocol()),
					Scheduler: proxier.ipvsScheduler,
				}
				if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
					serv.Flags |= utilipvs.FlagPersistent
					serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
				}
				// Set the source hash flag needed for the distribution method "mh"
				if proxier.ipvsScheduler == "mh" {
					serv.Flags |= utilipvs.FlagSourceHash
				}
				// There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`.
				if err := proxier.syncService(svcPortNameString, serv, false, alreadyBoundAddrs); err == nil {
					activeIPVSServices.Insert(serv.String())
					if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil {
						klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv)
					}
				} else {
					klog.ErrorS(err, "Failed to sync service", "servicePortName", svcPortName, "virtualServer", serv)
				}
			}
		}

		if svcInfo.HealthCheckNodePort() != 0 {
			nodePortSet := proxier.ipsetList[kubeHealthCheckNodePortSet]
			entry := &utilipset.Entry{
				// No need to provide ip info
				Port:     svcInfo.HealthCheckNodePort(),
				Protocol: "tcp",
				SetType:  utilipset.BitmapPort,
			}

			if valid := nodePortSet.validateEntry(entry); !valid {
				klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", nodePortSet.Name)
				continue
			}
			nodePortSet.activeEntries.Insert(entry.String())
		}
	}

	// Set the KUBE-IPVS-IPS set to the "activeBindAddrs"
	proxier.ipsetList[kubeIPVSSet].activeEntries = activeBindAddrs

	// sync ipset entries
	for _, set := range proxier.ipsetList {
		set.syncIPSetEntries()
	}

	// Tail call iptables rules for ipset, make sure only call iptables once
	// in a single loop per ip set.
	proxier.writeIptablesRules()

	// Sync iptables rules.
	// NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table.
	proxier.iptablesData.Reset()
	proxier.iptablesData.Write(proxier.natChains.Bytes())
	proxier.iptablesData.Write(proxier.natRules.Bytes())
	proxier.iptablesData.Write(proxier.filterChains.Bytes())
	proxier.iptablesData.Write(proxier.filterRules.Bytes())

	klog.V(5).InfoS("Restoring iptables", "rules", proxier.iptablesData.Bytes())
	err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
	if err != nil {
		if pErr, ok := err.(utiliptables.ParseError); ok {
			lines := utiliptables.ExtractLines(proxier.iptablesData.Bytes(), pErr.Line(), 3)
			klog.ErrorS(pErr, "Failed to execute iptables-restore", "rules", lines)
		} else {
			klog.ErrorS(err, "Failed to execute iptables-restore", "rules", proxier.iptablesData.Bytes())
		}
		metrics.IptablesRestoreFailuresTotal.Inc()
		return
	}
	for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes {
		for _, lastChangeTriggerTime := range lastChangeTriggerTimes {
			latency := metrics.SinceInSeconds(lastChangeTriggerTime)
			metrics.NetworkProgrammingLatency.Observe(latency)
			klog.V(4).InfoS("Network programming", "endpoint", klog.KRef(name.Namespace, name.Name), "elapsed", latency)
		}
	}

	// Remove superfluous addresses from the dummy device
	superfluousAddresses := alreadyBoundAddrs.Difference(activeBindAddrs)
	if superfluousAddresses.Len() > 0 {
		klog.V(2).InfoS("Removing addresses", "interface", defaultDummyDevice, "addresses", superfluousAddresses)
		for adr := range superfluousAddresses {
			if err := proxier.netlinkHandle.UnbindAddress(adr, defaultDummyDevice); err != nil {
				klog.ErrorS(err, "UnbindAddress", "interface", defaultDummyDevice, "address", adr)
			}
		}
	}

	// currentIPVSServices represent IPVS services listed from the system
	// (including any we have created in this sync)
	currentIPVSServices := make(map[string]*utilipvs.VirtualServer)
	appliedSvcs, err := proxier.ipvs.GetVirtualServers()
	if err == nil {
		for _, appliedSvc := range appliedSvcs {
			currentIPVSServices[appliedSvc.String()] = appliedSvc
		}
	} else {
		klog.ErrorS(err, "Failed to get ipvs service")
	}
	proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices)

	if proxier.healthzServer != nil {
		proxier.healthzServer.Updated(proxier.ipFamily)
	}
	metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()

	// Update service healthchecks.  The endpoints list might include services that are
	// not "OnlyLocal", but the services list will not, and the serviceHealthServer
	// will just drop those endpoints.
	if err := proxier.serviceHealthServer.SyncServices(proxier.svcPortMap.HealthCheckNodePorts()); err != nil {
		klog.ErrorS(err, "Error syncing healthcheck services")
	}
	if err := proxier.serviceHealthServer.SyncEndpoints(proxier.endpointsMap.LocalReadyEndpoints()); err != nil {
		klog.ErrorS(err, "Error syncing healthcheck endpoints")
	}

	metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal").Set(float64(proxier.serviceNoLocalEndpointsInternal.Len()))
	metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(proxier.serviceNoLocalEndpointsExternal.Len()))

	// Finish housekeeping, clear stale conntrack entries for UDP Services
	conntrack.CleanStaleEntries(proxier.ipFamily == v1.IPv6Protocol, proxier.exec, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
}