event-exporter/kubernetes/podlabels/cache_collector.go (135 lines of code) (raw):
package podlabels
import (
"context"
"strings"
"time"
"github.com/golang/glog"
lru "github.com/hashicorp/golang-lru/v2"
"golang.org/x/time/rate"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
var (
// getPodErrLogRateLimiter throttles the "Failed to get pod" error log written
// by GetLabels. It refills one token every 10 seconds with a burst of 1, and,
// being package-level, is shared by every collector in the process.
getPodErrLogRateLimiter = rate.NewLimiter(rate.Every(10*time.Second), 1)
)
// cacheKey identifies a pod by its namespace and name. It is the key type of
// both LRU caches held by podLabelCollectorWithCache.
type cacheKey struct {
// Namespace is the namespace the pod lives in.
Namespace string
// Name is the name of the pod.
Name string
}
// PodLabelCollector is the interface for looking up the labels associated with
// a pod, identified by its namespace and pod name.
type PodLabelCollector interface {
// GetLabels returns the labels for the pod named pod in namespace. The
// implementation in this file returns nil when it has no labels to report.
GetLabels(namespace, pod string) map[string]string
}
// podLabelCollectorWithCache implements the PodLabelCollector interface. It
// keeps two LRU caches keyed by (namespace, pod name): one for pods that
// produced labels and one remembering pods that produced none, so that repeated
// lookups do not always have to reach the apiserver.
type podLabelCollectorWithCache struct {
// ignoredNamespaces is the set of namespaces for which GetLabels returns nil
// immediately, without consulting the caches or the apiserver.
ignoredNamespaces map[string]struct{}
// client is used to fetch a pod from the apiserver when neither cache can
// answer the lookup.
client kubernetes.Interface
// getPodTimeout bounds each individual pod Get request.
getPodTimeout time.Duration
// cache maps a pod to the non-empty label set computed for it.
cache *lru.Cache[cacheKey, map[string]string]
// emptyLabelPodCache remembers pods for which no labels were produced (the
// pod was not found, or the computed label set was empty). The value is the
// time the entry was added.
emptyLabelPodCache *lru.Cache[cacheKey, time.Time] // cache the timestamp so we can check TTL.
// emptyLabelPodCacheTTL is how long an emptyLabelPodCache entry is honored
// before GetLabels drops it and queries the apiserver again.
emptyLabelPodCacheTTL time.Duration
}
// NewCollector creates a podLabelCollectorWithCache that maintains
// (namespace, podName) -> (podLabels) mappings.
//
// client is used to fetch pods that are not cached. ignoredNamespaces lists
// namespaces for which no labels are ever returned. podCacheSize and
// emptyLabelPodCacheSize are the capacities of the label cache and of the
// cache of pods without labels; emptyLabelPodCacheTTL is how long an entry in
// the latter stays valid. getPodTimeout bounds each pod Get request.
//
// An error is returned if either LRU cache cannot be constructed.
func NewCollector(client kubernetes.Interface, ignoredNamespaces []string, podCacheSize, emptyLabelPodCacheSize int, emptyLabelPodCacheTTL, getPodTimeout time.Duration) (*podLabelCollectorWithCache, error) {
	labelCache, err := lru.NewWithEvict[cacheKey, map[string]string](podCacheSize, recordEviction)
	if err != nil {
		return nil, err
	}

	noLabelCache, err := lru.NewWithEvict[cacheKey, time.Time](emptyLabelPodCacheSize, recordEmptyLabelPodCacheEvict)
	if err != nil {
		return nil, err
	}

	// Turn the namespace list into a set for constant-time membership checks.
	ignored := make(map[string]struct{}, len(ignoredNamespaces))
	for i := range ignoredNamespaces {
		ignored[ignoredNamespaces[i]] = struct{}{}
	}

	return &podLabelCollectorWithCache{
		ignoredNamespaces:     ignored,
		client:                client,
		getPodTimeout:         getPodTimeout,
		cache:                 labelCache,
		emptyLabelPodCache:    noLabelCache,
		emptyLabelPodCacheTTL: emptyLabelPodCacheTTL,
	}, nil
}
// GetLabels returns the labels for the given pod, or nil when there are none
// to report.
//
// Lookup order:
//  1. Pods in an ignored namespace yield nil immediately.
//  2. A hit in the label cache is returned as-is.
//  3. A still-valid entry in the empty-label cache yields nil; an expired
//     entry is removed and the lookup continues.
//  4. Otherwise the pod is fetched from the apiserver with getPodTimeout.
//
// A failed fetch yields nil; only a NotFound status error is remembered in the
// empty-label cache. A successful fetch stores a non-empty label set in the
// label cache, while an empty one is remembered in the empty-label cache.
func (pc *podLabelCollectorWithCache) GetLabels(namespaceName, podName string) map[string]string {
	if _, ignored := pc.ignoredNamespaces[namespaceName]; ignored {
		return nil
	}

	key := cacheKey{Namespace: namespaceName, Name: podName}

	if cached, found := pc.cache.Get(key); found {
		recordQueryHit()
		return cached
	}
	recordQueryMiss()

	// Pods known to have no labels are answered from the empty-label cache
	// until their entry outlives the TTL.
	addedAt, known := pc.emptyLabelPodCache.Get(key)
	switch {
	case !known:
		recordEmptyLabelPodCacheMiss()
	case addedAt.Add(pc.emptyLabelPodCacheTTL).After(time.Now()):
		recordEmptyLabelPodCacheHit()
		return nil
	default:
		recordEmptyLabelPodCacheExpire()
		pc.emptyLabelPodCache.Remove(key)
	}

	reqCtx, release := context.WithTimeout(context.TODO(), pc.getPodTimeout)
	pod, err := pc.client.CoreV1().Pods(namespaceName).Get(reqCtx, podName, metav1.GetOptions{})
	release()

	if err != nil {
		// The log line is rate limited so a burst of failing lookups does not
		// flood the log.
		if getPodErrLogRateLimiter.Allow() {
			glog.Errorf("Failed to get pod %s/%s %v", namespaceName, podName, err)
		}

		statusErr, isStatus := err.(*apierrors.StatusError)
		if !isStatus {
			recordPodGet("UnknownFailure")
			return nil
		}

		recordPodGet(string(statusErr.ErrStatus.Reason))
		// A pod that does not exist is remembered so it is not re-queried on
		// every call; other failures are not cached.
		if apierrors.IsNotFound(statusErr) {
			recordEmptyLabelPodCacheAddition()
			pc.emptyLabelPodCache.Add(key, time.Now())
		}
		return nil
	}
	recordPodGet("OK")

	podLabels := getLabelsFromPod(pod)
	if len(podLabels) == 0 {
		recordEmptyLabelPodCacheAddition()
		pc.emptyLabelPodCache.Add(key, time.Now())
		return nil
	}

	pc.cache.Add(key, podLabels)
	recordAddition()
	return podLabels
}
// getLabelsFromPod derives the owner labels (see go/gke-log-owner-label, for
// non-system logs) from the pod's owner references. The result holds at most an
// owner type and an owner name entry and is an empty, non-nil map when no owner
// reference is recognized.
//
// Recognized owner kinds:
//   - DaemonSet, StatefulSet: reported with the owner's own kind and name.
//   - ReplicaSet: reported as a Deployment, but only when the pod carries a
//     non-empty "pod-template-hash" label and the ReplicaSet name ends in
//     "-<hash>"; otherwise the reference contributes nothing.
//   - Job: reported as a CronJob when stripUnixTimeSuffix accepts the job name,
//     else as a JobSet when the pod has the jobset name label, else as a Job.
//
// Any other kind is skipped. With several owners, each recognized reference
// overwrites the previous one, so the last valid one wins.
func getLabelsFromPod(pod *corev1.Pod) map[string]string {
	ownerLabels := map[string]string{}
	setOwner := func(ownerType, ownerName string) {
		ownerLabels[ownerTypeKeyName] = ownerType
		ownerLabels[ownerNameKeyName] = ownerName
	}

	podLabels := pod.GetObjectMeta().GetLabels()

	for _, ref := range pod.GetObjectMeta().GetOwnerReferences() {
		switch ref.Kind {
		case "DaemonSet", "StatefulSet":
			setOwner(ref.Kind, ref.Name)

		case "ReplicaSet":
			// Pod that is eventually owned by Deployment has pod name:
			//   <DeploymentName>-<PodTemplateHash>-<RandomString>
			// and owner replicaset name: <DeploymentName>-<PodTemplateHash>
			hash := podLabels["pod-template-hash"]
			if hash == "" {
				continue
			}
			if deploymentName, matched := strings.CutSuffix(ref.Name, "-"+hash); matched {
				setOwner("Deployment", deploymentName)
			}

		case "Job":
			cronJobName, fromCronJob := stripUnixTimeSuffix(ref.Name)
			jobSetName := podLabels[jobSetNameLabelKey]
			switch {
			case fromCronJob:
				// Pod that is eventually owned by a CronJob has pod name:
				//   <CronJobName>-<UnixTimeInMin>-<RandomString>
				// and owner job name: <CronJobName>-<UnixTimeInMin>
				setOwner("CronJob", cronJobName)
			case jobSetName != "":
				// Pod that is eventually owned by a JobSet has the jobset name label set.
				setOwner("JobSet", jobSetName)
			default:
				setOwner("Job", ref.Name)
			}
		}
	}

	return ownerLabels
}