receiver/kubeletstatsreceiver/scraper.go (189 lines of code) (raw):
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package kubeletstatsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver"
import (
"context"
"fmt"
"sync"
"time"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/scraper"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/kubelet"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/metadata"
)
type scraperOptions struct {
collectionInterval time.Duration
extraMetadataLabels []kubelet.MetadataLabel
metricGroupsToCollect map[kubelet.MetricGroup]bool
k8sAPIClient kubernetes.Interface
}
type kubeletScraper struct {
statsProvider *kubelet.StatsProvider
metadataProvider *kubelet.MetadataProvider
logger *zap.Logger
extraMetadataLabels []kubelet.MetadataLabel
metricGroupsToCollect map[kubelet.MetricGroup]bool
k8sAPIClient kubernetes.Interface
cachedVolumeSource map[string]v1.PersistentVolumeSource
mbs *metadata.MetricsBuilders
needsResources bool
nodeInformer cache.SharedInformer
stopCh chan struct{}
m sync.RWMutex
// A struct that keeps Node's information
nodeInfo *kubelet.NodeInfo
}
func newKubeletScraper(
restClient kubelet.RestClient,
set receiver.Settings,
rOptions *scraperOptions,
metricsConfig metadata.MetricsBuilderConfig,
nodeName string,
) (scraper.Metrics, error) {
ks := &kubeletScraper{
statsProvider: kubelet.NewStatsProvider(restClient),
metadataProvider: kubelet.NewMetadataProvider(restClient),
logger: set.Logger,
extraMetadataLabels: rOptions.extraMetadataLabels,
metricGroupsToCollect: rOptions.metricGroupsToCollect,
k8sAPIClient: rOptions.k8sAPIClient,
cachedVolumeSource: make(map[string]v1.PersistentVolumeSource),
mbs: &metadata.MetricsBuilders{
NodeMetricsBuilder: metadata.NewMetricsBuilder(metricsConfig, set),
PodMetricsBuilder: metadata.NewMetricsBuilder(metricsConfig, set),
ContainerMetricsBuilder: metadata.NewMetricsBuilder(metricsConfig, set),
OtherMetricsBuilder: metadata.NewMetricsBuilder(metricsConfig, set),
},
needsResources: metricsConfig.Metrics.K8sPodCPULimitUtilization.Enabled ||
metricsConfig.Metrics.K8sPodCPURequestUtilization.Enabled ||
metricsConfig.Metrics.K8sContainerCPULimitUtilization.Enabled ||
metricsConfig.Metrics.K8sContainerCPURequestUtilization.Enabled ||
metricsConfig.Metrics.K8sPodMemoryLimitUtilization.Enabled ||
metricsConfig.Metrics.K8sPodMemoryRequestUtilization.Enabled ||
metricsConfig.Metrics.K8sContainerMemoryLimitUtilization.Enabled ||
metricsConfig.Metrics.K8sContainerMemoryRequestUtilization.Enabled,
stopCh: make(chan struct{}),
nodeInfo: &kubelet.NodeInfo{},
}
if metricsConfig.Metrics.K8sContainerCPUNodeUtilization.Enabled ||
metricsConfig.Metrics.K8sPodCPUNodeUtilization.Enabled ||
metricsConfig.Metrics.K8sContainerMemoryNodeUtilization.Enabled ||
metricsConfig.Metrics.K8sPodMemoryNodeUtilization.Enabled {
ks.nodeInformer = k8sconfig.NewNodeSharedInformer(rOptions.k8sAPIClient, nodeName, 5*time.Minute)
}
return scraper.NewMetrics(
ks.scrape,
scraper.WithStart(ks.start),
scraper.WithShutdown(ks.shutdown),
)
}
func (r *kubeletScraper) scrape(context.Context) (pmetric.Metrics, error) {
summary, err := r.statsProvider.StatsSummary()
if err != nil {
r.logger.Error("call to /stats/summary endpoint failed", zap.Error(err))
return pmetric.Metrics{}, err
}
var podsMetadata *v1.PodList
// fetch metadata only when extra metadata labels are needed
if len(r.extraMetadataLabels) > 0 || r.needsResources {
podsMetadata, err = r.metadataProvider.Pods()
if err != nil {
r.logger.Error("call to /pods endpoint failed", zap.Error(err))
return pmetric.Metrics{}, err
}
}
var nodeInfo kubelet.NodeInfo
if r.nodeInformer != nil {
nodeInfo = r.node()
}
metaD := kubelet.NewMetadata(r.extraMetadataLabels, podsMetadata, nodeInfo, r.detailedPVCLabelsSetter())
mds := kubelet.MetricsData(r.logger, summary, metaD, r.metricGroupsToCollect, r.mbs)
md := pmetric.NewMetrics()
for i := range mds {
mds[i].ResourceMetrics().MoveAndAppendTo(md.ResourceMetrics())
}
return md, nil
}
func (r *kubeletScraper) detailedPVCLabelsSetter() func(rb *metadata.ResourceBuilder, volCacheID, volumeClaim, namespace string) error {
return func(rb *metadata.ResourceBuilder, volCacheID, volumeClaim, namespace string) error {
if r.k8sAPIClient == nil {
return nil
}
if _, ok := r.cachedVolumeSource[volCacheID]; !ok {
ctx := context.Background()
pvc, err := r.k8sAPIClient.CoreV1().PersistentVolumeClaims(namespace).Get(ctx, volumeClaim, metav1.GetOptions{})
if err != nil {
return err
}
volName := pvc.Spec.VolumeName
if volName == "" {
return fmt.Errorf("PersistentVolumeClaim %s does not have a volume name", pvc.Name)
}
pv, err := r.k8sAPIClient.CoreV1().PersistentVolumes().Get(ctx, volName, metav1.GetOptions{})
if err != nil {
return err
}
// Cache collected source.
r.cachedVolumeSource[volCacheID] = pv.Spec.PersistentVolumeSource
}
kubelet.SetPersistentVolumeLabels(rb, r.cachedVolumeSource[volCacheID])
return nil
}
}
func (r *kubeletScraper) node() kubelet.NodeInfo {
r.m.RLock()
defer r.m.RUnlock()
return *r.nodeInfo
}
func (r *kubeletScraper) start(_ context.Context, _ component.Host) error {
if r.nodeInformer != nil {
_, err := r.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: r.handleNodeAdd,
UpdateFunc: r.handleNodeUpdate,
})
if err != nil {
r.logger.Error("error adding event handler to node informer", zap.Error(err))
}
go r.nodeInformer.Run(r.stopCh)
}
return nil
}
func (r *kubeletScraper) shutdown(_ context.Context) error {
r.logger.Debug("executing close")
if r.stopCh != nil {
close(r.stopCh)
}
return nil
}
func (r *kubeletScraper) handleNodeAdd(obj any) {
if node, ok := obj.(*v1.Node); ok {
r.addOrUpdateNode(node)
} else {
r.logger.Error("object received was not of type v1.Node", zap.Any("received", obj))
}
}
func (r *kubeletScraper) handleNodeUpdate(_, newNode any) {
if node, ok := newNode.(*v1.Node); ok {
r.addOrUpdateNode(node)
} else {
r.logger.Error("object received was not of type v1.Node", zap.Any("received", newNode))
}
}
func (r *kubeletScraper) addOrUpdateNode(node *v1.Node) {
r.m.Lock()
defer r.m.Unlock()
if cpu, ok := node.Status.Capacity["cpu"]; ok {
if q, err := resource.ParseQuantity(cpu.String()); err == nil {
r.nodeInfo.CPUCapacity = float64(q.MilliValue()) / 1000
}
}
if memory, ok := node.Status.Capacity["memory"]; ok {
// ie: 32564740Ki
if q, err := resource.ParseQuantity(memory.String()); err == nil {
r.nodeInfo.MemoryCapacity = float64(q.Value())
}
}
}