plugins/processors/awsapplicationsignals/internal/resolver/kubernetes.go

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package resolver

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"os"
	"sync"
	"time"

	"go.opentelemetry.io/collector/pdata/pcommon"
	semconv "go.opentelemetry.io/collector/semconv/v1.22.0"
	"go.uber.org/zap"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"

	"github.com/aws/amazon-cloudwatch-agent/extension/k8smetadata"
	"github.com/aws/amazon-cloudwatch-agent/internal/k8sCommon/k8sclient"
	"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/common"
	"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/config"
	attr "github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/internal/attributes"
)

const (
	// Deletion delay adjustment:
	// Previously, the EKS resolver would instantly remove the IP-to-Service mapping when a pod was destroyed.
	// This posed a problem because:
	// 1. Metric data is aggregated and emitted every 1 minute.
	// 2. If this aggregated metric data, which contains the IP of the now-destroyed pod, arrives
	//    at the EKS resolver after the IP records have already been deleted, the metric can't be processed correctly.
	//
	// To mitigate this issue, we've introduced a 2-minute deletion delay. This ensures that any
	// metric data that arrives within those 2 minutes, containing the old IP, will still get mapped correctly to a service.
	deletionDelay = 2 * time.Minute

	jitterKubernetesAPISeconds = 10

	// this is an environment variable that might be deprecated in the future.
	// When it's "true", we use the list pods API to get the IP-to-workload mapping;
	// otherwise, we use the list endpoint slices API instead.
	appSignalsUseListPod = "APP_SIGNALS_USE_LIST_POD"
)

type kubernetesResolver struct {
	logger       *zap.Logger
	clientset    kubernetes.Interface
	clusterName  string
	platformCode string

	// If using the extension, no mappings will be needed
	useExtension bool

	// if the ListPod API is used, the following maps are needed
	ipToPod                      *sync.Map
	podToWorkloadAndNamespace    *sync.Map
	workloadAndNamespaceToLabels *sync.Map
	workloadPodCount             map[string]int

	// if the ListEndpointSlice API is used, the following map is needed
	ipToWorkloadAndNamespace *sync.Map

	// if the ListService API is used, the following maps are needed
	ipToServiceAndNamespace        *sync.Map
	serviceAndNamespaceToSelectors *sync.Map

	// if the ListPod and ListService APIs are used, the serviceToWorkload map is computed by ServiceToWorkloadMapper
	// from serviceAndNamespaceToSelectors and workloadAndNamespaceToLabels every 1 min;
	// if ListEndpointSlice is used, we can get serviceToWorkload directly from the endpointSlice watcher
	serviceToWorkload *sync.Map

	safeStopCh *k8sclient.SafeChannel // trace and metric processors share the same kubernetesResolver and might close the same channel separately
	useListPod bool
}

var (
	once     sync.Once
	instance *kubernetesResolver
)

func jitterSleep(seconds int) {
	jitter := time.Duration(rand.Intn(seconds)) * time.Second // nolint:gosec
	time.Sleep(jitter)
}

