func getKubernetesResolver()

in plugins/processors/awsapplicationsignals/internal/resolver/kubernetes.go [90:236]
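
Builds the process-wide kubernetesResolver singleton, picking one of three setup modes: (1) Pod & Service watchers when APP_SIGNALS_USE_LIST_POD=true, (2) delegation to the k8smetadata extension when it is present, or (3) Service & EndpointSlice watchers otherwise.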


func getKubernetesResolver(platformCode, clusterName string, logger *zap.Logger) subResolver {
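	// construct the singleton resolver on first use; subsequent calls return the cached instance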
	once.Do(func() {
		// 1) Check the environment variable for the "list pods" approach
		useListPod := (os.Getenv(appSignalsUseListPod) == "true")

		if useListPod {
			logger.Info("APP_SIGNALS_USE_LIST_POD=true; setting up Pod & Service watchers, ignoring extension")

			cfg, err := clientcmd.BuildConfigFromFlags("", "")
			if err != nil {
				logger.Fatal("Failed to create config", zap.Error(err))
			}

			clientset, err := kubernetes.NewForConfig(cfg)
			if err != nil {
				logger.Fatal("Failed to create kubernetes client", zap.Error(err))
			}

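			// small randomized delay so many agents starting at once do not all hit the Kubernetes API server at the same time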
			jitterSleep(jitterKubernetesAPISeconds)

			sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0)
			timedDeleter := &k8sclient.TimedDeleter{Delay: deletionDelay}

			poWatcher := newPodWatcher(logger, sharedInformerFactory, timedDeleter)
			svcWatcher := k8sclient.NewServiceWatcher(logger, sharedInformerFactory, timedDeleter)

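			// SafeChannel wraps the stop channel so it can be closed at most once when the resolver shuts down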
			safeStopCh := &k8sclient.SafeChannel{Ch: make(chan struct{}), Closed: false}
			// initialize the pod and service watchers for the cluster
			poWatcher.run(safeStopCh.Ch)
			svcWatcher.Run(safeStopCh.Ch)
			// wait once for the caches to sync so the client knows about the pods and services in the cluster
			poWatcher.waitForCacheSync(safeStopCh.Ch)
			svcWatcher.WaitForCacheSync(safeStopCh.Ch)

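			// derive the service -> workload mapping by matching each service's selectors against the labels of known workloads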
			serviceToWorkload := &sync.Map{}
			svcToWorkloadMapper := k8sclient.NewServiceToWorkloadMapper(svcWatcher.GetServiceAndNamespaceToSelectors(), poWatcher.workloadAndNamespaceToLabels, serviceToWorkload, logger, timedDeleter)
			svcToWorkloadMapper.Start(safeStopCh.Ch)

			instance = &kubernetesResolver{
				logger:                         logger,
				clientset:                      clientset,
				clusterName:                    clusterName,
				platformCode:                   platformCode,
				useExtension:                   false,
				ipToServiceAndNamespace:        svcWatcher.GetIPToServiceAndNamespace(),
				serviceAndNamespaceToSelectors: svcWatcher.GetServiceAndNamespaceToSelectors(),
				ipToPod:                        poWatcher.ipToPod,
				podToWorkloadAndNamespace:      poWatcher.podToWorkloadAndNamespace,
				workloadAndNamespaceToLabels:   poWatcher.workloadAndNamespaceToLabels,
				serviceToWorkload:              serviceToWorkload,
				workloadPodCount:               poWatcher.workloadPodCount,
				ipToWorkloadAndNamespace:       nil,
				safeStopCh:                     safeStopCh,
				useListPod:                     true,
			}
			return
		}

		// 2) If not using listPod, check if extension is present
		ext := k8smetadata.GetKubernetesMetadata()
		if ext != nil {
			// We skip all watchers (the extension has them).
			logger.Info("k8smetadata extension is present")

			instance = &kubernetesResolver{
				logger:       logger,
				clusterName:  clusterName,
				platformCode: platformCode,
				useExtension: true,
			}
			return
		}

		// 3) Extension is not present, and useListPod is false -> EndpointSlice approach
		logger.Info("k8smetadata extension not found; setting up EndpointSlice watchers")

		cfg, err := clientcmd.BuildConfigFromFlags("", "")
		if err != nil {
			logger.Fatal("Failed to create config", zap.Error(err))
		}

		clientset, err := kubernetes.NewForConfig(cfg)
		if err != nil {
			logger.Fatal("Failed to create kubernetes client", zap.Error(err))
		}

		jitterSleep(jitterKubernetesAPISeconds)

		sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0)

		// For the endpoint slice watcher, we maintain two mappings:
		//   1. ip -> workload
		//   2. service -> workload
		//
		// Scenario:
		//   When a deployment associated with service X has only one pod, the following events occur:
		//     a. A pod terminates (one endpoint terminating). For this event, we keep the existing service -> workload mapping (added earlier)
		//     b. The endpoints become empty (nil endpoints). For this event, we schedule removal of the service -> workload mapping after a delay
		//     c. A new pod starts (one endpoint starting). For this event, we add the same service -> workload mapping immediately
		//
		// Problem:
		//   In step (b), a deletion delay (e.g., 2 minutes) is initiated for the mapping with service key X.
		//   Then, in step (c), the mapping for service key X is re-added. Since the new mapping is inserted
		//   before the delay expires, the scheduled deletion from step (b) may erroneously remove the mapping
		//   added in step (c).
		//
		// Root Cause and Resolution:
		//   The issue is caused by deleting the mapping using only the key, without verifying the value.
		//   To fix this, we need to compare both the key and the value before deletion.
		//   That is exactly the purpose of TimedDeleterWithIDCheck.
		timedDeleterWithIDCheck := &k8sclient.TimedDeleterWithIDCheck{Delay: deletionDelay}
		endptSliceWatcher := k8sclient.NewEndpointSliceWatcher(logger, sharedInformerFactory, timedDeleterWithIDCheck)

		// The service watcher only maps an IP to a service name, and it is very rare for an IP to be reused
		// by two services, so it does not face the same issue as the service -> workload mapping in the
		// endpoint slice watcher. Technically we could use TimedDeleterWithIDCheck here as well, but that would
		// require a lot of code changes in the pod watcher; it doesn't seem worthwhile now. We might consider it
		// once the pod watcher is no longer in use.
		timedDeleter := &k8sclient.TimedDeleter{Delay: deletionDelay}
		svcWatcher := k8sclient.NewServiceWatcher(logger, sharedInformerFactory, timedDeleter)

		safeStopCh := &k8sclient.SafeChannel{Ch: make(chan struct{}), Closed: false}
		// initialize the service and endpoint slice watchers for the cluster
		svcWatcher.Run(safeStopCh.Ch)
		endptSliceWatcher.Run(safeStopCh.Ch)
		// wait once for the caches to sync so the client knows about the services and endpoint slices in the cluster
		svcWatcher.WaitForCacheSync(safeStopCh.Ch)
		endptSliceWatcher.WaitForCacheSync(safeStopCh.Ch)

		instance = &kubernetesResolver{
			logger:                       logger,
			clientset:                    clientset,
			clusterName:                  clusterName,
			platformCode:                 platformCode,
			ipToWorkloadAndNamespace:     endptSliceWatcher.GetIPToPodMetadata(), // endpointSlice provides pod IP → PodMetadata mapping
			ipToPod:                      nil,
			podToWorkloadAndNamespace:    nil,
			workloadAndNamespaceToLabels: nil,
			workloadPodCount:             nil,
			ipToServiceAndNamespace:      svcWatcher.GetIPToServiceAndNamespace(),
			serviceToWorkload:            endptSliceWatcher.GetServiceNamespaceToPodMetadata(), // endpointSlice also provides service → PodMetadata mapping
			safeStopCh:                   safeStopCh,
			useListPod:                   useListPod,
		}
	})

	return instance
}
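
The fix described in the comments above hinges on deleting a mapping only when both the key and the stored value still match. Below is a minimal sketch of that delayed, value-checked deletion; it assumes nothing about the real k8sclient.TimedDeleterWithIDCheck API, and deleteWithIDCheck, the keys, and the timings are all illustrative.

package main

import (
	"fmt"
	"sync"
	"time"
)

// deleteWithIDCheck schedules key to be removed from m after delay, but only
// if the entry still holds the value that was present when the deletion was
// scheduled. A mapping that was re-added with a new value in the meantime
// therefore survives the delayed delete.
func deleteWithIDCheck(m *sync.Map, key, expected any, delay time.Duration) {
	time.AfterFunc(delay, func() {
		// CompareAndDelete (Go 1.20+) removes the entry only if its current
		// value still equals expected, so a newer mapping is left untouched.
		m.CompareAndDelete(key, expected)
	})
}

func main() {
	m := &sync.Map{}
	m.Store("svc-x@default", "workload-old") // service -> workload added earlier

	// Step (b): endpoints become empty; schedule removal of the old mapping.
	deleteWithIDCheck(m, "svc-x@default", "workload-old", 100*time.Millisecond)

	// Step (c): a new pod starts and the mapping is re-added before the delay expires.
	m.Store("svc-x@default", "workload-new")

	time.Sleep(200 * time.Millisecond)
	if v, ok := m.Load("svc-x@default"); ok {
		fmt.Println("mapping survived:", v) // prints "mapping survived: workload-new"
	}
}

The real deleter presumably tracks a per-entry ID rather than comparing whole values, but the effect is the same: the mapping re-added in step (c) is not removed by the deletion scheduled in step (b).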