// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package stores // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores"

import (
"context"
"errors"
"fmt"
"log"
"regexp"
"strings"
"sync"
"time"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient"
awsmetrics "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil"
)
const (
refreshInterval = 30 * time.Second
measurementsExpiry = 10 * time.Minute
podsExpiry = 2 * time.Minute
memoryKey = "memory"
cpuKey = "cpu"
splitRegexStr = "\\.|-"
kubeProxy = "kube-proxy"
)
var re = regexp.MustCompile(splitRegexStr)
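
// cachedEntry pairs a pod definition with the time it was cached so that
// stale entries can be aged out of the pod store.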
type cachedEntry struct {
pod corev1.Pod
creation time.Time
}
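
// Owner identifies a pod owner by kind and name; its JSON field names match
// the owner_kind/owner_name keys written into the kubernetes blob.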
type Owner struct {
OwnerKind string `json:"owner_kind"`
OwnerName string `json:"owner_name"`
}
type prevPodMeasurement struct {
containersRestarts int
}
type prevContainerMeasurement struct {
restarts int
}
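
// mapWithExpiry wraps awsmetrics.MapWithExpiry with plain string keys and
// locks around every access so it can be shared across goroutines.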
type mapWithExpiry struct {
*awsmetrics.MapWithExpiry
}
func (m *mapWithExpiry) Get(key string) (any, bool) {
m.Lock()
defer m.Unlock()
if val, ok := m.MapWithExpiry.Get(awsmetrics.NewKey(key, nil)); ok {
return val.RawValue, ok
}
return nil, false
}
func (m *mapWithExpiry) Set(key string, content any) {
m.Lock()
defer m.Unlock()
val := awsmetrics.MetricValue{
RawValue: content,
Timestamp: time.Now(),
}
m.MapWithExpiry.Set(awsmetrics.NewKey(key, nil), val)
}
func newMapWithExpiry(ttl time.Duration) *mapWithExpiry {
return &mapWithExpiry{
MapWithExpiry: awsmetrics.NewMapWithExpiry(ttl),
}
}
type replicaSetInfoProvider interface {
GetReplicaSetClient() k8sclient.ReplicaSetClient
}
type podClient interface {
ListPods() ([]corev1.Pod, error)
}
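
// PodStore caches pod metadata fetched from the kubelet and uses it to
// decorate Container Insights metrics with pod- and container-level
// information such as resource requests/limits, status, and owners.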
type PodStore struct {
cache *mapWithExpiry
	prevMeasurements map[string]*mapWithExpiry // previous measurements per metric type (Pod, Container, etc.)
podClient podClient
k8sClient replicaSetInfoProvider
lastRefreshed time.Time
nodeInfo *nodeInfo
prefFullPodName bool
logger *zap.Logger
sync.Mutex
addFullPodNameMetricLabel bool
}
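
// NewPodStore creates a PodStore that reads pods from the kubelet on the given
// host IP. It fails fast when the kubelet cannot be queried or no k8s client
// is available.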
func NewPodStore(hostIP string, prefFullPodName bool, addFullPodNameMetricLabel bool, logger *zap.Logger) (*PodStore, error) {
podClient, err := kubeletutil.NewKubeletClient(hostIP, ci.KubeSecurePort, logger)
if err != nil {
return nil, err
}
// Try to detect kubelet permission issue here
if _, err := podClient.ListPods(); err != nil {
return nil, fmt.Errorf("cannot get pod from kubelet, err: %w", err)
}
k8sClient := k8sclient.Get(logger)
if k8sClient == nil {
return nil, errors.New("failed to start pod store because k8sclient is nil")
}
podStore := &PodStore{
cache: newMapWithExpiry(podsExpiry),
prevMeasurements: make(map[string]*mapWithExpiry),
podClient: podClient,
nodeInfo: newNodeInfo(logger),
prefFullPodName: prefFullPodName,
k8sClient: k8sClient,
logger: logger,
addFullPodNameMetricLabel: addFullPodNameMetricLabel,
}
return podStore, nil
}
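
// Shutdown shuts down the pod cache and every per-type previous-measurement
// map, joining any errors encountered along the way.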
func (p *PodStore) Shutdown() error {
	errs := p.cache.Shutdown()
for _, maps := range p.prevMeasurements {
if prevMeasErr := maps.Shutdown(); prevMeasErr != nil {
errs = errors.Join(errs, prevMeasErr)
}
}
return errs
}
func (p *PodStore) getPrevMeasurement(metricType, metricKey string) (any, bool) {
prevMeasurement, ok := p.prevMeasurements[metricType]
if !ok {
return nil, false
}
content, ok := prevMeasurement.Get(metricKey)
if !ok {
return nil, false
}
return content, true
}
func (p *PodStore) setPrevMeasurement(metricType, metricKey string, content any) {
prevMeasurement, ok := p.prevMeasurements[metricType]
if !ok {
prevMeasurement = newMapWithExpiry(measurementsExpiry)
p.prevMeasurements[metricType] = prevMeasurement
}
prevMeasurement.Set(metricKey, content)
}

// RefreshTick triggers a refresh of the pod store.
// It is expected to be called at a relatively short interval (e.g. 1 second).
// The refresh cannot simply run on a fixed schedule because Decorate(...) also
// calls refresh(...) on demand whenever the pod metadata for a metric is not in
// the cache yet, which makes the effective refresh interval irregular.
func (p *PodStore) RefreshTick(ctx context.Context) {
now := time.Now()
if now.Sub(p.lastRefreshed) >= refreshInterval {
p.refresh(ctx, now)
p.lastRefreshed = now
}
}
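
// Decorate adds Kubernetes metadata (CPU/memory requests and limits, status,
// container counts, container IDs, owners, and labels) to the given metric and
// kubernetes blob. It returns false when the pod backing the metric cannot be
// resolved yet.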
func (p *PodStore) Decorate(ctx context.Context, metric CIMetric, kubernetesBlob map[string]any) bool {
if metric.GetTag(ci.MetricType) == ci.TypeNode {
p.decorateNode(metric)
} else if metric.GetTag(ci.K8sPodNameKey) != "" {
podKey := createPodKeyFromMetric(metric)
if podKey == "" {
p.logger.Error("podKey is unavailable when decorating pod")
return false
}
entry := p.getCachedEntry(podKey)
if entry == nil {
p.logger.Debug(fmt.Sprintf("no pod is found for %s, refresh the cache now...", podKey))
p.refresh(ctx, time.Now())
entry = p.getCachedEntry(podKey)
}
		// If the pod is still not found, insert a placeholder entry to avoid refreshing too frequently.
		if entry == nil {
			p.logger.Warn(fmt.Sprintf("no pod is found after reading through kubelet, add a placeholder for %s", podKey))
p.setCachedEntry(podKey, &cachedEntry{creation: time.Now()})
return false
}
		// If the entry is only a placeholder (no pod data yet), skip decoration for this metric.
if entry.pod.Name == "" {
p.logger.Warn("no pod information is found in podstore for pod " + podKey)
return false
}
p.decorateCPU(metric, &entry.pod)
p.decorateMem(metric, &entry.pod)
p.addStatus(metric, &entry.pod)
addContainerCount(metric, &entry.pod)
addContainerID(&entry.pod, metric, kubernetesBlob, p.logger)
p.addPodOwnersAndPodName(metric, &entry.pod, kubernetesBlob)
addLabels(&entry.pod, kubernetesBlob)
}
return true
}
func (p *PodStore) getCachedEntry(podKey string) *cachedEntry {
p.Lock()
defer p.Unlock()
if content, ok := p.cache.Get(podKey); ok {
return content.(*cachedEntry)
}
return nil
}
func (p *PodStore) setCachedEntry(podKey string, entry *cachedEntry) {
p.Lock()
defer p.Unlock()
p.cache.Set(podKey, entry)
}
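
// refresh lists pods from the kubelet, using refreshInterval as the timeout,
// and rebuilds the cached pod entries and node-level aggregates.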
func (p *PodStore) refresh(ctx context.Context, now time.Time) {
var podList []corev1.Pod
var err error
doRefresh := func() {
podList, err = p.podClient.ListPods()
if err != nil {
p.logger.Error("fail to get pod from kubelet", zap.Error(err))
}
}
refreshWithTimeout(ctx, doRefresh, refreshInterval)
p.refreshInternal(now, podList)
}
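
// refreshInternal caches every pod by its pod key and aggregates node-level
// stats: running pod/container counts and the summed CPU/memory requests of
// pods that have not completed.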
func (p *PodStore) refreshInternal(now time.Time, podList []corev1.Pod) {
var podCount int
var containerCount int
var cpuRequest uint64
var memRequest uint64
for i := range podList {
pod := podList[i]
podKey := createPodKeyFromMetaData(&pod)
if podKey == "" {
p.logger.Warn("podKey is unavailable, refresh pod store for pod " + pod.Name)
continue
}
if pod.Status.Phase != corev1.PodSucceeded && pod.Status.Phase != corev1.PodFailed {
tmpCPUReq, _ := getResourceSettingForPod(&pod, p.nodeInfo.getCPUCapacity(), cpuKey, getRequestForContainer)
cpuRequest += tmpCPUReq
tmpMemReq, _ := getResourceSettingForPod(&pod, p.nodeInfo.getMemCapacity(), memoryKey, getRequestForContainer)
memRequest += tmpMemReq
}
if pod.Status.Phase == corev1.PodRunning {
podCount++
}
for _, containerStatus := range pod.Status.ContainerStatuses {
if containerStatus.State.Running != nil {
containerCount++
}
}
p.setCachedEntry(podKey, &cachedEntry{
pod: pod,
creation: now,
})
}
p.nodeInfo.setNodeStats(nodeStats{podCnt: podCount, containerCnt: containerCount, memReq: memRequest, cpuReq: cpuRequest})
}
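
// decorateNode adds node-level request, reserved-capacity, and running
// pod/container count fields, and records the node CPU/memory capacity taken
// from the node limit fields.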
func (p *PodStore) decorateNode(metric CIMetric) {
nodeStats := p.nodeInfo.getNodeStats()
if metric.HasField(ci.MetricName(ci.TypeNode, ci.CPUTotal)) {
cpuLimitMetric := ci.MetricName(ci.TypeNode, ci.CPULimit)
if metric.HasField(cpuLimitMetric) {
p.nodeInfo.setCPUCapacity(metric.GetField(cpuLimitMetric))
}
metric.AddField(ci.MetricName(ci.TypeNode, ci.CPURequest), nodeStats.cpuReq)
if p.nodeInfo.getCPUCapacity() != 0 {
metric.AddField(ci.MetricName(ci.TypeNode, ci.CPUReservedCapacity),
float64(nodeStats.cpuReq)/float64(p.nodeInfo.getCPUCapacity())*100)
}
}
if metric.HasField(ci.MetricName(ci.TypeNode, ci.MemWorkingset)) {
memLimitMetric := ci.MetricName(ci.TypeNode, ci.MemLimit)
if metric.HasField(memLimitMetric) {
p.nodeInfo.setMemCapacity(metric.GetField(memLimitMetric))
}
metric.AddField(ci.MetricName(ci.TypeNode, ci.MemRequest), nodeStats.memReq)
if p.nodeInfo.getMemCapacity() != 0 {
metric.AddField(ci.MetricName(ci.TypeNode, ci.MemReservedCapacity),
float64(nodeStats.memReq)/float64(p.nodeInfo.getMemCapacity())*100)
}
}
metric.AddField(ci.MetricName(ci.TypeNode, ci.RunningPodCount), nodeStats.podCnt)
metric.AddField(ci.MetricName(ci.TypeNode, ci.RunningContainerCount), nodeStats.containerCnt)
}
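
// decorateCPU adds CPU request/limit fields for pod metrics (including
// utilization over the pod limit when every container sets a limit) and for
// container metrics.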
func (p *PodStore) decorateCPU(metric CIMetric, pod *corev1.Pod) {
if metric.GetTag(ci.MetricType) == ci.TypePod {
// add cpu limit and request for pod cpu
if metric.HasField(ci.MetricName(ci.TypePod, ci.CPUTotal)) {
podCPUTotal := metric.GetField(ci.MetricName(ci.TypePod, ci.CPUTotal))
podCPUReq, _ := getResourceSettingForPod(pod, p.nodeInfo.getCPUCapacity(), cpuKey, getRequestForContainer)
			// the pod request is the sum of the requests of those containers that set one
if podCPUReq != 0 {
metric.AddField(ci.MetricName(ci.TypePod, ci.CPURequest), podCPUReq)
}
			if p.nodeInfo.getCPUCapacity() != 0 && podCPUReq != 0 {
				metric.AddField(ci.MetricName(ci.TypePod, ci.CPUReservedCapacity), float64(podCPUReq)/float64(p.nodeInfo.getCPUCapacity())*100)
			}
podCPULimit, ok := getResourceSettingForPod(pod, p.nodeInfo.getCPUCapacity(), cpuKey, getLimitForContainer)
			// only set the pod limit when every container has a limit
if ok && podCPULimit != 0 {
metric.AddField(ci.MetricName(ci.TypePod, ci.CPULimit), podCPULimit)
metric.AddField(ci.MetricName(ci.TypePod, ci.CPUUtilizationOverPodLimit), podCPUTotal.(float64)/float64(podCPULimit)*100)
}
}
} else if metric.GetTag(ci.MetricType) == ci.TypeContainer {
// add cpu limit and request for container
if metric.HasField(ci.MetricName(ci.TypeContainer, ci.CPUTotal)) {
if containerName := metric.GetTag(ci.ContainerNamekey); containerName != "" {
for _, containerSpec := range pod.Spec.Containers {
if containerSpec.Name == containerName {
if cpuLimit, ok := getLimitForContainer(cpuKey, containerSpec); ok {
metric.AddField(ci.MetricName(ci.TypeContainer, ci.CPULimit), cpuLimit)
}
if cpuReq, ok := getRequestForContainer(cpuKey, containerSpec); ok {
metric.AddField(ci.MetricName(ci.TypeContainer, ci.CPURequest), cpuReq)
}
}
}
}
}
}
}
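
// decorateMem adds memory request/limit fields for pod metrics (including
// utilization over the pod limit when every container sets a limit) and for
// container metrics.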
func (p *PodStore) decorateMem(metric CIMetric, pod *corev1.Pod) {
if metric.GetTag(ci.MetricType) == ci.TypePod {
memWorkingsetMetric := ci.MetricName(ci.TypePod, ci.MemWorkingset)
if metric.HasField(memWorkingsetMetric) {
podMemWorkingset := metric.GetField(memWorkingsetMetric)
// add mem limit and request for pod mem
podMemReq, _ := getResourceSettingForPod(pod, p.nodeInfo.getMemCapacity(), memoryKey, getRequestForContainer)
			// the pod request is the sum of the requests of those containers that set one
if podMemReq != 0 {
metric.AddField(ci.MetricName(ci.TypePod, ci.MemRequest), podMemReq)
}
			if p.nodeInfo.getMemCapacity() != 0 && podMemReq != 0 {
				metric.AddField(ci.MetricName(ci.TypePod, ci.MemReservedCapacity), float64(podMemReq)/float64(p.nodeInfo.getMemCapacity())*100)
			}
podMemLimit, ok := getResourceSettingForPod(pod, p.nodeInfo.getMemCapacity(), memoryKey, getLimitForContainer)
			// only set the pod limit when every container has a limit
if ok && podMemLimit != 0 {
metric.AddField(ci.MetricName(ci.TypePod, ci.MemLimit), podMemLimit)
metric.AddField(ci.MetricName(ci.TypePod, ci.MemUtilizationOverPodLimit), float64(podMemWorkingset.(uint64))/float64(podMemLimit)*100)
}
}
} else if metric.GetTag(ci.MetricType) == ci.TypeContainer {
// add mem limit and request for container
if metric.HasField(ci.MetricName(ci.TypeContainer, ci.MemWorkingset)) {
if containerName := metric.GetTag(ci.ContainerNamekey); containerName != "" {
for _, containerSpec := range pod.Spec.Containers {
if containerSpec.Name == containerName {
if memLimit, ok := getLimitForContainer(memoryKey, containerSpec); ok {
metric.AddField(ci.MetricName(ci.TypeContainer, ci.MemLimit), memLimit)
}
if memReq, ok := getRequestForContainer(memoryKey, containerSpec); ok {
metric.AddField(ci.MetricName(ci.TypeContainer, ci.MemRequest), memReq)
}
}
}
}
}
}
}
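
// addStatus tags the metric with the pod or container status and reports the
// restart count delta since the previous observation.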
func (p *PodStore) addStatus(metric CIMetric, pod *corev1.Pod) {
if metric.GetTag(ci.MetricType) == ci.TypePod {
metric.AddTag(ci.PodStatus, string(pod.Status.Phase))
var curContainerRestarts int
for _, containerStatus := range pod.Status.ContainerStatuses {
curContainerRestarts += int(containerStatus.RestartCount)
}
podKey := createPodKeyFromMetric(metric)
if podKey != "" {
content, ok := p.getPrevMeasurement(ci.TypePod, podKey)
if ok {
prevMeasurement := content.(prevPodMeasurement)
result := 0
if curContainerRestarts > prevMeasurement.containersRestarts {
result = curContainerRestarts - prevMeasurement.containersRestarts
}
metric.AddField(ci.MetricName(ci.TypePod, ci.ContainerRestartCount), result)
}
p.setPrevMeasurement(ci.TypePod, podKey, prevPodMeasurement{containersRestarts: curContainerRestarts})
}
} else if metric.GetTag(ci.MetricType) == ci.TypeContainer {
if containerName := metric.GetTag(ci.ContainerNamekey); containerName != "" {
for _, containerStatus := range pod.Status.ContainerStatuses {
if containerStatus.Name == containerName {
switch {
case containerStatus.State.Running != nil:
metric.AddTag(ci.ContainerStatus, "Running")
case containerStatus.State.Waiting != nil:
metric.AddTag(ci.ContainerStatus, "Waiting")
if containerStatus.State.Waiting.Reason != "" {
metric.AddTag(ci.ContainerStatusReason, containerStatus.State.Waiting.Reason)
}
case containerStatus.State.Terminated != nil:
metric.AddTag(ci.ContainerStatus, "Terminated")
if containerStatus.State.Terminated.Reason != "" {
metric.AddTag(ci.ContainerStatusReason, containerStatus.State.Terminated.Reason)
}
}
if containerStatus.LastTerminationState.Terminated != nil && containerStatus.LastTerminationState.Terminated.Reason != "" {
metric.AddTag(ci.ContainerLastTerminationReason, containerStatus.LastTerminationState.Terminated.Reason)
}
containerKey := createContainerKeyFromMetric(metric)
if containerKey != "" {
content, ok := p.getPrevMeasurement(ci.TypeContainer, containerKey)
if ok {
prevMeasurement := content.(prevContainerMeasurement)
result := 0
if int(containerStatus.RestartCount) > prevMeasurement.restarts {
result = int(containerStatus.RestartCount) - prevMeasurement.restarts
}
metric.AddField(ci.ContainerRestartCount, result)
}
p.setPrevMeasurement(ci.TypeContainer, containerKey, prevContainerMeasurement{restarts: int(containerStatus.RestartCount)})
}
}
}
}
}
}

// getResourceSettingForPod sums the limit or request (depending on the passed-in fn) of the given
// resource across the pod's containers, capped at bound when bound is non-zero. The returned bool
// reports whether every container set that resource.
func getResourceSettingForPod(pod *corev1.Pod, bound uint64, resource corev1.ResourceName, fn func(resource corev1.ResourceName, spec corev1.Container) (uint64, bool)) (uint64, bool) {
var result uint64
allSet := true
for _, containerSpec := range pod.Spec.Containers {
val, ok := fn(resource, containerSpec)
if ok {
result += val
} else {
allSet = false
}
}
if bound != 0 && result > bound {
result = bound
}
return result, allSet
}
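
// getLimitForContainer returns the container's limit for the given resource
// (millicores for CPU, bytes for memory) and whether a non-negative limit was set.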
func getLimitForContainer(resource corev1.ResourceName, spec corev1.Container) (uint64, bool) {
if v, ok := spec.Resources.Limits[resource]; ok {
var limit int64
if resource == cpuKey {
limit = v.MilliValue()
} else {
limit = v.Value()
}
// it doesn't make sense for the limits to be negative
if limit < 0 {
return 0, false
}
return uint64(limit), true
}
return 0, false
}
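
// getRequestForContainer returns the container's request for the given resource
// (millicores for CPU, bytes for memory) and whether a non-negative request was set.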
func getRequestForContainer(resource corev1.ResourceName, spec corev1.Container) (uint64, bool) {
if v, ok := spec.Resources.Requests[resource]; ok {
var req int64
if resource == cpuKey {
req = v.MilliValue()
} else {
req = v.Value()
}
// it doesn't make sense for the requests to be negative
if req < 0 {
return 0, false
}
return uint64(req), true
}
return 0, false
}
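
// addContainerID records the container runtime and container ID in the
// kubernetes blob for the container named in the metric, falling back to the
// metric's container ID tag when the status has none, and then removes the raw
// container ID tag from the metric.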
func addContainerID(pod *corev1.Pod, metric CIMetric, kubernetesBlob map[string]any, logger *zap.Logger) {
if containerName := metric.GetTag(ci.ContainerNamekey); containerName != "" {
rawID := ""
for _, container := range pod.Status.ContainerStatuses {
			if containerName == container.Name {
rawID = container.ContainerID
if rawID != "" {
ids := strings.Split(rawID, "://")
if len(ids) == 2 {
kubernetesBlob[ids[0]] = map[string]string{"container_id": ids[1]}
} else {
logger.Warn(fmt.Sprintf("W! Cannot parse container id from %s for container %s", rawID, container.Name))
kubernetesBlob["container_id"] = rawID
}
}
break
}
}
if rawID == "" {
kubernetesBlob["container_id"] = metric.GetTag(ci.ContainerIDkey)
}
metric.RemoveTag(ci.ContainerIDkey)
}
}
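
// addLabels copies the pod's labels into the kubernetes blob.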
func addLabels(pod *corev1.Pod, kubernetesBlob map[string]any) {
labels := make(map[string]string)
for k, v := range pod.Labels {
labels[k] = v
}
if len(labels) > 0 {
kubernetesBlob["labels"] = labels
}
}
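
// getJobNamePrefix strips the generated suffix from a Job pod name by cutting
// at the first '.' or '-'.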
func getJobNamePrefix(podName string) string {
return re.Split(podName, 2)[0]
}
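
// addPodOwnersAndPodName resolves the pod's owners (mapping ReplicaSets to
// Deployments and Jobs to CronJobs where possible), stores them in the
// kubernetes blob, and tags the metric with a pod name derived from the owning
// controller.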
func (p *PodStore) addPodOwnersAndPodName(metric CIMetric, pod *corev1.Pod, kubernetesBlob map[string]any) {
var owners []any
podName := ""
for _, owner := range pod.OwnerReferences {
if owner.Kind != "" && owner.Name != "" {
kind := owner.Kind
name := owner.Name
switch owner.Kind {
case ci.ReplicaSet:
replicaSetClient := p.k8sClient.GetReplicaSetClient()
rsToDeployment := replicaSetClient.ReplicaSetToDeployment()
if parent := rsToDeployment[owner.Name]; parent != "" {
kind = ci.Deployment
name = parent
} else if parent := parseDeploymentFromReplicaSet(owner.Name); parent != "" {
kind = ci.Deployment
name = parent
}
case ci.Job:
if parent := parseCronJobFromJob(owner.Name); parent != "" {
kind = ci.CronJob
name = parent
} else if !p.prefFullPodName {
name = getJobNamePrefix(name)
}
}
owners = append(owners, map[string]string{"owner_kind": kind, "owner_name": name})
if podName == "" {
switch owner.Kind {
case ci.StatefulSet:
podName = pod.Name
case ci.DaemonSet, ci.Job, ci.ReplicaSet, ci.ReplicationController:
podName = name
}
}
}
}
if len(owners) > 0 {
kubernetesBlob["pod_owners"] = owners
}
	// if podName was not derived from a well-known controller, fall back to the pod's own name
if podName == "" {
if strings.HasPrefix(pod.Name, kubeProxy) && !p.prefFullPodName {
podName = kubeProxy
} else {
podName = pod.Name
}
}
metric.AddTag(ci.PodNameKey, podName)
if p.addFullPodNameMetricLabel {
metric.AddTag(ci.FullPodNameKey, pod.Name)
}
}
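
// addContainerCount adds running and total container counts to pod metrics.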
func addContainerCount(metric CIMetric, pod *corev1.Pod) {
runningContainerCount := 0
for _, containerStatus := range pod.Status.ContainerStatuses {
if containerStatus.State.Running != nil {
runningContainerCount++
}
}
if metric.GetTag(ci.MetricType) == ci.TypePod {
metric.AddField(ci.MetricName(ci.TypePod, ci.RunningContainerCount), runningContainerCount)
metric.AddField(ci.MetricName(ci.TypePod, ci.ContainerCount), len(pod.Status.ContainerStatuses))
}
}