internal/k8sCommon/k8sclient/endpointslicewatcher.go
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package k8sclient

import (
	"fmt"
	"sync"

	"github.com/google/uuid"
	"go.uber.org/zap"
	discv1 "k8s.io/api/discovery/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/tools/cache"
)

// EndpointSliceWatcher watches EndpointSlices and builds two lookup maps:
//  1. "ip" or "ip:port"       -> PodMetadata{Workload, Namespace, Node}
//  2. "serviceName@namespace" -> PodMetadata{Workload, Namespace, Node}
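//
// For example, with purely illustrative values (not taken from any real cluster), a backend
// Pod of a Deployment "checkout" in namespace "shop" on node "node-1", reachable at
// 10.0.1.12:8080, would produce entries such as:
//
//	"10.0.1.12"      -> PodMetadata{Workload: "checkout", Namespace: "shop", Node: "node-1"}
//	"10.0.1.12:8080" -> PodMetadata{Workload: "checkout", Namespace: "shop", Node: "node-1"}
//	"checkout@shop"  -> PodMetadata{Workload: "checkout", Namespace: "shop", Node: "node-1"}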
type EndpointSliceWatcher struct {
logger *zap.Logger
informer cache.SharedIndexInformer
ipToPodMetadata *sync.Map // key: "ip" or "ip:port", val: PodMetadata
serviceNamespaceToPodMetadata *sync.Map // key: "serviceName@namespace", val: PodMetadata
// For bookkeeping, so we can remove old mappings upon EndpointSlice deletion
sliceToKeysMap sync.Map // map[sliceUID string] -> []string of keys we inserted, which are "ip", "ip:port", or "service@namespace"
deleter Deleter
}

// PodMetadata holds the workload, namespace, and node associated with a Pod.
type PodMetadata struct {
Workload string
Namespace string
Node string
uuid string // unexported field to store the unique identifier
}
// NewPodMetadata constructs a new PodMetadata instance with a unique uuid.
func NewPodMetadata(workload, namespace, node string) PodMetadata {
return PodMetadata{
Workload: workload,
Namespace: namespace,
Node: node,
uuid: uuid.NewString(),
}
}
// UUID returns the unique identifier (uuid) for the PodMetadata.
func (pm PodMetadata) UUID() string {
return pm.uuid
}
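
// A minimal usage sketch for PodMetadata (the literal values are hypothetical):
//
//	pm := NewPodMetadata("checkout", "shop", "node-1")
//	id := pm.UUID() // unique per constructed value, handy for telling otherwise-equal entries apart
//	_ = id
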
// kvPair holds one mapping from key -> value. The isService flag
// indicates whether this key is for a Service or for an IP/IP:port.
type kvPair struct {
key string // key: "ip" or "ip:port" or "service@namespace"
value PodMetadata // value: {"workload", "namespace", "node"}
isService bool // true if key = "service@namespace"
}
// NewEndpointSliceWatcher creates an EndpointSlice watcher for the new approach (when USE_LIST_POD=false).
func NewEndpointSliceWatcher(
logger *zap.Logger,
factory informers.SharedInformerFactory,
deleter Deleter,
) *EndpointSliceWatcher {
esInformer := factory.Discovery().V1().EndpointSlices().Informer()
err := esInformer.SetTransform(minimizeEndpointSlice)
if err != nil {
logger.Error("failed to minimize EndpointSlice objects", zap.Error(err))
}
return &EndpointSliceWatcher{
logger: logger,
informer: esInformer,
ipToPodMetadata: &sync.Map{},
serviceNamespaceToPodMetadata: &sync.Map{},
deleter: deleter,
}
}
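
// A minimal wiring sketch, assuming an existing kubernetes.Interface named "clientset",
// a *zap.Logger named "logger", and a Deleter implementation named "deleter" (all three
// are placeholders, not defined in this file):
//
//	factory := informers.NewSharedInformerFactory(clientset, 0)
//	watcher := NewEndpointSliceWatcher(logger, factory, deleter)
//	stopCh := make(chan struct{})
//	watcher.Run(stopCh)              // registers handlers and starts the informer
//	watcher.WaitForCacheSync(stopCh) // blocks until the initial list has been processed
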
// Run starts the EndpointSliceWatcher.
func (w *EndpointSliceWatcher) Run(stopCh chan struct{}) {
w.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
w.handleSliceAdd(obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
w.handleSliceUpdate(oldObj, newObj)
},
DeleteFunc: func(obj interface{}) {
w.handleSliceDelete(obj)
},
})
go w.informer.Run(stopCh)
}
// WaitForCacheSync blocks until the cache is synchronized, or the stopCh is closed.
func (w *EndpointSliceWatcher) WaitForCacheSync(stopCh chan struct{}) {
if !cache.WaitForNamedCacheSync("endpointSliceWatcher", stopCh, w.informer.HasSynced) {
w.logger.Error("timed out waiting for endpointSliceWatcher cache to sync")
}
w.logger.Info("endpointSliceWatcher: Cache synced")
}
// extractEndpointSliceKeyValuePairs computes the relevant mappings from an EndpointSlice.
//
// It returns a list of kvPair:
// - All IP and IP:port keys (isService=false) -> {"workload", "namespace", "node"}
// - The Service name key (isService=true) -> first {"workload", "namespace", "node"} found
//
// This function does NOT modify ipToPodMetadata or serviceNamespaceToPodMetadata. It's purely for computing
// the pairs, so it can be reused by both add and update methods.
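//
// For illustration only (all names and addresses below are hypothetical): a slice labeled
// kubernetes.io/service-name=checkout, with one endpoint whose TargetRef is Pod
// "checkout-6d9f7c9b5-abcde" in namespace "shop" on node "node-1", address 10.0.1.12,
// and a single port 8080, would yield pairs roughly like:
//
//	{key: "10.0.1.12",      value: {Workload: "checkout", Namespace: "shop", Node: "node-1"}, isService: false}
//	{key: "10.0.1.12:8080", value: {Workload: "checkout", Namespace: "shop", Node: "node-1"}, isService: false}
//	{key: "checkout@shop",  value: {Workload: "checkout", Namespace: "shop", Node: "node-1"}, isService: true}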
func (w *EndpointSliceWatcher) extractEndpointSliceKeyValuePairs(slice *discv1.EndpointSlice) []kvPair {
var pairs []kvPair
isFirstPod := true
svcName := slice.Labels["kubernetes.io/service-name"]
for _, endpoint := range slice.Endpoints {
if endpoint.TargetRef != nil {
if endpoint.TargetRef.Kind != "Pod" {
continue
}
podName := endpoint.TargetRef.Name
ns := endpoint.TargetRef.Namespace
var nodeName string
if endpoint.NodeName != nil {
nodeName = *endpoint.NodeName
}
w.logger.Debug("Processing endpoint",
zap.String("podName", podName),
zap.String("namespace", ns),
zap.String("nodeName", nodeName),
)
derivedWorkload := InferWorkloadName(podName, svcName)
if derivedWorkload == "" {
w.logger.Warn("failed to infer workload name from Pod name")
continue
}
fullWl := NewPodMetadata(derivedWorkload, ns, nodeName)
// Build IP and IP:port pairs
for _, addr := range endpoint.Addresses {
// "ip" -> PodMetadata
pairs = append(pairs, kvPair{
key: addr,
value: fullWl,
isService: false,
})
// "ip:port" -> PodMetadata for each port
for _, portDef := range slice.Ports {
if portDef.Port != nil {
ipPort := fmt.Sprintf("%s:%d", addr, *portDef.Port)
pairs = append(pairs, kvPair{
key: ipPort,
value: fullWl,
isService: false,
})
}
}
}
// Build service name -> PodMetadata pair from the first pod
if isFirstPod {
isFirstPod = false
if svcName != "" && ns != "" {
pairs = append(pairs, kvPair{
key: svcName + "@" + ns,
value: fullWl,
isService: true,
})
}
}
}
}
return pairs
}
// handleSliceAdd handles a new EndpointSlice that wasn't seen before.
// It computes all keys and directly stores them. Then it records those keys
// in sliceToKeysMap so that we can remove them later upon deletion.
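//
// For instance, reusing the hypothetical slice from the extraction example above and assuming
// its UID is "abc-123", sliceToKeysMap would afterwards contain:
//
//	"abc-123" -> ["10.0.1.12", "10.0.1.12:8080", "checkout@shop"]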
func (w *EndpointSliceWatcher) handleSliceAdd(obj interface{}) {
newSlice := obj.(*discv1.EndpointSlice)
w.logger.Debug("Received EndpointSlice Add",
zap.String("sliceName", newSlice.Name),
zap.String("uid", string(newSlice.UID)),
zap.String("namespace", newSlice.Namespace),
)
sliceUID := string(newSlice.UID)
// Compute all key-value pairs for this new slice
pairs := w.extractEndpointSliceKeyValuePairs(newSlice)
w.logger.Debug("Extracted pairs from new slice",
zap.Int("pairsCount", len(pairs)),
)
	// Insert them into ipToPodMetadata / serviceNamespaceToPodMetadata, and track the keys.
keys := make([]string, 0, len(pairs))
for _, kv := range pairs {
if kv.isService {
w.serviceNamespaceToPodMetadata.Store(kv.key, kv.value)
} else {
w.ipToPodMetadata.Store(kv.key, kv.value)
}
keys = append(keys, kv.key)
}
// Save these keys so we can remove them on delete
w.sliceToKeysMap.Store(sliceUID, keys)
}
// handleSliceUpdate handles an update from oldSlice -> newSlice.
// Instead of blindly removing all old keys and adding new ones, it diffs them:
// - remove only keys that no longer exist,
// - add only new keys that didn't exist before,
// - keep those that haven't changed.
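//
// For example (hypothetical keys): if the old slice produced
//
//	{"10.0.1.12", "10.0.1.12:8080", "checkout@shop"}
//
// and the new slice produces
//
//	{"10.0.1.20", "10.0.1.20:8080", "checkout@shop"}
//
// then the two 10.0.1.12 keys are scheduled for (delayed) deletion, the two 10.0.1.20 keys
// are stored, and "checkout@shop" is left untouched.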
func (w *EndpointSliceWatcher) handleSliceUpdate(oldObj, newObj interface{}) {
oldSlice := oldObj.(*discv1.EndpointSlice)
newSlice := newObj.(*discv1.EndpointSlice)
w.logger.Debug("Received EndpointSlice Update",
zap.String("oldSliceUID", string(oldSlice.UID)),
zap.String("newSliceUID", string(newSlice.UID)),
zap.String("name", newSlice.Name),
zap.String("namespace", newSlice.Namespace),
)
oldUID := string(oldSlice.UID)
newUID := string(newSlice.UID)
// 1) Fetch old keys from sliceToKeysMap (if present).
var oldKeys []string
if val, ok := w.sliceToKeysMap.Load(oldUID); ok {
oldKeys = val.([]string)
}
// 2) Compute fresh pairs (and thus keys) from the new slice.
newPairs := w.extractEndpointSliceKeyValuePairs(newSlice)
var newKeys []string
for _, kv := range newPairs {
newKeys = append(newKeys, kv.key)
}
// Convert oldKeys/newKeys to sets for easy diff
oldKeysSet := make(map[string]struct{}, len(oldKeys))
for _, k := range oldKeys {
oldKeysSet[k] = struct{}{}
}
newKeysSet := make(map[string]struct{}, len(newKeys))
for _, k := range newKeys {
newKeysSet[k] = struct{}{}
}
// 3) For each key in oldKeys that doesn't exist in newKeys, remove it
for k := range oldKeysSet {
if _, stillPresent := newKeysSet[k]; !stillPresent {
w.deleter.DeleteWithDelay(w.ipToPodMetadata, k)
w.deleter.DeleteWithDelay(w.serviceNamespaceToPodMetadata, k)
}
}
// 4) For each key in newKeys that wasn't in oldKeys, we need to store it
// in the appropriate sync.Map. We'll look up the value from newPairs.
for _, kv := range newPairs {
if _, alreadyHad := oldKeysSet[kv.key]; !alreadyHad {
if kv.isService {
w.serviceNamespaceToPodMetadata.Store(kv.key, kv.value)
} else {
w.ipToPodMetadata.Store(kv.key, kv.value)
}
}
}
// 5) Update sliceToKeysMap for the new slice UID
// (Often the UID doesn't change across updates, but we'll handle it properly.)
w.sliceToKeysMap.Delete(oldUID)
w.sliceToKeysMap.Store(newUID, newKeys)
w.logger.Debug("Finished handling EndpointSlice Update",
zap.String("sliceUID", string(newSlice.UID)))
}
// handleSliceDelete removes any IP->PodMetadata or service->PodMetadata keys that were created by this slice.
func (w *EndpointSliceWatcher) handleSliceDelete(obj interface{}) {
slice := obj.(*discv1.EndpointSlice)
w.logger.Debug("Received EndpointSlice Delete",
zap.String("uid", string(slice.UID)),
zap.String("name", slice.Name),
zap.String("namespace", slice.Namespace),
)
w.removeSliceKeys(slice)
}
func (w *EndpointSliceWatcher) removeSliceKeys(slice *discv1.EndpointSlice) {
sliceUID := string(slice.UID)
val, ok := w.sliceToKeysMap.Load(sliceUID)
if !ok {
return
}
keys := val.([]string)
for _, k := range keys {
w.deleter.DeleteWithDelay(w.ipToPodMetadata, k)
w.deleter.DeleteWithDelay(w.serviceNamespaceToPodMetadata, k)
}
w.sliceToKeysMap.Delete(sliceUID)
}
// GetIPToPodMetadata returns the ipToPodMetadata
func (w *EndpointSliceWatcher) GetIPToPodMetadata() *sync.Map {
return w.ipToPodMetadata
}
// InitializeIPToPodMetadata initializes the ipToPodMetadata
func (w *EndpointSliceWatcher) InitializeIPToPodMetadata() {
w.ipToPodMetadata = &sync.Map{}
}
// GetServiceNamespaceToPodMetadata returns the serviceNamespaceToPodMetadata
func (w *EndpointSliceWatcher) GetServiceNamespaceToPodMetadata() *sync.Map {
return w.serviceNamespaceToPodMetadata
}
// InitializeServiceNamespaceToPodMetadata initializes the serviceNamespaceToPodMetadata
func (w *EndpointSliceWatcher) InitializeServiceNamespaceToPodMetadata() {
w.serviceNamespaceToPodMetadata = &sync.Map{}
}
// minimizeEndpointSlice removes fields that are not required by our mapping logic,
// retaining only the minimal set of fields needed (ObjectMeta.Name, Namespace, UID, Labels,
// Endpoints (with their Addresses and TargetRef) and Ports).
func minimizeEndpointSlice(obj interface{}) (interface{}, error) {
eps, ok := obj.(*discv1.EndpointSlice)
if !ok {
return obj, fmt.Errorf("object is not an EndpointSlice")
}
// Minimize metadata: we only really need Name, Namespace, UID and Labels.
eps.Annotations = nil
eps.ManagedFields = nil
eps.Finalizers = nil
// The watcher only uses:
// - eps.Labels["kubernetes.io/service-name"]
// - eps.Namespace (from metadata)
// - eps.UID (from metadata)
// - eps.Endpoints: for each endpoint, its Addresses and TargetRef.
// - eps.Ports: each port's Port (and optionally Name/Protocol)
//
// For each endpoint, clear fields that we don’t use.
for i := range eps.Endpoints {
		// We only need Addresses and TargetRef; Hostname, Zone, DeprecatedTopology, and Hints are not used.
eps.Endpoints[i].Hostname = nil
eps.Endpoints[i].Zone = nil
eps.Endpoints[i].DeprecatedTopology = nil
eps.Endpoints[i].Hints = nil
}
// No transformation is needed for eps.Ports because we use them directly.
return eps, nil
}