in pkg/proxy/nftables/proxier.go [926:1594]
func (proxier *Proxier) syncProxyRules() {
proxier.mu.Lock()
defer proxier.mu.Unlock()
// Don't sync rules until we've received services and endpoints.
if !proxier.isInitialized() {
klog.V(2).InfoS("Not syncing nftables until Services and Endpoints have been received from master")
return
}
//
// Below this point we will not return until we try to write the nftables rules.
//
// Keep track of how long syncs take.
start := time.Now()
defer func() {
metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
klog.V(2).InfoS("SyncProxyRules complete", "elapsed", time.Since(start))
}()
serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges)
endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
klog.V(2).InfoS("Syncing nftables rules")
success := false
defer func() {
if !success {
klog.InfoS("Sync failed", "retryingTime", proxier.syncPeriod)
proxier.syncRunner.RetryAfter(proxier.syncPeriod)
}
}()
// If there are sufficiently-stale chains left over from previous transactions,
// try to delete them now.
if len(proxier.staleChains) > 0 {
oneSecondAgo := start.Add(-time.Second)
tx := proxier.nftables.NewTransaction()
deleted := 0
for chain, modtime := range proxier.staleChains {
if modtime.Before(oneSecondAgo) {
tx.Delete(&knftables.Chain{
Name: chain,
})
delete(proxier.staleChains, chain)
deleted++
}
}
if deleted > 0 {
klog.InfoS("Deleting stale nftables chains", "numChains", deleted)
err := proxier.nftables.Run(context.TODO(), tx)
if err != nil {
// We already deleted the entries from staleChains, but if
// the chains still exist, they'll just get added back
// (with a later timestamp) at the end of the sync.
klog.ErrorS(err, "Unable to delete stale chains; will retry later")
// FIXME: metric
}
}
}
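// (The one-second threshold mirrors the flush-now-delete-later dance described near
// the end of this function: on older kernels, a chain flushed in a previous sync can
// still look referenced for a short time after the map element pointing to it is gone.)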
// Now start the actual syncing transaction
tx := proxier.nftables.NewTransaction()
proxier.setupNFTables(tx)
// We need to use, e.g., "ip daddr" for IPv4 but "ip6 daddr" for IPv6
ipX := "ip"
ipvX_addr := "ipv4_addr" //nolint:stylecheck // var name intentionally resembles value
if proxier.ipFamily == v1.IPv6Protocol {
ipX = "ip6"
ipvX_addr = "ipv6_addr"
}
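// For example, a cluster-IP match written below renders as "ip daddr 172.30.0.41" in
// the IPv4 family but "ip6 daddr fd00:30::41" in the IPv6 family (addresses illustrative).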
// We currently fully rebuild our sets and maps on each resync
tx.Flush(&knftables.Set{
Name: kubeFirewallSet,
})
tx.Flush(&knftables.Set{
Name: kubeFirewallAllowSet,
})
tx.Flush(&knftables.Map{
Name: kubeNoEndpointServicesMap,
})
tx.Flush(&knftables.Map{
Name: kubeNoEndpointNodePortsMap,
})
tx.Flush(&knftables.Map{
Name: kubeServiceIPsMap,
})
tx.Flush(&knftables.Map{
Name: kubeServiceNodePortsMap,
})
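// (Flushing empties any existing elements without touching the set/map definitions
// themselves; every element is then re-added below, so the rendered ruleset always
// reflects the current Service/Endpoints state.)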
// Accumulate service/endpoint chains and affinity sets to keep.
activeChains := sets.New[string]()
activeAffinitySets := sets.New[string]()
// Compute the total number of endpoints across all services, to get a sense of how
// big the cluster is.
totalEndpoints := 0
for svcName := range proxier.svcPortMap {
totalEndpoints += len(proxier.endpointsMap[svcName])
}
// These two variables are used to publish the sync_proxy_rules_no_endpoints_total
// metric.
serviceNoLocalEndpointsTotalInternal := 0
serviceNoLocalEndpointsTotalExternal := 0
// Build rules for each service-port.
for svcName, svc := range proxier.svcPortMap {
svcInfo, ok := svc.(*servicePortInfo)
if !ok {
klog.ErrorS(nil, "Failed to cast serviceInfo", "serviceName", svcName)
continue
}
protocol := strings.ToLower(string(svcInfo.Protocol()))
svcPortNameString := svcInfo.nameString
// Figure out the endpoints for Cluster and Local traffic policy.
// allLocallyReachableEndpoints is the set of all endpoints that can be routed to
// from this node, given the service's traffic policies. hasEndpoints is true
// if the service has any usable endpoints on any node, not just this one.
allEndpoints := proxier.endpointsMap[svcName]
clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeLabels)
// Note the endpoint chains that will be used
for _, ep := range allLocallyReachableEndpoints {
if epInfo, ok := ep.(*endpointInfo); ok {
ensureChain(epInfo.chainName, tx, activeChains)
}
}
// clusterPolicyChain contains the endpoints used with "Cluster" traffic policy
clusterPolicyChain := svcInfo.clusterPolicyChainName
usesClusterPolicyChain := len(clusterEndpoints) > 0 && svcInfo.UsesClusterEndpoints()
if usesClusterPolicyChain {
ensureChain(clusterPolicyChain, tx, activeChains)
}
// localPolicyChain contains the endpoints used with "Local" traffic policy
localPolicyChain := svcInfo.localPolicyChainName
usesLocalPolicyChain := len(localEndpoints) > 0 && svcInfo.UsesLocalEndpoints()
if usesLocalPolicyChain {
ensureChain(localPolicyChain, tx, activeChains)
}
// internalPolicyChain is the chain containing the endpoints for
// "internal" (ClusterIP) traffic. internalTrafficChain is the chain that
// internal traffic is routed to (which is always the same as
// internalPolicyChain). hasInternalEndpoints is true if we should
// generate rules pointing to internalTrafficChain, or false if there are
// no available internal endpoints.
internalPolicyChain := clusterPolicyChain
hasInternalEndpoints := hasEndpoints
if svcInfo.InternalPolicyLocal() {
internalPolicyChain = localPolicyChain
if len(localEndpoints) == 0 {
hasInternalEndpoints = false
}
}
internalTrafficChain := internalPolicyChain
// Similarly, externalPolicyChain is the chain containing the endpoints
// for "external" (NodePort, LoadBalancer, and ExternalIP) traffic.
// externalTrafficChain is the chain that external traffic is routed to
// (which is always the service's "EXT" chain). hasExternalEndpoints is
// true if there are endpoints that will be reached by external traffic.
// (But we may still have to generate externalTrafficChain even if there
// are no external endpoints, to ensure that the short-circuit rules for
// local traffic are set up.)
externalPolicyChain := clusterPolicyChain
hasExternalEndpoints := hasEndpoints
if svcInfo.ExternalPolicyLocal() {
externalPolicyChain = localPolicyChain
if len(localEndpoints) == 0 {
hasExternalEndpoints = false
}
}
externalTrafficChain := svcInfo.externalChainName // eventually jumps to externalPolicyChain
// usesExternalTrafficChain is based on hasEndpoints, not hasExternalEndpoints,
// because we need the local-traffic-short-circuiting rules even when there
// are no externally-usable endpoints.
usesExternalTrafficChain := hasEndpoints && svcInfo.ExternallyAccessible()
if usesExternalTrafficChain {
ensureChain(externalTrafficChain, tx, activeChains)
}
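// To summarize the dispatch targets chosen above:
//   - internal (ClusterIP) traffic goes directly to internalTrafficChain, which is
//     clusterPolicyChain or localPolicyChain depending on internalTrafficPolicy.
//   - external (NodePort/LoadBalancer/ExternalIP) traffic goes to the service's "EXT"
//     chain first, which may masquerade or short-circuit traffic before reaching
//     externalPolicyChain.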
var internalTrafficFilterVerdict, externalTrafficFilterVerdict string
if !hasEndpoints {
// The service has no endpoints at all; hasInternalEndpoints and
// hasExternalEndpoints will also be false, and we will not
// generate any chains in the "nat" table for the service; only
// rules in the "filter" table rejecting incoming packets for
// the service's IPs.
internalTrafficFilterVerdict = fmt.Sprintf("goto %s", kubeRejectChain)
externalTrafficFilterVerdict = fmt.Sprintf("goto %s", kubeRejectChain)
} else {
if !hasInternalEndpoints {
// The internalTrafficPolicy is "Local" but there are no local
// endpoints. Traffic to the clusterIP will be dropped, but
// external traffic may still be accepted.
internalTrafficFilterVerdict = "drop"
serviceNoLocalEndpointsTotalInternal++
}
if !hasExternalEndpoints {
// The externalTrafficPolicy is "Local" but there are no
// local endpoints. Traffic to "external" IPs from outside
// the cluster will be dropped, but traffic from inside
// the cluster may still be accepted.
externalTrafficFilterVerdict = "drop"
serviceNoLocalEndpointsTotalExternal++
}
}
// Capture the clusterIP.
if hasInternalEndpoints {
tx.Add(&knftables.Element{
Map: kubeServiceIPsMap,
Key: []string{
svcInfo.ClusterIP().String(),
protocol,
strconv.Itoa(svcInfo.Port()),
},
Value: []string{
fmt.Sprintf("goto %s", internalTrafficChain),
},
})
} else {
// No endpoints.
tx.Add(&knftables.Element{
Map: kubeNoEndpointServicesMap,
Key: []string{
svcInfo.ClusterIP().String(),
protocol,
strconv.Itoa(svcInfo.Port()),
},
Value: []string{
internalTrafficFilterVerdict,
},
Comment: &svcPortNameString,
})
}
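// Illustrative result (addresses and ports hypothetical): a ClusterIP Service at
// 172.30.0.41 on TCP port 80 with usable internal endpoints yields a kubeServiceIPsMap
// element keyed "172.30.0.41 . tcp . 80" whose verdict is "goto <internalTrafficChain>";
// without usable internal endpoints it instead lands in kubeNoEndpointServicesMap with
// the filter verdict computed above (goto kubeRejectChain, or drop).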
// Capture externalIPs.
for _, externalIP := range svcInfo.ExternalIPStrings() {
if hasEndpoints {
// Send traffic bound for external IPs to the "external
// destinations" chain.
tx.Add(&knftables.Element{
Map: kubeServiceIPsMap,
Key: []string{
externalIP,
protocol,
strconv.Itoa(svcInfo.Port()),
},
Value: []string{
fmt.Sprintf("goto %s", externalTrafficChain),
},
})
}
if !hasExternalEndpoints {
// Either no endpoints at all (REJECT) or no endpoints for
// external traffic (DROP anything that didn't get
// short-circuited by the EXT chain.)
tx.Add(&knftables.Element{
Map: kubeNoEndpointServicesMap,
Key: []string{
externalIP,
protocol,
strconv.Itoa(svcInfo.Port()),
},
Value: []string{
externalTrafficFilterVerdict,
},
Comment: &svcPortNameString,
})
}
}
// Capture load-balancer ingress.
for _, lbip := range svcInfo.LoadBalancerVIPStrings() {
if hasEndpoints {
tx.Add(&knftables.Element{
Map: kubeServiceIPsMap,
Key: []string{
lbip,
protocol,
strconv.Itoa(svcInfo.Port()),
},
Value: []string{
fmt.Sprintf("goto %s", externalTrafficChain),
},
})
}
if len(svcInfo.LoadBalancerSourceRanges()) > 0 {
tx.Add(&knftables.Element{
Set: kubeFirewallSet,
Key: []string{
lbip,
protocol,
strconv.Itoa(svcInfo.Port()),
},
Comment: &svcPortNameString,
})
allowFromNode := false
for _, src := range svcInfo.LoadBalancerSourceRanges() {
_, cidr, _ := netutils.ParseCIDRSloppy(src)
if cidr == nil {
continue
}
tx.Add(&knftables.Element{
Set: kubeFirewallAllowSet,
Key: []string{
lbip,
protocol,
strconv.Itoa(svcInfo.Port()),
src,
},
Comment: &svcPortNameString,
})
if cidr.Contains(proxier.nodeIP) {
allowFromNode = true
}
}
// For VIP-like LBs, the VIP is often added as a local
// address (via an IP route rule). In that case, a request
// from a node to the VIP will not hit the loadbalancer but
// will loop back with the source IP set to the VIP. We
// need the following rules to allow requests from this node.
if allowFromNode {
tx.Add(&knftables.Element{
Set: kubeFirewallAllowSet,
Key: []string{
lbip,
protocol,
strconv.Itoa(svcInfo.Port()),
lbip,
},
})
}
}
}
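// Illustrative firewall result (values hypothetical): a LoadBalancer VIP 1.2.3.4 on
// TCP port 80 with loadBalancerSourceRanges ["203.0.113.0/24"] gets one kubeFirewallSet
// entry ("1.2.3.4 . tcp . 80") plus one kubeFirewallAllowSet entry per CIDR
// ("1.2.3.4 . tcp . 80 . 203.0.113.0/24"); the firewall chain (set up elsewhere, not in
// this function) is expected to drop traffic that matches the former but none of the latter.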
if !hasExternalEndpoints {
// Either no endpoints at all (REJECT) or no endpoints for
// external traffic (DROP anything that didn't get short-circuited
// by the EXT chain.)
for _, lbip := range svcInfo.LoadBalancerVIPStrings() {
tx.Add(&knftables.Element{
Map: kubeNoEndpointServicesMap,
Key: []string{
lbip,
protocol,
strconv.Itoa(svcInfo.Port()),
},
Value: []string{
externalTrafficFilterVerdict,
},
Comment: &svcPortNameString,
})
}
}
// Capture nodeports.
if svcInfo.NodePort() != 0 {
if hasEndpoints {
// Jump to the external destination chain. For better or for
// worse, nodeports are not subject to loadBalancerSourceRanges,
// and we can't change that.
tx.Add(&knftables.Element{
Map: kubeServiceNodePortsMap,
Key: []string{
protocol,
strconv.Itoa(svcInfo.NodePort()),
},
Value: []string{
fmt.Sprintf("goto %s", externalTrafficChain),
},
})
}
if !hasExternalEndpoints {
// Either no endpoints at all (REJECT) or no endpoints for
// external traffic (DROP anything that didn't get
// short-circuited by the EXT chain.)
tx.Add(&knftables.Element{
Map: kubeNoEndpointNodePortsMap,
Key: []string{
protocol,
strconv.Itoa(svcInfo.NodePort()),
},
Value: []string{
externalTrafficFilterVerdict,
},
Comment: &svcPortNameString,
})
}
}
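// Illustrative NodePort result (port hypothetical): NodePort 30080/TCP with usable
// endpoints yields a kubeServiceNodePortsMap element keyed "tcp . 30080" with the
// verdict "goto <externalTrafficChain>"; if external endpoints are missing it also
// gets a kubeNoEndpointNodePortsMap element carrying the external filter verdict.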
// Set up internal traffic handling.
if hasInternalEndpoints {
if proxier.masqueradeAll {
tx.Add(&knftables.Rule{
Chain: internalTrafficChain,
Rule: knftables.Concat(
ipX, "daddr", svcInfo.ClusterIP(),
protocol, "dport", svcInfo.Port(),
"jump", kubeMarkMasqChain,
),
})
} else if proxier.localDetector.IsImplemented() {
// This masquerades off-cluster traffic to a service VIP. The
// idea is that you can establish a static route for your
// Service range, routing to any node, and that node will
// bridge into the Service for you. Since that might bounce
// off-node, we masquerade here.
tx.Add(&knftables.Rule{
Chain: internalTrafficChain,
Rule: knftables.Concat(
ipX, "daddr", svcInfo.ClusterIP(),
protocol, "dport", svcInfo.Port(),
proxier.localDetector.IfNotLocalNFT(),
"jump", kubeMarkMasqChain,
),
})
}
}
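// Illustrative rendered rule for the masqueradeAll case (values hypothetical):
//   ip daddr 172.30.0.41 tcp dport 80 jump <kubeMarkMasqChain>
// In the localDetector case the same rule additionally carries the "not from a local
// pod" match produced by IfNotLocalNFT() (for a CIDR-based detector, roughly
// "ip saddr != 10.0.0.0/8"), so only traffic that did not originate inside the
// cluster's pod network gets masqueraded.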
// Set up external traffic handling (if any "external" destinations are
// enabled). All captured traffic for all external destinations should
// jump to externalTrafficChain, which will handle some special cases and
// then jump to externalPolicyChain.
if usesExternalTrafficChain {
if !svcInfo.ExternalPolicyLocal() {
// If we are using non-local endpoints we need to masquerade,
// in case we cross nodes.
tx.Add(&knftables.Rule{
Chain: externalTrafficChain,
Rule: knftables.Concat(
"jump", kubeMarkMasqChain,
),
})
} else {
// If we are only using same-node endpoints, we can retain the
// source IP in most cases.
if proxier.localDetector.IsImplemented() {
// Treat all locally-originated pod -> external destination
// traffic as a special-case. It is subject to neither
// form of traffic policy, which simulates going up-and-out
// to an external load-balancer and coming back in.
tx.Add(&knftables.Rule{
Chain: externalTrafficChain,
Rule: knftables.Concat(
proxier.localDetector.IfLocalNFT(),
"goto", clusterPolicyChain,
),
Comment: ptr.To("short-circuit pod traffic"),
})
}
// Locally originated traffic (not a pod, but the host node)
// still needs masquerade because the LBIP itself is a local
// address, so that will be the chosen source IP.
tx.Add(&knftables.Rule{
Chain: externalTrafficChain,
Rule: knftables.Concat(
"fib", "saddr", "type", "local",
"jump", kubeMarkMasqChain,
),
Comment: ptr.To("masquerade local traffic"),
})
// Redirect all src-type=LOCAL -> external destination to the
// policy=cluster chain. This allows traffic originating
// from the host to be redirected to the service correctly.
tx.Add(&knftables.Rule{
Chain: externalTrafficChain,
Rule: knftables.Concat(
"fib", "saddr", "type", "local",
"goto", clusterPolicyChain,
),
Comment: ptr.To("short-circuit local traffic"),
})
}
// Anything else falls through to the appropriate policy chain.
if hasExternalEndpoints {
tx.Add(&knftables.Rule{
Chain: externalTrafficChain,
Rule: knftables.Concat(
"goto", externalPolicyChain,
),
})
}
}
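// Taken together, with externalTrafficPolicy=Local and a working localDetector, the
// EXT chain reads roughly (illustrative summary, not literal nft output):
//   <traffic from a local pod>   goto  clusterPolicyChain   (short-circuit pod traffic)
//   fib saddr type local         jump  kubeMarkMasqChain    (masquerade host traffic)
//   fib saddr type local         goto  clusterPolicyChain   (short-circuit host traffic)
//   <everything else>            goto  externalPolicyChain  (only if hasExternalEndpoints)
// In the non-Local case it is just an unconditional masquerade followed by the goto.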
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
// Generate the per-endpoint affinity sets
for _, ep := range allLocallyReachableEndpoints {
epInfo, ok := ep.(*endpointInfo)
if !ok {
klog.ErrorS(nil, "Failed to cast endpointsInfo", "endpointsInfo", ep)
continue
}
// Create a set to store current affinity mappings. As
// with the iptables backend, endpoint affinity is
// recorded for connections from a particular source IP
// (without regard to source port) to a particular
// ServicePort (without regard to which service IP was
// used to reach the service). This may be changed in the
// future.
tx.Add(&knftables.Set{
Name: epInfo.affinitySetName,
Type: ipvX_addr,
Flags: []knftables.SetFlag{
// The nft docs say "dynamic" is only
// needed for sets containing stateful
// objects (eg counters), but (at least on
// RHEL8) if we create the set without
// "dynamic", it later gets mutated to
// have it, and then the next attempt to
// tx.Add() it here fails because it looks
// like we're trying to change the flags.
knftables.DynamicFlag,
knftables.TimeoutFlag,
},
Timeout: ptr.To(time.Duration(svcInfo.StickyMaxAgeSeconds()) * time.Second),
})
activeAffinitySets.Insert(epInfo.affinitySetName)
}
}
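// Illustrative rendered set for a 3-hour sticky timeout (name hypothetical):
//   set affinity-<endpoint-hash> { type ipv4_addr ; flags dynamic,timeout ; timeout 3h ; }
// Client IPs are inserted by the per-endpoint affinity-update rule written below
// ("update @<set> { ip saddr }"), and writeServiceToEndpointRules is expected to
// consult these sets so repeat clients keep landing on the same endpoint.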
// If Cluster policy is in use, create rules dispatching from clusterPolicyChain
// to the clusterEndpoints
if usesClusterPolicyChain {
proxier.writeServiceToEndpointRules(tx, svcPortNameString, svcInfo, clusterPolicyChain, clusterEndpoints)
}
// If Local policy is in use, create rules jumping from localPolicyChain
// to the localEndpoints
if usesLocalPolicyChain {
proxier.writeServiceToEndpointRules(tx, svcPortNameString, svcInfo, localPolicyChain, localEndpoints)
}
// Generate the per-endpoint chains
for _, ep := range allLocallyReachableEndpoints {
epInfo, ok := ep.(*endpointInfo)
if !ok {
klog.ErrorS(nil, "Failed to cast endpointInfo", "endpointInfo", ep)
continue
}
endpointChain := epInfo.chainName
// Handle traffic that loops back to the originator with SNAT.
tx.Add(&knftables.Rule{
Chain: endpointChain,
Rule: knftables.Concat(
ipX, "saddr", epInfo.IP(),
"jump", kubeMarkMasqChain,
),
})
// Handle session affinity
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
tx.Add(&knftables.Rule{
Chain: endpointChain,
Rule: knftables.Concat(
"update", "@", epInfo.affinitySetName,
"{", ipX, "saddr", "}",
),
})
}
// DNAT to final destination.
tx.Add(&knftables.Rule{
Chain: endpointChain,
Rule: knftables.Concat(
"meta l4proto", protocol,
"dnat to", epInfo.String(),
),
})
}
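// Illustrative endpoint chain contents for an endpoint at 10.180.0.1:8080 over TCP
// (values hypothetical):
//   ip saddr 10.180.0.1 jump <kubeMarkMasqChain>    (hairpin: masquerade self-traffic)
//   update @<affinitySetName> { ip saddr }          (only with ClientIP affinity)
//   meta l4proto tcp dnat to 10.180.0.1:8080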
}
// Figure out which chains are now stale. Unfortunately, we can't delete them
// right away, because with kernels before 6.2, if there is a map element pointing
// to a chain, and you delete that map element, the kernel doesn't notice until a
// short amount of time later that the chain is now unreferenced. So we flush them
// now, and record the time that they become stale in staleChains so they can be
// deleted later.
existingChains, err := proxier.nftables.List(context.TODO(), "chains")
if err == nil {
for _, chain := range existingChains {
if isServiceChainName(chain) && !activeChains.Has(chain) {
tx.Flush(&knftables.Chain{
Name: chain,
})
proxier.staleChains[chain] = start
}
}
} else if !knftables.IsNotFound(err) {
klog.ErrorS(err, "Failed to list nftables chains: stale chains will not be deleted")
}
// On the other hand, we can immediately delete any stale affinity sets
existingSets, err := proxier.nftables.List(context.TODO(), "sets")
if err == nil {
for _, set := range existingSets {
if isAffinitySetName(set) && !activeAffinitySets.Has(set) {
tx.Delete(&knftables.Set{
Name: set,
})
}
}
} else if !knftables.IsNotFound(err) {
klog.ErrorS(err, "Failed to list nftables sets: stale affinity sets will not be deleted")
}
// Sync rules.
klog.V(2).InfoS("Reloading service nftables data",
"numServices", len(proxier.svcPortMap),
"numEndpoints", totalEndpoints,
)
// FIXME
// klog.V(9).InfoS("Running nftables transaction", "transaction", tx.Bytes())
err = proxier.nftables.Run(context.TODO(), tx)
if err != nil {
klog.ErrorS(err, "nftables sync failed")
metrics.IptablesRestoreFailuresTotal.Inc()
return
}
success = true
for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes {
for _, lastChangeTriggerTime := range lastChangeTriggerTimes {
latency := metrics.SinceInSeconds(lastChangeTriggerTime)
metrics.NetworkProgrammingLatency.Observe(latency)
klog.V(4).InfoS("Network programming", "endpoint", klog.KRef(name.Namespace, name.Name), "elapsed", latency)
}
}
metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal").Set(float64(serviceNoLocalEndpointsTotalInternal))
metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(serviceNoLocalEndpointsTotalExternal))
if proxier.healthzServer != nil {
proxier.healthzServer.Updated(proxier.ipFamily)
}
metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()
// Update service healthchecks. The endpoints list might include services that are
// not "OnlyLocal", but the services list will not, and the serviceHealthServer
// will just drop those endpoints.
if err := proxier.serviceHealthServer.SyncServices(proxier.svcPortMap.HealthCheckNodePorts()); err != nil {
klog.ErrorS(err, "Error syncing healthcheck services")
}
if err := proxier.serviceHealthServer.SyncEndpoints(proxier.endpointsMap.LocalReadyEndpoints()); err != nil {
klog.ErrorS(err, "Error syncing healthcheck endpoints")
}
// Finish housekeeping: clear stale conntrack entries for UDP Services
conntrack.CleanStaleEntries(proxier.ipFamily == v1.IPv6Protocol, proxier.exec, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
}