in plugins/processors/awsapplicationsignals/internal/resolver/kubernetes.go [90:236]
// getKubernetesResolver lazily builds the package-level kubernetesResolver
// singleton and returns it. Initialization happens exactly once (guarded by
// once.Do); every later call returns the same instance. Three mutually
// exclusive setups are tried, in order:
//
//  1. APP_SIGNALS_USE_LIST_POD=true -> legacy Pod & Service watchers,
//     ignoring the k8smetadata extension entirely.
//  2. k8smetadata extension present -> no local watchers; the extension
//     owns all Kubernetes metadata caches.
//  3. otherwise -> EndpointSlice & Service watchers.
func getKubernetesResolver(platformCode, clusterName string, logger *zap.Logger) subResolver {
	once.Do(func() {
		// 1) Check environment for the "list pods" approach.
		useListPod := os.Getenv(appSignalsUseListPod) == "true"
		if useListPod {
			logger.Info("APP_SIGNALS_USE_LIST_POD=true; setting up Pod & Service watchers, ignoring extension")
			clientset := newK8sClientsetOrDie(logger)
			jitterSleep(jitterKubernetesAPISeconds)
			sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0)
			timedDeleter := &k8sclient.TimedDeleter{Delay: deletionDelay}
			poWatcher := newPodWatcher(logger, sharedInformerFactory, timedDeleter)
			svcWatcher := k8sclient.NewServiceWatcher(logger, sharedInformerFactory, timedDeleter)

			safeStopCh := &k8sclient.SafeChannel{Ch: make(chan struct{}), Closed: false}
			// Initialize the pod and service watchers for the cluster.
			poWatcher.run(safeStopCh.Ch)
			svcWatcher.Run(safeStopCh.Ch)
			// Wait for caches to sync (once) so that clients know about the pods and services in the cluster.
			poWatcher.waitForCacheSync(safeStopCh.Ch)
			svcWatcher.WaitForCacheSync(safeStopCh.Ch)

			serviceToWorkload := &sync.Map{}
			svcToWorkloadMapper := k8sclient.NewServiceToWorkloadMapper(svcWatcher.GetServiceAndNamespaceToSelectors(), poWatcher.workloadAndNamespaceToLabels, serviceToWorkload, logger, timedDeleter)
			svcToWorkloadMapper.Start(safeStopCh.Ch)

			instance = &kubernetesResolver{
				logger:                         logger,
				clientset:                      clientset,
				clusterName:                    clusterName,
				platformCode:                   platformCode,
				useExtension:                   false,
				ipToServiceAndNamespace:        svcWatcher.GetIPToServiceAndNamespace(),
				serviceAndNamespaceToSelectors: svcWatcher.GetServiceAndNamespaceToSelectors(),
				ipToPod:                        poWatcher.ipToPod,
				podToWorkloadAndNamespace:      poWatcher.podToWorkloadAndNamespace,
				workloadAndNamespaceToLabels:   poWatcher.workloadAndNamespaceToLabels,
				serviceToWorkload:              serviceToWorkload,
				workloadPodCount:               poWatcher.workloadPodCount,
				ipToWorkloadAndNamespace:       nil,
				safeStopCh:                     safeStopCh,
				useListPod:                     true,
			}
			return
		}

		// 2) If not using listPod, check if the extension is present.
		if ext := k8smetadata.GetKubernetesMetadata(); ext != nil {
			// We skip all watchers (the extension has them).
			logger.Info("k8smetadata extension is present")
			instance = &kubernetesResolver{
				logger:       logger,
				clusterName:  clusterName,
				platformCode: platformCode,
				useExtension: true,
			}
			return
		}

		// 3) Extension is not present, and useListPod is false -> EndpointSlice approach.
		logger.Info("k8smetadata extension not found; setting up EndpointSlice watchers")
		clientset := newK8sClientsetOrDie(logger)
		jitterSleep(jitterKubernetesAPISeconds)
		sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0)

		// For the endpoint slice watcher, we maintain two mappings:
		// 1. ip -> workload
		// 2. service -> workload
		//
		// Scenario:
		// When a deployment associated with service X has only one pod, the following events occur:
		//   a. A pod terminates (one endpoint terminating). For this event, we keep the service -> workload mapping (which was added before)
		//   b. The endpoints become empty (null endpoints). For this event, we remove the service -> workload mapping in a delayed way
		//   c. A new pod starts (one endpoint starting). For this event, we add the same service -> workload mapping immediately
		//
		// Problem:
		// In step (b), a deletion delay (e.g., 2 minutes) is initiated for the mapping with service key X.
		// Then, in step (c), the mapping for service key X is re-added. Since the new mapping is inserted
		// before the delay expires, the scheduled deletion from step (b) may erroneously remove the mapping
		// added in step (c).
		//
		// Root Cause and Resolution:
		// The issue is caused by deleting the mapping using only the key, without verifying the value.
		// To fix this, we need to compare both the key and the value before deletion.
		// That is exactly the purpose of TimedDeleterWithIDCheck.
		timedDeleterWithIDCheck := &k8sclient.TimedDeleterWithIDCheck{Delay: deletionDelay}
		endptSliceWatcher := k8sclient.NewEndpointSliceWatcher(logger, sharedInformerFactory, timedDeleterWithIDCheck)

		// For the service watcher, we are doing the mapping from IP to service name; it's very rare for an IP
		// to be reused by two services, so we don't face the service -> workload mapping issue seen in
		// endpointSliceWatcher. Technically, we could use TimedDeleterWithIDCheck here as well, but it would
		// involve changing podwatcher with a lot of code changes. It isn't worthwhile to do now; we might
		// consider doing it once podwatcher is no longer in use.
		timedDeleter := &k8sclient.TimedDeleter{Delay: deletionDelay}
		svcWatcher := k8sclient.NewServiceWatcher(logger, sharedInformerFactory, timedDeleter)

		safeStopCh := &k8sclient.SafeChannel{Ch: make(chan struct{}), Closed: false}
		// Initialize the service and endpoint-slice watchers for the cluster.
		svcWatcher.Run(safeStopCh.Ch)
		endptSliceWatcher.Run(safeStopCh.Ch)
		// Wait for caches to sync (once) so that clients know about the pods and services in the cluster.
		svcWatcher.WaitForCacheSync(safeStopCh.Ch)
		endptSliceWatcher.WaitForCacheSync(safeStopCh.Ch)

		instance = &kubernetesResolver{
			logger:                       logger,
			clientset:                    clientset,
			clusterName:                  clusterName,
			platformCode:                 platformCode,
			ipToWorkloadAndNamespace:     endptSliceWatcher.GetIPToPodMetadata(), // endpointSlice provides pod IP → PodMetadata mapping
			ipToPod:                      nil,
			podToWorkloadAndNamespace:    nil,
			workloadAndNamespaceToLabels: nil,
			workloadPodCount:             nil,
			ipToServiceAndNamespace:      svcWatcher.GetIPToServiceAndNamespace(),
			serviceToWorkload:            endptSliceWatcher.GetServiceNamespaceToPodMetadata(), // endpointSlice also provides service → PodMetadata mapping
			safeStopCh:                   safeStopCh,
			useListPod:                   useListPod,
		}
	})
	return instance
}

// newK8sClientsetOrDie builds a Kubernetes clientset from the default
// (in-cluster) config, terminating the process via logger.Fatal on failure.
// Shared by the list-pod and EndpointSlice setup paths.
func newK8sClientsetOrDie(logger *zap.Logger) *kubernetes.Clientset {
	cfg, err := clientcmd.BuildConfigFromFlags("", "")
	if err != nil {
		logger.Fatal("Failed to create config", zap.Error(err))
	}
	clientset, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		logger.Fatal("Failed to create kubernetes client", zap.Error(err))
	}
	return clientset
}