receiver/kubeletstatsreceiver/internal/kubelet/accumulator.go (116 lines of code) (raw):
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package kubelet // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/kubelet"
import (
"time"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/metadata"
)
type MetricGroup string
// Values for MetricGroup enum.
const (
ContainerMetricGroup = MetricGroup("container")
PodMetricGroup = MetricGroup("pod")
NodeMetricGroup = MetricGroup("node")
VolumeMetricGroup = MetricGroup("volume")
)
// ValidMetricGroups map of valid metrics.
var ValidMetricGroups = map[MetricGroup]bool{
ContainerMetricGroup: true,
PodMetricGroup: true,
NodeMetricGroup: true,
VolumeMetricGroup: true,
}
type metricDataAccumulator struct {
m []pmetric.Metrics
metadata Metadata
logger *zap.Logger
metricGroupsToCollect map[MetricGroup]bool
time time.Time
mbs *metadata.MetricsBuilders
}
func addUptimeMetric(mb *metadata.MetricsBuilder, uptimeMetric metadata.RecordIntDataPointFunc, startTime v1.Time, currentTime pcommon.Timestamp) {
if !startTime.IsZero() {
value := int64(time.Since(startTime.Time).Seconds())
uptimeMetric(mb, currentTime, value)
}
}
func (a *metricDataAccumulator) nodeStats(s stats.NodeStats) {
if !a.metricGroupsToCollect[NodeMetricGroup] {
return
}
currentTime := pcommon.NewTimestampFromTime(a.time)
addUptimeMetric(a.mbs.NodeMetricsBuilder, metadata.NodeUptimeMetrics.Uptime, s.StartTime, currentTime)
addCPUMetrics(a.mbs.NodeMetricsBuilder, metadata.NodeCPUMetrics, s.CPU, currentTime, resources{}, 0)
addMemoryMetrics(a.mbs.NodeMetricsBuilder, metadata.NodeMemoryMetrics, s.Memory, currentTime, resources{}, 0)
addFilesystemMetrics(a.mbs.NodeMetricsBuilder, metadata.NodeFilesystemMetrics, s.Fs, currentTime)
addNetworkMetrics(a.mbs.NodeMetricsBuilder, metadata.NodeNetworkMetrics, s.Network, currentTime)
// todo s.Runtime.ImageFs
rb := a.mbs.NodeMetricsBuilder.NewResourceBuilder()
rb.SetK8sNodeName(s.NodeName)
a.m = append(a.m, a.mbs.NodeMetricsBuilder.Emit(
metadata.WithStartTimeOverride(pcommon.NewTimestampFromTime(s.StartTime.Time)),
metadata.WithResource(rb.Emit()),
))
}
func (a *metricDataAccumulator) podStats(s stats.PodStats) {
if !a.metricGroupsToCollect[PodMetricGroup] {
return
}
currentTime := pcommon.NewTimestampFromTime(a.time)
addUptimeMetric(a.mbs.PodMetricsBuilder, metadata.PodUptimeMetrics.Uptime, s.StartTime, currentTime)
addCPUMetrics(a.mbs.PodMetricsBuilder, metadata.PodCPUMetrics, s.CPU, currentTime, a.metadata.podResources[s.PodRef.UID], a.metadata.nodeInfo.CPUCapacity)
addMemoryMetrics(a.mbs.PodMetricsBuilder, metadata.PodMemoryMetrics, s.Memory, currentTime, a.metadata.podResources[s.PodRef.UID], a.metadata.nodeInfo.MemoryCapacity)
addFilesystemMetrics(a.mbs.PodMetricsBuilder, metadata.PodFilesystemMetrics, s.EphemeralStorage, currentTime)
addNetworkMetrics(a.mbs.PodMetricsBuilder, metadata.PodNetworkMetrics, s.Network, currentTime)
rb := a.mbs.PodMetricsBuilder.NewResourceBuilder()
rb.SetK8sPodUID(s.PodRef.UID)
rb.SetK8sPodName(s.PodRef.Name)
rb.SetK8sNamespaceName(s.PodRef.Namespace)
a.m = append(a.m, a.mbs.PodMetricsBuilder.Emit(
metadata.WithStartTimeOverride(pcommon.NewTimestampFromTime(s.StartTime.Time)),
metadata.WithResource(rb.Emit()),
))
}
func (a *metricDataAccumulator) containerStats(sPod stats.PodStats, s stats.ContainerStats) {
if !a.metricGroupsToCollect[ContainerMetricGroup] {
return
}
rb := a.mbs.ContainerMetricsBuilder.NewResourceBuilder()
res, err := getContainerResource(rb, sPod, s, a.metadata)
if err != nil {
a.logger.Warn(
"failed to fetch container metrics",
zap.String("pod", sPod.PodRef.Name),
zap.String("container", s.Name),
zap.Error(err))
return
}
currentTime := pcommon.NewTimestampFromTime(a.time)
resourceKey := sPod.PodRef.UID + s.Name
addUptimeMetric(a.mbs.ContainerMetricsBuilder, metadata.ContainerUptimeMetrics.Uptime, s.StartTime, currentTime)
addCPUMetrics(a.mbs.ContainerMetricsBuilder, metadata.ContainerCPUMetrics, s.CPU, currentTime, a.metadata.containerResources[resourceKey], a.metadata.nodeInfo.CPUCapacity)
addMemoryMetrics(a.mbs.ContainerMetricsBuilder, metadata.ContainerMemoryMetrics, s.Memory, currentTime, a.metadata.containerResources[resourceKey], a.metadata.nodeInfo.MemoryCapacity)
addFilesystemMetrics(a.mbs.ContainerMetricsBuilder, metadata.ContainerFilesystemMetrics, s.Rootfs, currentTime)
a.m = append(a.m, a.mbs.ContainerMetricsBuilder.Emit(
metadata.WithStartTimeOverride(pcommon.NewTimestampFromTime(s.StartTime.Time)),
metadata.WithResource(res),
))
}
func (a *metricDataAccumulator) volumeStats(sPod stats.PodStats, s stats.VolumeStats) {
if !a.metricGroupsToCollect[VolumeMetricGroup] {
return
}
rb := a.mbs.OtherMetricsBuilder.NewResourceBuilder()
res, err := getVolumeResourceOptions(rb, sPod, s, a.metadata)
if err != nil {
a.logger.Warn(
"Failed to gather additional volume metadata. Skipping metric collection.",
zap.String("pod", sPod.PodRef.Name),
zap.String("volume", s.Name),
zap.Error(err))
return
}
currentTime := pcommon.NewTimestampFromTime(a.time)
addVolumeMetrics(a.mbs.OtherMetricsBuilder, metadata.K8sVolumeMetrics, s, currentTime)
a.m = append(a.m, a.mbs.OtherMetricsBuilder.Emit(metadata.WithResource(res)))
}