func getKubernetesResolver(platformCode, clusterName string, logger *zap.Logger) subResolver {
	once.Do(func() {
		// 1) Check the environment for the "list pods" approach
		useListPod := os.Getenv(appSignalsUseListPod) == "true"

		if useListPod {
			logger.Info("APP_SIGNALS_USE_LIST_POD=true; setting up Pod & Service watchers, ignoring extension")

			cfg, err := clientcmd.BuildConfigFromFlags("", "")
			if err != nil {
				logger.Fatal("Failed to create config", zap.Error(err))
			}

			clientset, err := kubernetes.NewForConfig(cfg)
			if err != nil {
				logger.Fatal("Failed to create kubernetes client", zap.Error(err))
			}

			jitterSleep(jitterKubernetesAPISeconds)

			sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0)
			timedDeleter := &k8sclient.TimedDeleter{Delay: deletionDelay}

			poWatcher := newPodWatcher(logger, sharedInformerFactory, timedDeleter)
			svcWatcher := k8sclient.NewServiceWatcher(logger, sharedInformerFactory, timedDeleter)

			safeStopCh := &k8sclient.SafeChannel{Ch: make(chan struct{}), Closed: false}
			// initialize the pod and service watchers for the cluster
			poWatcher.run(safeStopCh.Ch)
			svcWatcher.Run(safeStopCh.Ch)
			// wait for the caches to sync (once) so that clients know about the pods and services in the cluster
			poWatcher.waitForCacheSync(safeStopCh.Ch)
			svcWatcher.WaitForCacheSync(safeStopCh.Ch)

			serviceToWorkload := &sync.Map{}
			svcToWorkloadMapper := k8sclient.NewServiceToWorkloadMapper(svcWatcher.GetServiceAndNamespaceToSelectors(), poWatcher.workloadAndNamespaceToLabels, serviceToWorkload, logger, timedDeleter)
			svcToWorkloadMapper.Start(safeStopCh.Ch)

			instance = &kubernetesResolver{
				logger:                         logger,
				clientset:                      clientset,
				clusterName:                    clusterName,
				platformCode:                   platformCode,
				useExtension:                   false,
				ipToServiceAndNamespace:        svcWatcher.GetIPToServiceAndNamespace(),
				serviceAndNamespaceToSelectors: svcWatcher.GetServiceAndNamespaceToSelectors(),
				ipToPod:                        poWatcher.ipToPod,
				podToWorkloadAndNamespace:      poWatcher.podToWorkloadAndNamespace,
				workloadAndNamespaceToLabels:   poWatcher.workloadAndNamespaceToLabels,
				serviceToWorkload:              serviceToWorkload,
				workloadPodCount:               poWatcher.workloadPodCount,
				ipToWorkloadAndNamespace:       nil,
				safeStopCh:                     safeStopCh,
				useListPod:                     true,
			}
			return
		}

		// 2) If not using listPod, check whether the extension is present
		ext := k8smetadata.GetKubernetesMetadata()
		if ext != nil {
			// We skip all watchers (the extension has them).
			logger.Info("k8smetadata extension is present")
			instance = &kubernetesResolver{
				logger:       logger,
				clusterName:  clusterName,
				platformCode: platformCode,
				useExtension: true,
			}
			return
		}

		// 3) Extension is not present, and useListPod is false -> EndpointSlice approach
		logger.Info("k8smetadata extension not found; setting up EndpointSlice watchers")

		cfg, err := clientcmd.BuildConfigFromFlags("", "")
		if err != nil {
			logger.Fatal("Failed to create config", zap.Error(err))
		}

		clientset, err := kubernetes.NewForConfig(cfg)
		if err != nil {
			logger.Fatal("Failed to create kubernetes client", zap.Error(err))
		}

		jitterSleep(jitterKubernetesAPISeconds)

		sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0)

		// For the endpoint slice watcher, we maintain two mappings:
		//   1. ip -> workload
		//   2. service -> workload
		//
		// Scenario:
		// When a deployment associated with service X has only one pod, the following events occur:
		//   a. A pod terminates (one endpoint terminating). For this event, we keep the service -> workload mapping (which was added earlier).
		//   b. The endpoints become empty (null endpoints). For this event, we remove the service -> workload mapping after a delay.
		//   c. A new pod starts (one endpoint starting). For this event, we add the same service -> workload mapping immediately.
		//
		// Problem:
		// In step (b), a deletion delay (e.g., 2 minutes) is initiated for the mapping with service key X.
		// Then, in step (c), the mapping for service key X is re-added. Since the new mapping is inserted
		// before the delay expires, the scheduled deletion from step (b) may erroneously remove the mapping
		// added in step (c).
		//
		// Root Cause and Resolution:
		// The issue is caused by deleting the mapping using only the key, without verifying the value.
		// To fix this, we need to compare both the key and the value before deletion.
		// That is exactly the purpose of TimedDeleterWithIDCheck.
		timedDeleterWithIDCheck := &k8sclient.TimedDeleterWithIDCheck{Delay: deletionDelay}
		endptSliceWatcher := k8sclient.NewEndpointSliceWatcher(logger, sharedInformerFactory, timedDeleterWithIDCheck)

		// For the service watcher, we map an IP to a service name, and it's very rare for an IP to be reused
		// by two services, so we don't face the same issue as the service -> workload mapping in endpointSliceWatcher.
		// Technically, we could use TimedDeleterWithIDCheck here as well, but it would involve changing the pod watcher
		// with a lot of code changes. I don't think it's worthwhile to do that now. We might consider it when
		// the pod watcher is no longer in use.
		timedDeleter := &k8sclient.TimedDeleter{Delay: deletionDelay}
		svcWatcher := k8sclient.NewServiceWatcher(logger, sharedInformerFactory, timedDeleter)

		safeStopCh := &k8sclient.SafeChannel{Ch: make(chan struct{}), Closed: false}
		// initialize the service and endpoint slice watchers for the cluster
		svcWatcher.Run(safeStopCh.Ch)
		endptSliceWatcher.Run(safeStopCh.Ch)
		// wait for the caches to sync (once) so that clients know about the services and endpoint slices in the cluster
		svcWatcher.WaitForCacheSync(safeStopCh.Ch)
		endptSliceWatcher.WaitForCacheSync(safeStopCh.Ch)

		instance = &kubernetesResolver{
			logger:                       logger,
			clientset:                    clientset,
			clusterName:                  clusterName,
			platformCode:                 platformCode,
			ipToWorkloadAndNamespace:     endptSliceWatcher.GetIPToPodMetadata(), // endpointSlice provides the pod IP → PodMetadata mapping
			ipToPod:                      nil,
			podToWorkloadAndNamespace:    nil,
			workloadAndNamespaceToLabels: nil,
			workloadPodCount:             nil,
			ipToServiceAndNamespace:      svcWatcher.GetIPToServiceAndNamespace(),
			serviceToWorkload:            endptSliceWatcher.GetServiceNamespaceToPodMetadata(), // endpointSlice also provides the service → PodMetadata mapping
			safeStopCh:                   safeStopCh,
			useListPod:                   useListPod,
		}
	})

	return instance
}

