plugins/processors/awsapplicationsignals/internal/resolver/podwatcher.go (175 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: MIT package resolver import ( "sync" mapset "github.com/deckarep/golang-set/v2" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" "github.com/aws/amazon-cloudwatch-agent/internal/k8sCommon/k8sclient" ) func (p *podWatcher) removeHostNetworkRecords(pod *corev1.Pod) { for _, port := range k8sclient.GetHostNetworkPorts(pod) { p.deleter.DeleteWithDelay(p.ipToPod, pod.Status.HostIP+":"+port) } } func (p *podWatcher) handlePodAdd(pod *corev1.Pod) { if pod.Spec.HostNetwork && pod.Status.HostIP != "" { for _, port := range k8sclient.GetHostNetworkPorts(pod) { p.ipToPod.Store(pod.Status.HostIP+":"+port, pod.Name) } } if pod.Status.PodIP != "" { p.ipToPod.Store(pod.Status.PodIP, pod.Name) } } func (p *podWatcher) handlePodUpdate(newPod *corev1.Pod, oldPod *corev1.Pod) { // HostNetwork is an immutable field if newPod.Spec.HostNetwork && oldPod.Status.HostIP != newPod.Status.HostIP { if oldPod.Status.HostIP != "" { p.logger.Debug("deleting host ip from cache", zap.String("hostNetwork", oldPod.Status.HostIP)) p.removeHostNetworkRecords(oldPod) } if newPod.Status.HostIP != "" { for _, port := range k8sclient.GetHostNetworkPorts(newPod) { p.ipToPod.Store(newPod.Status.HostIP+":"+port, newPod.Name) } } } if oldPod.Status.PodIP != newPod.Status.PodIP { if oldPod.Status.PodIP != "" { p.logger.Debug("deleting pod ip from cache", zap.String("podNetwork", oldPod.Status.PodIP)) p.deleter.DeleteWithDelay(p.ipToPod, oldPod.Status.PodIP) } if newPod.Status.PodIP != "" { p.ipToPod.Store(newPod.Status.PodIP, newPod.Name) } } } func (p *podWatcher) onAddOrUpdatePod(pod, oldPod *corev1.Pod) { if oldPod == nil { p.handlePodAdd(pod) } else { p.handlePodUpdate(pod, oldPod) } workloadAndNamespace := k8sclient.GetWorkloadAndNamespace(pod) if workloadAndNamespace != "" { p.podToWorkloadAndNamespace.Store(pod.Name, workloadAndNamespace) podLabels := mapset.NewSet[string]() for key, value := range pod.ObjectMeta.Labels { podLabels.Add(key + "=" + value) } if podLabels.Cardinality() > 0 { p.workloadAndNamespaceToLabels.Store(workloadAndNamespace, podLabels) } if oldPod == nil { p.workloadPodCount[workloadAndNamespace]++ p.logger.Debug("Added pod", zap.String("pod", pod.Name), zap.String("workload", workloadAndNamespace), zap.Int("count", p.workloadPodCount[workloadAndNamespace])) } } } func (p *podWatcher) onDeletePod(obj interface{}) { pod := obj.(*corev1.Pod) if pod.Spec.HostNetwork && pod.Status.HostIP != "" { p.logger.Debug("deleting host ip from cache", zap.String("hostNetwork", pod.Status.HostIP)) p.removeHostNetworkRecords(pod) } if pod.Status.PodIP != "" { p.logger.Debug("deleting pod ip from cache", zap.String("podNetwork", pod.Status.PodIP)) p.deleter.DeleteWithDelay(p.ipToPod, pod.Status.PodIP) } if workloadKey, ok := p.podToWorkloadAndNamespace.Load(pod.Name); ok { workloadAndNamespace := workloadKey.(string) p.workloadPodCount[workloadAndNamespace]-- p.logger.Debug("decrementing pod count", zap.String("workload", workloadAndNamespace), zap.Int("podCount", p.workloadPodCount[workloadAndNamespace])) if p.workloadPodCount[workloadAndNamespace] == 0 { p.deleter.DeleteWithDelay(p.workloadAndNamespaceToLabels, workloadAndNamespace) } } else { p.logger.Error("failed to load pod workloadKey", zap.String("pod", pod.Name)) } p.deleter.DeleteWithDelay(p.podToWorkloadAndNamespace, pod.Name) } type podWatcher struct { ipToPod *sync.Map podToWorkloadAndNamespace *sync.Map workloadAndNamespaceToLabels *sync.Map workloadPodCount map[string]int logger *zap.Logger informer cache.SharedIndexInformer deleter k8sclient.Deleter } func newPodWatcher(logger *zap.Logger, sharedInformerFactory informers.SharedInformerFactory, deleter k8sclient.Deleter) *podWatcher { podInformer := sharedInformerFactory.Core().V1().Pods().Informer() err := podInformer.SetTransform(minimizePod) if err != nil { logger.Error("failed to minimize Pod objects", zap.Error(err)) } return &podWatcher{ ipToPod: &sync.Map{}, podToWorkloadAndNamespace: &sync.Map{}, workloadAndNamespaceToLabels: &sync.Map{}, workloadPodCount: make(map[string]int), logger: logger, informer: podInformer, deleter: deleter, } } func (p *podWatcher) run(stopCh chan struct{}) { p.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { pod := obj.(*corev1.Pod) p.logger.Debug("list and watch for pod: ADD " + pod.Name) p.onAddOrUpdatePod(pod, nil) }, UpdateFunc: func(oldObj, newObj interface{}) { pod := newObj.(*corev1.Pod) oldPod := oldObj.(*corev1.Pod) p.logger.Debug("list and watch for pods: UPDATE " + pod.Name) p.onAddOrUpdatePod(pod, oldPod) }, DeleteFunc: func(obj interface{}) { pod := obj.(*corev1.Pod) p.logger.Debug("list and watch for pods: DELETE " + pod.Name) p.onDeletePod(obj) }, }) go p.informer.Run(stopCh) } func (p *podWatcher) waitForCacheSync(stopCh chan struct{}) { if !cache.WaitForNamedCacheSync("podWatcher", stopCh, p.informer.HasSynced) { p.logger.Fatal("timed out waiting for kubernetes pod watcher caches to sync") } p.logger.Info("podWatcher: Cache synced") } // minimizePod removes fields that could contain large objects, and retain essential // fields needed for IP/name translation. The following fields must be kept: // - ObjectMeta: Namespace, Name, Labels, OwnerReference // - Spec: HostNetwork, ContainerPorts // - Status: PodIP/s, HostIP/s func minimizePod(obj interface{}) (interface{}, error) { if pod, ok := obj.(*corev1.Pod); ok { pod.Annotations = nil pod.Finalizers = nil pod.ManagedFields = nil pod.Spec.Volumes = nil pod.Spec.InitContainers = nil pod.Spec.EphemeralContainers = nil pod.Spec.ImagePullSecrets = nil pod.Spec.HostAliases = nil pod.Spec.SchedulingGates = nil pod.Spec.ResourceClaims = nil pod.Spec.Tolerations = nil pod.Spec.Affinity = nil pod.Status.InitContainerStatuses = nil pod.Status.ContainerStatuses = nil pod.Status.EphemeralContainerStatuses = nil for i := 0; i < len(pod.Spec.Containers); i++ { c := &pod.Spec.Containers[i] c.Image = "" c.Command = nil c.Args = nil c.EnvFrom = nil c.Env = nil c.Resources = corev1.ResourceRequirements{} c.VolumeMounts = nil c.VolumeDevices = nil c.SecurityContext = nil } } return obj, nil }