extension/k8smetadata/extension.go (136 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT
package k8smetadata
import (
	"context"
	"fmt"
	"math/rand"
	"time"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/extension"
	"go.uber.org/atomic"
	"go.uber.org/zap"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"

	"github.com/aws/amazon-cloudwatch-agent/internal/k8sCommon/k8sclient"
)
// deletionDelay is the Delay handed to the watchers' timed deleters: how long
// a mapping is kept after deletion is requested before it is actually removed.
const deletionDelay = 2 * time.Minute

// jitterKubernetesAPISeconds is the exclusive upper bound, in whole seconds,
// of the random pause Start takes before creating informers.
const jitterKubernetesAPISeconds = 10
// KubernetesMetadata is a collector extension that runs Kubernetes
// EndpointSlice and/or Service watchers (selected by config.Objects) and
// exposes lookups from pod IPs, service@namespace keys and cluster IPs to the
// metadata those watchers record.
type KubernetesMetadata struct {
	logger *zap.Logger
	config *Config
	// ready is set to true at the end of Start, after every configured
	// watcher's cache has synced.
	ready atomic.Bool
	// safeStopCh is created in Start and closed by Shutdown; its Ch is the
	// stop channel passed to every watcher.
	safeStopCh *k8sclient.SafeChannel
	// endpointSliceWatcher stays nil unless "endpointslices" is listed in
	// config.Objects.
	endpointSliceWatcher *k8sclient.EndpointSliceWatcher
	// serviceWatcher stays nil unless "services" is listed in config.Objects.
	serviceWatcher *k8sclient.ServiceWatcher
}

// Compile-time check that *KubernetesMetadata implements extension.Extension.
var _ extension.Extension = (*KubernetesMetadata)(nil)
// jitterSleep blocks the calling goroutine for a random whole number of
// seconds in [0, seconds). Start uses it to jitter its first Kubernetes API
// calls as a precaution against overloading the API server.
//
// A non-positive seconds value returns immediately; rand.Intn would otherwise
// panic for n <= 0.
func jitterSleep(seconds int) {
	if seconds <= 0 {
		return
	}
	jitter := time.Duration(rand.Intn(seconds)) * time.Second // nolint:gosec
	time.Sleep(jitter)
}
// Start creates a Kubernetes clientset and a shared informer factory, then
// starts and cache-syncs one watcher per entry in e.config.Objects
// ("endpointslices" or "services"). Unknown entries are logged and skipped.
// Once every configured watcher has synced, the extension is marked ready.
//
// The client config comes from clientcmd.BuildConfigFromFlags with no master
// URL and no kubeconfig path. Start returns an error if either the config or
// the clientset cannot be created, rather than continuing with a nil value
// that would panic inside client-go.
func (e *KubernetesMetadata) Start(_ context.Context, _ component.Host) error {
	e.logger.Debug("Starting k8smetadata extension...")

	config, err := clientcmd.BuildConfigFromFlags("", "")
	if err != nil {
		// kubernetes.NewForConfig dereferences its argument, so a nil config must not reach it.
		return fmt.Errorf("building kubernetes client config: %w", err)
	}

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return fmt.Errorf("creating kubernetes clientset: %w", err)
	}

	// Jitter calls to the Kubernetes API (a precaution to prevent overloading the API server).
	jitterSleep(jitterKubernetesAPISeconds)

	// A resync period of 0 disables periodic resyncs of the informer caches.
	sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0)
	e.safeStopCh = &k8sclient.SafeChannel{Ch: make(chan struct{}), Closed: false}

	for _, obj := range e.config.Objects {
		switch obj {
		case "endpointslices":
			// For the endpoint slice watcher, we maintain two mappings:
			// 1. ip -> workload
			// 2. service -> workload
			//
			// Scenario:
			// When a deployment associated with service X has only one pod, the following events occur:
			// a. A pod terminates (one endpoint terminating). For this event, we keep the service -> workload
			//    mapping (which was added before).
			// b. The endpoints become empty (null endpoints). For this event, we remove the service -> workload
			//    mapping after a delay.
			// c. A new pod starts (one endpoint starting). For this event, we add the same service -> workload
			//    mapping immediately.
			//
			// Problem:
			// In step (b), a deletion delay (e.g., 2 minutes) is initiated for the mapping with service key X.
			// Then, in step (c), the mapping for service key X is re-added. Since the new mapping is inserted
			// before the delay expires, the scheduled deletion from step (b) may erroneously remove the mapping
			// added in step (c).
			//
			// Root Cause and Resolution:
			// The issue is caused by deleting the mapping using only the key, without verifying the value.
			// To fix this, we need to compare both the key and the value before deletion.
			// That is exactly the purpose of TimedDeleterWithIDCheck.
			timedDeleter := &k8sclient.TimedDeleterWithIDCheck{Delay: deletionDelay}
			e.endpointSliceWatcher = k8sclient.NewEndpointSliceWatcher(e.logger, sharedInformerFactory, timedDeleter)
			e.endpointSliceWatcher.Run(e.safeStopCh.Ch)
			e.endpointSliceWatcher.WaitForCacheSync(e.safeStopCh.Ch)
			e.logger.Debug("EndpointSlice cache synced")
		case "services":
			// For the service watcher, we map IP -> service name. It is very rare for an IP to be reused
			// by two services, so we don't face the service -> workload issue described for the
			// endpointSliceWatcher. Technically we could use TimedDeleterWithIDCheck here as well, but that
			// would require many changes to the pod watcher. It is not worthwhile for now; we might consider
			// it once the pod watcher is no longer in use.
			timedDeleter := &k8sclient.TimedDeleter{Delay: deletionDelay}
			e.serviceWatcher = k8sclient.NewServiceWatcher(e.logger, sharedInformerFactory, timedDeleter)
			e.serviceWatcher.Run(e.safeStopCh.Ch)
			e.serviceWatcher.WaitForCacheSync(e.safeStopCh.Ch)
			e.logger.Debug("Service cache synced")
		default:
			// Previously ignored silently; surface misconfiguration without failing startup.
			e.logger.Warn("Ignoring unsupported object type in k8smetadata config", zap.String("object", obj))
		}
	}

	e.logger.Debug("Cache synced, extension fully started")
	e.ready.Store(true)
	return nil
}
// Shutdown closes the stop channel created by Start, which is the channel the
// watchers were run with. When Start never created the channel, there is
// nothing to close and Shutdown does nothing. It always returns nil.
func (e *KubernetesMetadata) Shutdown(_ context.Context) error {
	if stopCh := e.safeStopCh; stopCh != nil {
		stopCh.Close()
	}
	return nil
}
// GetPodMetadataFromPodIP returns the pod metadata that the EndpointSlice
// watcher recorded for ip. It returns the zero PodMetadata in three cases:
// endpointslices are not enabled in the config, ip is empty, or no mapping
// exists for ip.
//
// The stored value is type-asserted to k8sclient.PodMetadata, so a value of
// any other type in the map would panic.
func (e *KubernetesMetadata) GetPodMetadataFromPodIP(ip string) k8sclient.PodMetadata {
	var empty k8sclient.PodMetadata

	switch {
	case e.endpointSliceWatcher == nil:
		e.logger.Debug("GetPodMetadataFromPodIP: endpointslices not enabled in config")
		return empty
	case ip == "":
		e.logger.Debug("GetPodMetadataFromPodIP: no IP provided")
		return empty
	}

	value, found := e.endpointSliceWatcher.GetIPToPodMetadata().Load(ip)
	if !found {
		e.logger.Debug("GetPodMetadataFromPodIP: no mapping found for IP", zap.String("ip", ip))
		return empty
	}

	podMeta := value.(k8sclient.PodMetadata)
	e.logger.Debug("GetPodMetadataFromPodIP: found metadata",
		zap.String("ip", ip),
		zap.String("workload", podMeta.Workload),
		zap.String("namespace", podMeta.Namespace),
		zap.String("node", podMeta.Node),
	)
	return podMeta
}
// GetPodMetadataFromServiceAndNamespace returns the pod metadata that the
// EndpointSlice watcher recorded for the given service@namespace key. It
// returns the zero PodMetadata in three cases: endpointslices are not enabled
// in the config, svcAndNS is empty, or no mapping exists for the key.
//
// The stored value is type-asserted to k8sclient.PodMetadata, so a value of
// any other type in the map would panic.
func (e *KubernetesMetadata) GetPodMetadataFromServiceAndNamespace(svcAndNS string) k8sclient.PodMetadata {
	var empty k8sclient.PodMetadata

	switch {
	case e.endpointSliceWatcher == nil:
		e.logger.Debug("GetPodMetadataFromServiceAndNamespace: endpointslices not enabled in config")
		return empty
	case svcAndNS == "":
		e.logger.Debug("GetPodMetadataFromServiceAndNamespace: no service@namespace provided")
		return empty
	}

	value, found := e.endpointSliceWatcher.GetServiceNamespaceToPodMetadata().Load(svcAndNS)
	if !found {
		e.logger.Debug("GetPodMetadataFromServiceAndNamespace: no mapping found", zap.String("svcAndNS", svcAndNS))
		return empty
	}

	podMeta := value.(k8sclient.PodMetadata)
	e.logger.Debug("GetPodMetadataFromServiceAndNamespace: found metadata",
		zap.String("serviceNameAndNamespace", svcAndNS),
		zap.String("workload", podMeta.Workload),
		zap.String("node", podMeta.Node),
	)
	return podMeta
}
// GetServiceAndNamespaceFromClusterIP returns the service@namespace string
// that the Service watcher recorded for ip. It returns "" in three cases:
// services are not enabled in the config, ip is empty, or no mapping exists
// for ip.
//
// The stored value is type-asserted to string, so a value of any other type
// in the map would panic.
func (e *KubernetesMetadata) GetServiceAndNamespaceFromClusterIP(ip string) string {
	switch {
	case e.serviceWatcher == nil:
		e.logger.Debug("GetServiceAndNamespaceFromClusterIP: services not enabled in config")
		return ""
	case ip == "":
		e.logger.Debug("GetServiceAndNamespaceFromClusterIP: no IP provided")
		return ""
	}

	value, found := e.serviceWatcher.GetIPToServiceAndNamespace().Load(ip)
	if !found {
		e.logger.Debug("GetServiceAndNamespaceFromClusterIP: no mapping found", zap.String("ip", ip))
		return ""
	}

	serviceKey := value.(string)
	e.logger.Debug("GetServiceAndNamespaceFromClusterIP: found metadata",
		zap.String("ip", ip),
		zap.String("svcAndNS", serviceKey),
	)
	return serviceKey
}