func (e *kubernetesResolver) Stop(_ context.Context) error {
	e.safeStopCh.Close()
	return nil
}

// getWorkloadAndNamespaceByIP resolves an IP address to a workload name and namespace,
// using the k8smetadata extension when available and the watcher-populated maps otherwise.
func (e *kubernetesResolver) getWorkloadAndNamespaceByIP(ip string) (string, string, error) {
	// If the extension is available, rely on it
	if e.useExtension {
		ext := k8smetadata.GetKubernetesMetadata()
		if ext == nil {
			return "", "", errors.New("extension not found (unexpected)")
		}

		pm := ext.GetPodMetadataFromPodIP(ip)
		if pm.Workload != "" {
			return pm.Workload, pm.Namespace, nil
		}

		if svcKeyVal := ext.GetServiceAndNamespaceFromClusterIP(ip); svcKeyVal != "" {
			sm := ext.GetPodMetadataFromServiceAndNamespace(svcKeyVal)
			if sm.Workload != "" {
				return sm.Workload, sm.Namespace, nil
			}
		}
		return "", "", fmt.Errorf("extension could not resolve IP: %s", ip)
	}

	// Otherwise, use the watchers
	if e.useListPod {
		// use results from the pod watcher
		if podKey, ok := e.ipToPod.Load(ip); ok {
			pod := podKey.(string)
			if workloadKey, ok := e.podToWorkloadAndNamespace.Load(pod); ok {
				workload, namespace := k8sclient.ExtractResourceAndNamespace(workloadKey.(string))
				return workload, namespace, nil
			}
		}
	} else {
		// use results from the endpoint slice watcher
		if pmVal, ok := e.ipToWorkloadAndNamespace.Load(ip); ok {
			pm := pmVal.(k8sclient.PodMetadata)
			return pm.Workload, pm.Namespace, nil
		}
	}

	// Not found in IP -> workload, so check IP -> service, then service -> workload
	if svcKeyVal, ok := e.ipToServiceAndNamespace.Load(ip); ok {
		svcAndNS := svcKeyVal.(string)
		if e.serviceToWorkload != nil {
			if pmVal, ok := e.serviceToWorkload.Load(svcAndNS); ok {
				// For the EndpointSlice watcher, the value is a k8sclient.PodMetadata.
				// For the listPod approach, the value might be "workload@namespace".
				switch val := pmVal.(type) {
				case k8sclient.PodMetadata:
					return val.Workload, val.Namespace, nil
				case string:
					workload, namespace := k8sclient.ExtractResourceAndNamespace(val)
					return workload, namespace, nil
				default:
					e.logger.Debug("Unknown type in serviceToWorkload map")
				}
			}
		}
	}

	return "", "", errors.New("no kubernetes workload found for ip: " + ip)
}

