func (proxier *Proxier) syncProxyRules()

in pkg/proxy/nftables/proxier.go [926:1594]


func (proxier *Proxier) syncProxyRules() {
	proxier.mu.Lock()
	defer proxier.mu.Unlock()

	// don't sync rules till we've received services and endpoints
	if !proxier.isInitialized() {
		klog.V(2).InfoS("Not syncing nftables until Services and Endpoints have been received from master")
		return
	}

	//
	// Below this point we will not return until we try to write the nftables rules.
	//

	// Keep track of how long syncs take.
	start := time.Now()
	defer func() {
		metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
		klog.V(2).InfoS("SyncProxyRules complete", "elapsed", time.Since(start))
	}()

	serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges)
	endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)

	klog.V(2).InfoS("Syncing nftables rules")

	success := false
	defer func() {
		if !success {
			klog.InfoS("Sync failed", "retryingTime", proxier.syncPeriod)
			proxier.syncRunner.RetryAfter(proxier.syncPeriod)
		}
	}()

	// If there are sufficiently-stale chains left over from previous transactions,
	// try to delete them now.
	if len(proxier.staleChains) > 0 {
		oneSecondAgo := start.Add(-time.Second)
		tx := proxier.nftables.NewTransaction()
		deleted := 0
		for chain, modtime := range proxier.staleChains {
			if modtime.Before(oneSecondAgo) {
				tx.Delete(&knftables.Chain{
					Name: chain,
				})
				delete(proxier.staleChains, chain)
				deleted++
			}
		}
		if deleted > 0 {
			klog.InfoS("Deleting stale nftables chains", "numChains", deleted)
			err := proxier.nftables.Run(context.TODO(), tx)
			if err != nil {
				// We already deleted the entries from staleChains, but if
				// the chains still exist, they'll just get added back
				// (with a later timestamp) at the end of the sync.
				klog.ErrorS(err, "Unable to delete stale chains; will retry later")
				// FIXME: metric
			}
		}
	}
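	// (The one-second grace period matters because, as explained above the
	// staleChains bookkeeping at the end of this function, older kernels do
	// not immediately notice that a chain has become unreferenced when the
	// map element pointing to it is deleted.)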

	// Now start the actual syncing transaction
	tx := proxier.nftables.NewTransaction()
	proxier.setupNFTables(tx)

	// We need to use, eg, "ip daddr" for IPv4 but "ip6 daddr" for IPv6
	ipX := "ip"
	ipvX_addr := "ipv4_addr" //nolint:stylecheck // var name intentionally resembles value
	if proxier.ipFamily == v1.IPv6Protocol {
		ipX = "ip6"
		ipvX_addr = "ipv6_addr"
	}
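	// For illustration (hypothetical addresses): with these variables, a
	// generated match reads "ip daddr 10.96.0.1" under IPv4 and
	// "ip6 daddr fd00::1" under IPv6, and the affinity sets created below are
	// typed "ipv4_addr" or "ipv6_addr" to match.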

	// We currently fully-rebuild our sets and maps on each resync
	tx.Flush(&knftables.Set{
		Name: kubeFirewallSet,
	})
	tx.Flush(&knftables.Set{
		Name: kubeFirewallAllowSet,
	})
	tx.Flush(&knftables.Map{
		Name: kubeNoEndpointServicesMap,
	})
	tx.Flush(&knftables.Map{
		Name: kubeNoEndpointNodePortsMap,
	})
	tx.Flush(&knftables.Map{
		Name: kubeServiceIPsMap,
	})
	tx.Flush(&knftables.Map{
		Name: kubeServiceNodePortsMap,
	})
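	// (Rebuilding from scratch trades some redundant work for simplicity:
	// the transaction is applied atomically by nftables, so readers never
	// see a half-populated set or map.)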

	// Accumulate service/endpoint chains and affinity sets to keep.
	activeChains := sets.New[string]()
	activeAffinitySets := sets.New[string]()

	// Compute the total number of endpoints across all services
	// to get a sense of how big the cluster is.
	totalEndpoints := 0
	for svcName := range proxier.svcPortMap {
		totalEndpoints += len(proxier.endpointsMap[svcName])
	}

	// These two variables are used to publish the sync_proxy_rules_no_endpoints_total
	// metric.
	serviceNoLocalEndpointsTotalInternal := 0
	serviceNoLocalEndpointsTotalExternal := 0

	// Build rules for each service-port.
	for svcName, svc := range proxier.svcPortMap {
		svcInfo, ok := svc.(*servicePortInfo)
		if !ok {
			klog.ErrorS(nil, "Failed to cast serviceInfo", "serviceName", svcName)
			continue
		}
		protocol := strings.ToLower(string(svcInfo.Protocol()))
		svcPortNameString := svcInfo.nameString

		// Figure out the endpoints for Cluster and Local traffic policy.
		// allLocallyReachableEndpoints is the set of all endpoints that can be routed to
		// from this node, given the service's traffic policies. hasEndpoints is true
		// if the service has any usable endpoints on any node, not just this one.
		allEndpoints := proxier.endpointsMap[svcName]
		clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeLabels)

		// Note the endpoint chains that will be used
		for _, ep := range allLocallyReachableEndpoints {
			if epInfo, ok := ep.(*endpointInfo); ok {
				ensureChain(epInfo.chainName, tx, activeChains)
			}
		}

		// clusterPolicyChain contains the endpoints used with "Cluster" traffic policy
		clusterPolicyChain := svcInfo.clusterPolicyChainName
		usesClusterPolicyChain := len(clusterEndpoints) > 0 && svcInfo.UsesClusterEndpoints()
		if usesClusterPolicyChain {
			ensureChain(clusterPolicyChain, tx, activeChains)
		}

		// localPolicyChain contains the endpoints used with "Local" traffic policy
		localPolicyChain := svcInfo.localPolicyChainName
		usesLocalPolicyChain := len(localEndpoints) > 0 && svcInfo.UsesLocalEndpoints()
		if usesLocalPolicyChain {
			ensureChain(localPolicyChain, tx, activeChains)
		}

		// internalPolicyChain is the chain containing the endpoints for
		// "internal" (ClusterIP) traffic. internalTrafficChain is the chain that
		// internal traffic is routed to (which is always the same as
		// internalPolicyChain). hasInternalEndpoints is true if we should
		// generate rules pointing to internalTrafficChain, or false if there are
		// no available internal endpoints.
		internalPolicyChain := clusterPolicyChain
		hasInternalEndpoints := hasEndpoints
		if svcInfo.InternalPolicyLocal() {
			internalPolicyChain = localPolicyChain
			if len(localEndpoints) == 0 {
				hasInternalEndpoints = false
			}
		}
		internalTrafficChain := internalPolicyChain

		// Similarly, externalPolicyChain is the chain containing the endpoints
		// for "external" (NodePort, LoadBalancer, and ExternalIP) traffic.
		// externalTrafficChain is the chain that external traffic is routed to
		// (which is always the service's "EXT" chain). hasExternalEndpoints is
		// true if there are endpoints that will be reached by external traffic.
		// (But we may still have to generate externalTrafficChain even if there
		// are no external endpoints, to ensure that the short-circuit rules for
		// local traffic are set up.)
		externalPolicyChain := clusterPolicyChain
		hasExternalEndpoints := hasEndpoints
		if svcInfo.ExternalPolicyLocal() {
			externalPolicyChain = localPolicyChain
			if len(localEndpoints) == 0 {
				hasExternalEndpoints = false
			}
		}
		externalTrafficChain := svcInfo.externalChainName // eventually jumps to externalPolicyChain

		// usesExternalTrafficChain is based on hasEndpoints, not hasExternalEndpoints,
		// because we need the local-traffic-short-circuiting rules even when there
		// are no externally-usable endpoints.
		usesExternalTrafficChain := hasEndpoints && svcInfo.ExternallyAccessible()
		if usesExternalTrafficChain {
			ensureChain(externalTrafficChain, tx, activeChains)
		}

		var internalTrafficFilterVerdict, externalTrafficFilterVerdict string
		if !hasEndpoints {
			// The service has no endpoints at all; hasInternalEndpoints and
			// hasExternalEndpoints will also be false, and we will not
			// generate any chains in the "nat" table for the service; only
			// rules in the "filter" table rejecting incoming packets for
			// the service's IPs.
			internalTrafficFilterVerdict = fmt.Sprintf("goto %s", kubeRejectChain)
			externalTrafficFilterVerdict = fmt.Sprintf("goto %s", kubeRejectChain)
		} else {
			if !hasInternalEndpoints {
				// The internalTrafficPolicy is "Local" but there are no local
				// endpoints. Traffic to the clusterIP will be dropped, but
				// external traffic may still be accepted.
				internalTrafficFilterVerdict = "drop"
				serviceNoLocalEndpointsTotalInternal++
			}
			if !hasExternalEndpoints {
				// The externalTrafficPolicy is "Local" but there are no
				// local endpoints. Traffic to "external" IPs from outside
				// the cluster will be dropped, but traffic from inside
				// the cluster may still be accepted.
				externalTrafficFilterVerdict = "drop"
				serviceNoLocalEndpointsTotalExternal++
			}
		}

		// Capture the clusterIP.
		if hasInternalEndpoints {
			tx.Add(&knftables.Element{
				Map: kubeServiceIPsMap,
				Key: []string{
					svcInfo.ClusterIP().String(),
					protocol,
					strconv.Itoa(svcInfo.Port()),
				},
				Value: []string{
					fmt.Sprintf("goto %s", internalTrafficChain),
				},
			})
		} else {
			// No endpoints.
			tx.Add(&knftables.Element{
				Map: kubeNoEndpointServicesMap,
				Key: []string{
					svcInfo.ClusterIP().String(),
					protocol,
					strconv.Itoa(svcInfo.Port()),
				},
				Value: []string{
					internalTrafficFilterVerdict,
				},
				Comment: &svcPortNameString,
			})
		}
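		// For illustration (hypothetical values): the clusterIP element above
		// renders roughly as "10.96.0.10 . tcp . 53 : goto <per-service
		// chain>"; nftables joins concatenated key parts with " . ", and the
		// map value here is a verdict.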

		// Capture externalIPs.
		for _, externalIP := range svcInfo.ExternalIPStrings() {
			if hasEndpoints {
				// Send traffic bound for external IPs to the "external
				// destinations" chain.
				tx.Add(&knftables.Element{
					Map: kubeServiceIPsMap,
					Key: []string{
						externalIP,
						protocol,
						strconv.Itoa(svcInfo.Port()),
					},
					Value: []string{
						fmt.Sprintf("goto %s", externalTrafficChain),
					},
				})
			}
			if !hasExternalEndpoints {
				// Either no endpoints at all (REJECT) or no endpoints for
				// external traffic (DROP anything that didn't get
				// short-circuited by the EXT chain.)
				tx.Add(&knftables.Element{
					Map: kubeNoEndpointServicesMap,
					Key: []string{
						externalIP,
						protocol,
						strconv.Itoa(svcInfo.Port()),
					},
					Value: []string{
						externalTrafficFilterVerdict,
					},
					Comment: &svcPortNameString,
				})
			}
		}
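		// Note that when hasEndpoints is true but hasExternalEndpoints is
		// false, an external IP appears in both maps: the goto in
		// kubeServiceIPsMap still serves traffic that the EXT chain
		// short-circuits, while the filter-side verdict drops whatever is
		// left un-DNATed.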

		// Capture load-balancer ingress.
		for _, lbip := range svcInfo.LoadBalancerVIPStrings() {
			if hasEndpoints {
				tx.Add(&knftables.Element{
					Map: kubeServiceIPsMap,
					Key: []string{
						lbip,
						protocol,
						strconv.Itoa(svcInfo.Port()),
					},
					Value: []string{
						fmt.Sprintf("goto %s", externalTrafficChain),
					},
				})
			}

			if len(svcInfo.LoadBalancerSourceRanges()) > 0 {
				tx.Add(&knftables.Element{
					Set: kubeFirewallSet,
					Key: []string{
						lbip,
						protocol,
						strconv.Itoa(svcInfo.Port()),
					},
					Comment: &svcPortNameString,
				})

				allowFromNode := false
				for _, src := range svcInfo.LoadBalancerSourceRanges() {
					_, cidr, _ := netutils.ParseCIDRSloppy(src)
					if cidr == nil {
						continue
					}
					tx.Add(&knftables.Element{
						Set: kubeFirewallAllowSet,
						Key: []string{
							lbip,
							protocol,
							strconv.Itoa(svcInfo.Port()),
							src,
						},
						Comment: &svcPortNameString,
					})
					if cidr.Contains(proxier.nodeIP) {
						allowFromNode = true
					}
				}
				// For VIP-like LBs, the VIP is often added as a local
				// address (via an IP route rule).  In that case, a request
				// from a node to the VIP will not hit the loadbalancer but
				// will loop back with the source IP set to the VIP.  We
				// need the following rules to allow requests from this node.
				if allowFromNode {
					tx.Add(&knftables.Element{
						Set: kubeFirewallAllowSet,
						Key: []string{
							lbip,
							protocol,
							strconv.Itoa(svcInfo.Port()),
							lbip,
						},
					})
				}
			}
		}
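		// These sets presumably drive a static firewall chain set up in
		// setupNFTables: traffic to a (VIP, protocol, port) in kubeFirewallSet
		// is dropped unless a matching (VIP, protocol, port, source) tuple
		// appears in kubeFirewallAllowSet.
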
		if !hasExternalEndpoints {
			// Either no endpoints at all (REJECT) or no endpoints for
			// external traffic (DROP anything that didn't get short-circuited
			// by the EXT chain.)
			for _, lbip := range svcInfo.LoadBalancerVIPStrings() {
				tx.Add(&knftables.Element{
					Map: kubeNoEndpointServicesMap,
					Key: []string{
						lbip,
						protocol,
						strconv.Itoa(svcInfo.Port()),
					},
					Value: []string{
						externalTrafficFilterVerdict,
					},
					Comment: &svcPortNameString,
				})
			}
		}

		// Capture nodeports.
		if svcInfo.NodePort() != 0 {
			if hasEndpoints {
				// Jump to the external destination chain.  For better or for
				// worse, nodeports are not subject to loadBalancerSourceRanges,
				// and we can't change that.
				tx.Add(&knftables.Element{
					Map: kubeServiceNodePortsMap,
					Key: []string{
						protocol,
						strconv.Itoa(svcInfo.NodePort()),
					},
					Value: []string{
						fmt.Sprintf("goto %s", externalTrafficChain),
					},
				})
			}
			if !hasExternalEndpoints {
				// Either no endpoints at all (REJECT) or no endpoints for
				// external traffic (DROP anything that didn't get
				// short-circuited by the EXT chain.)
				tx.Add(&knftables.Element{
					Map: kubeNoEndpointNodePortsMap,
					Key: []string{
						protocol,
						strconv.Itoa(svcInfo.NodePort()),
					},
					Value: []string{
						externalTrafficFilterVerdict,
					},
					Comment: &svcPortNameString,
				})
			}
		}
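		// For illustration (hypothetical port): a nodeport element renders
		// roughly as "tcp . 30080 : goto <EXT chain>"; unlike the service-IPs
		// map, the key carries no destination address, since a nodeport is
		// matched by protocol and port alone.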

		// Set up internal traffic handling.
		if hasInternalEndpoints {
			if proxier.masqueradeAll {
				tx.Add(&knftables.Rule{
					Chain: internalTrafficChain,
					Rule: knftables.Concat(
						ipX, "daddr", svcInfo.ClusterIP(),
						protocol, "dport", svcInfo.Port(),
						"jump", kubeMarkMasqChain,
					),
				})
			} else if proxier.localDetector.IsImplemented() {
				// This masquerades off-cluster traffic to a service VIP. The
				// idea is that you can establish a static route for your
				// Service range, routing to any node, and that node will
				// bridge into the Service for you. Since that might bounce
				// off-node, we masquerade here.
				tx.Add(&knftables.Rule{
					Chain: internalTrafficChain,
					Rule: knftables.Concat(
						ipX, "daddr", svcInfo.ClusterIP(),
						protocol, "dport", svcInfo.Port(),
						proxier.localDetector.IfNotLocalNFT(),
						"jump", kubeMarkMasqChain,
					),
				})
			}
		}
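		// For illustration (hypothetical values): with a cluster-CIDR-based
		// local detector, the second form renders roughly as
		//   ip daddr 10.96.0.10 tcp dport 53 ip saddr != 10.244.0.0/16 jump <masq chain>
		// so only traffic originating outside the cluster gets marked for
		// masquerade.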

		// Set up external traffic handling (if any "external" destinations are
		// enabled). All captured traffic for all external destinations should
		// jump to externalTrafficChain, which will handle some special cases and
		// then jump to externalPolicyChain.
		if usesExternalTrafficChain {
			if !svcInfo.ExternalPolicyLocal() {
				// If we are using non-local endpoints we need to masquerade,
				// in case we cross nodes.
				tx.Add(&knftables.Rule{
					Chain: externalTrafficChain,
					Rule: knftables.Concat(
						"jump", kubeMarkMasqChain,
					),
				})
			} else {
				// If we are only using same-node endpoints, we can retain the
				// source IP in most cases.

				if proxier.localDetector.IsImplemented() {
					// Treat all locally-originated pod -> external destination
					// traffic as a special-case.  It is subject to neither
					// form of traffic policy, which simulates going up-and-out
					// to an external load-balancer and coming back in.
					tx.Add(&knftables.Rule{
						Chain: externalTrafficChain,
						Rule: knftables.Concat(
							proxier.localDetector.IfLocalNFT(),
							"goto", clusterPolicyChain,
						),
						Comment: ptr.To("short-circuit pod traffic"),
					})
				}

				// Locally originated traffic (not a pod, but the host node)
				// still needs masquerade because the LBIP itself is a local
				// address, so that will be the chosen source IP.
				tx.Add(&knftables.Rule{
					Chain: externalTrafficChain,
					Rule: knftables.Concat(
						"fib", "saddr", "type", "local",
						"jump", kubeMarkMasqChain,
					),
					Comment: ptr.To("masquerade local traffic"),
				})

				// Redirect all src-type=LOCAL -> external destination to the
				// policy=cluster chain. This allows traffic originating
				// from the host to be redirected to the service correctly.
				tx.Add(&knftables.Rule{
					Chain: externalTrafficChain,
					Rule: knftables.Concat(
						"fib", "saddr", "type", "local",
						"goto", clusterPolicyChain,
					),
					Comment: ptr.To("short-circuit local traffic"),
				})
			}

			// Anything else falls thru to the appropriate policy chain.
			if hasExternalEndpoints {
				tx.Add(&knftables.Rule{
					Chain: externalTrafficChain,
					Rule: knftables.Concat(
						"goto", externalPolicyChain,
					),
				})
			}
		}
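		// Taken together, for an externalTrafficPolicy=Local service the EXT
		// chain now reads: pod-originated traffic goes to the cluster policy
		// chain, host-originated traffic is masqueraded and likewise goes to
		// the cluster policy chain, and everything else falls through to the
		// local policy chain (when local endpoints exist).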

		if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
			// Generate the per-endpoint affinity sets
			for _, ep := range allLocallyReachableEndpoints {
				epInfo, ok := ep.(*endpointInfo)
				if !ok {
					klog.ErrorS(nil, "Failed to cast endpointsInfo", "endpointsInfo", ep)
					continue
				}

				// Create a set to store current affinity mappings. As
				// with the iptables backend, endpoint affinity is
				// recorded for connections from a particular source IP
				// (without regard to source port) to a particular
				// ServicePort (without regard to which service IP was
				// used to reach the service). This may be changed in the
				// future.
				tx.Add(&knftables.Set{
					Name: epInfo.affinitySetName,
					Type: ipvX_addr,
					Flags: []knftables.SetFlag{
						// The nft docs say "dynamic" is only
						// needed for sets containing stateful
						// objects (eg counters), but (at least on
						// RHEL8) if we create the set without
						// "dynamic", it later gets mutated to
						// have it, and then the next attempt to
						// tx.Add() it here fails because it looks
						// like we're trying to change the flags.
						knftables.DynamicFlag,
						knftables.TimeoutFlag,
					},
					Timeout: ptr.To(time.Duration(svcInfo.StickyMaxAgeSeconds()) * time.Second),
				})
				activeAffinitySets.Insert(epInfo.affinitySetName)
			}
		}
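		// For illustration (hypothetical values): such a set renders roughly
		// as "{ type ipv4_addr ; flags dynamic,timeout ; timeout 3h ; }", and
		// the per-endpoint chains generated below refresh its entries on each
		// new connection from a given source IP.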

		// If Cluster policy is in use, create the chain and create rules jumping
		// from clusterPolicyChain to the clusterEndpoints
		if usesClusterPolicyChain {
			proxier.writeServiceToEndpointRules(tx, svcPortNameString, svcInfo, clusterPolicyChain, clusterEndpoints)
		}

		// If Local policy is in use, create rules jumping from localPolicyChain
		// to the localEndpoints
		if usesLocalPolicyChain {
			proxier.writeServiceToEndpointRules(tx, svcPortNameString, svcInfo, localPolicyChain, localEndpoints)
		}

		// Generate the per-endpoint chains
		for _, ep := range allLocallyReachableEndpoints {
			epInfo, ok := ep.(*endpointInfo)
			if !ok {
				klog.ErrorS(nil, "Failed to cast endpointInfo", "endpointInfo", ep)
				continue
			}

			endpointChain := epInfo.chainName

			// Handle traffic that loops back to the originator with SNAT.
			tx.Add(&knftables.Rule{
				Chain: endpointChain,
				Rule: knftables.Concat(
					ipX, "saddr", epInfo.IP(),
					"jump", kubeMarkMasqChain,
				),
			})

			// Handle session affinity
			if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
				tx.Add(&knftables.Rule{
					Chain: endpointChain,
					Rule: knftables.Concat(
						"update", "@", epInfo.affinitySetName,
						"{", ipX, "saddr", "}",
					),
				})
			}

			// DNAT to final destination.
			tx.Add(&knftables.Rule{
				Chain: endpointChain,
				Rule: knftables.Concat(
					"meta l4proto", protocol,
					"dnat to", epInfo.String(),
				),
			})
		}
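		// For illustration (hypothetical TCP endpoint 10.244.1.5:8080 with
		// ClientIP affinity), an endpoint chain ends up containing roughly:
		//   ip saddr 10.244.1.5 jump <masq chain>
		//   update @<affinity set> { ip saddr }
		//   meta l4proto tcp dnat to 10.244.1.5:8080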
	}

	// Figure out which chains are now stale. Unfortunately, we can't delete them
	// right away, because with kernels before 6.2, if there is a map element pointing
	// to a chain, and you delete that map element, the kernel doesn't notice until a
	// short amount of time later that the chain is now unreferenced. So we flush them
	// now, and record the time that they become stale in staleChains so they can be
	// deleted later.
	existingChains, err := proxier.nftables.List(context.TODO(), "chains")
	if err == nil {
		for _, chain := range existingChains {
			if isServiceChainName(chain) && !activeChains.Has(chain) {
				tx.Flush(&knftables.Chain{
					Name: chain,
				})
				proxier.staleChains[chain] = start
			}
		}
	} else if !knftables.IsNotFound(err) {
		klog.ErrorS(err, "Failed to list nftables chains: stale chains will not be deleted")
	}
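	// (Flush empties a stale chain but leaves it in place, so any lingering
	// kernel-side reference to it remains valid until the delayed Delete in a
	// later sync, per the pre-6.2 kernel caveat described above.)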

	// OTOH, we can immediately delete any stale affinity sets
	existingSets, err := proxier.nftables.List(context.TODO(), "sets")
	if err == nil {
		for _, set := range existingSets {
			if isAffinitySetName(set) && !activeAffinitySets.Has(set) {
				tx.Delete(&knftables.Set{
					Name: set,
				})
			}
		}
	} else if !knftables.IsNotFound(err) {
		klog.ErrorS(err, "Failed to list nftables sets: stale affinity sets will not be deleted")
	}

	// Sync rules.
	klog.V(2).InfoS("Reloading service nftables data",
		"numServices", len(proxier.svcPortMap),
		"numEndpoints", totalEndpoints,
	)

	// FIXME
	// klog.V(9).InfoS("Running nftables transaction", "transaction", tx.Bytes())

	err = proxier.nftables.Run(context.TODO(), tx)
	if err != nil {
		klog.ErrorS(err, "nftables sync failed")
		metrics.IptablesRestoreFailuresTotal.Inc()
		return
	}
	success = true

	for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes {
		for _, lastChangeTriggerTime := range lastChangeTriggerTimes {
			latency := metrics.SinceInSeconds(lastChangeTriggerTime)
			metrics.NetworkProgrammingLatency.Observe(latency)
			klog.V(4).InfoS("Network programming", "endpoint", klog.KRef(name.Namespace, name.Name), "elapsed", latency)
		}
	}

	metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal").Set(float64(serviceNoLocalEndpointsTotalInternal))
	metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(serviceNoLocalEndpointsTotalExternal))
	if proxier.healthzServer != nil {
		proxier.healthzServer.Updated(proxier.ipFamily)
	}
	metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()

	// Update service healthchecks.  The endpoints list might include services that are
	// not "OnlyLocal", but the services list will not, and the serviceHealthServer
	// will just drop those endpoints.
	if err := proxier.serviceHealthServer.SyncServices(proxier.svcPortMap.HealthCheckNodePorts()); err != nil {
		klog.ErrorS(err, "Error syncing healthcheck services")
	}
	if err := proxier.serviceHealthServer.SyncEndpoints(proxier.endpointsMap.LocalReadyEndpoints()); err != nil {
		klog.ErrorS(err, "Error syncing healthcheck endpoints")
	}

	// Finish housekeeping, clear stale conntrack entries for UDP Services
	conntrack.CleanStaleEntries(proxier.ipFamily == v1.IPv6Protocol, proxier.exec, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
}