func (e *kubernetesResolver) Process(attributes, resourceAttributes pcommon.Map) error {
	var namespace string

	if value, ok := attributes.Get(attr.AWSRemoteService); ok {
		valueStr := value.AsString()
		ipStr := ""
		if ip, _, ok := k8sclient.ExtractIPPort(valueStr); ok {
			if workload, ns, err := e.getWorkloadAndNamespaceByIP(valueStr); err == nil {
				attributes.PutStr(attr.AWSRemoteService, workload)
				namespace = ns
			} else {
				ipStr = ip
			}
		} else if k8sclient.IsIP(valueStr) {
			ipStr = valueStr
		}

		if ipStr != "" {
			if workload, ns, err := e.getWorkloadAndNamespaceByIP(ipStr); err == nil {
				attributes.PutStr(attr.AWSRemoteService, workload)
				namespace = ns
			} else {
				e.logger.Debug("failed to Process ip", zap.String("ip", ipStr), zap.Error(err))
			}
		}
	}

	if _, ok := attributes.Get(attr.AWSRemoteEnvironment); !ok {
		if namespace != "" {
			attributes.PutStr(attr.AWSRemoteEnvironment, fmt.Sprintf("%s:%s/%s", e.platformCode, e.clusterName, namespace))
		}
	}

	return nil
}

type kubernetesResourceAttributesResolver struct {
	platformCode string
	clusterName  string
	attributeMap map[string]string
}

func newKubernetesResourceAttributesResolver(platformCode, clusterName string) *kubernetesResourceAttributesResolver {
	return &kubernetesResourceAttributesResolver{
		platformCode: platformCode,
		clusterName:  clusterName,
		attributeMap: DefaultInheritedAttributes,
	}
}

func (h *kubernetesResourceAttributesResolver) Process(attributes, resourceAttributes pcommon.Map) error {
	for attrKey, mappingKey := range h.attributeMap {
		if val, ok := resourceAttributes.Get(attrKey); ok {
			attributes.PutStr(mappingKey, val.AsString())
		}
	}

	if h.platformCode == config.PlatformEKS {
		attributes.PutStr(common.AttributePlatformType, AttributePlatformEKS)
		attributes.PutStr(common.AttributeEKSClusterName, h.clusterName)
	} else {
		attributes.PutStr(common.AttributePlatformType, AttributePlatformK8S)
		attributes.PutStr(common.AttributeK8SClusterName, h.clusterName)
	}

	var namespace string
	if nsAttr, ok := resourceAttributes.Get(semconv.AttributeK8SNamespaceName); ok {
		namespace = nsAttr.Str()
	} else {
		namespace = "UnknownNamespace"
	}

	if val, ok := attributes.Get(attr.AWSLocalEnvironment); !ok {
		env := generateLocalEnvironment(h.platformCode, h.clusterName+"/"+namespace)
		attributes.PutStr(attr.AWSLocalEnvironment, env)
	} else {
		attributes.PutStr(attr.AWSLocalEnvironment, val.Str())
	}

	attributes.PutStr(common.AttributeK8SNamespace, namespace)

	// The application log group in Container Insights is a fixed pattern:
	//   "/aws/containerinsights/{Cluster_Name}/application"
	// See https://github.com/aws/amazon-cloudwatch-agent-operator/blob/fe144bb02d7b1930715aa3ea32e57a5ff13406aa/helm/templates/fluent-bit-configmap.yaml#L82
	logGroupName := "/aws/containerinsights/" + h.clusterName + "/application"
	resourceAttributes.PutStr(semconv.AttributeAWSLogGroupNames, logGroupName)

	return nil
}

func (h *kubernetesResourceAttributesResolver) Stop(ctx context.Context) error {
	return nil
}