receiver/kubeletstatsreceiver/internal/kubelet/metadata.go (192 lines of code) (raw):
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package kubelet // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/kubelet"
import (
"errors"
"fmt"
"regexp"
"strings"
conventions "go.opentelemetry.io/collector/semconv/v1.27.0"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/metadata"
)
type MetadataLabel string
// Values for MetadataLabel enum.
const (
MetadataLabelContainerID MetadataLabel = conventions.AttributeContainerID
MetadataLabelVolumeType MetadataLabel = labelVolumeType
)
var supportedLabels = map[MetadataLabel]bool{
MetadataLabelContainerID: true,
MetadataLabelVolumeType: true,
}
// ValidateMetadataLabelsConfig validates that provided list of metadata labels is supported
func ValidateMetadataLabelsConfig(labels []MetadataLabel) error {
labelsFound := map[MetadataLabel]bool{}
for _, label := range labels {
_, supported := supportedLabels[label]
if !supported {
return fmt.Errorf("label %q is not supported", label)
}
if _, duplicate := labelsFound[label]; duplicate {
return fmt.Errorf("duplicate metadata label: %q", label)
}
labelsFound[label] = true
}
return nil
}
type Metadata struct {
Labels map[MetadataLabel]bool
PodsMetadata *v1.PodList
DetailedPVCResourceSetter func(rb *metadata.ResourceBuilder, volCacheID, volumeClaim, namespace string) error
podResources map[string]resources
containerResources map[string]resources
nodeInfo NodeInfo
}
type resources struct {
cpuRequest float64
cpuLimit float64
memoryRequest int64
memoryLimit int64
}
type NodeInfo struct {
Name string
// node's CPU capacity in cores
CPUCapacity float64
// node's Memory capacity in bytes
MemoryCapacity float64
}
func getContainerResources(r *v1.ResourceRequirements) resources {
if r == nil {
return resources{}
}
return resources{
cpuRequest: r.Requests.Cpu().AsApproximateFloat64(),
cpuLimit: r.Limits.Cpu().AsApproximateFloat64(),
memoryRequest: r.Requests.Memory().Value(),
memoryLimit: r.Limits.Memory().Value(),
}
}
func NewMetadata(labels []MetadataLabel, podsMetadata *v1.PodList, nodeInfo NodeInfo,
detailedPVCResourceSetter func(rb *metadata.ResourceBuilder, volCacheID, volumeClaim, namespace string) error,
) Metadata {
m := Metadata{
Labels: getLabelsMap(labels),
PodsMetadata: podsMetadata,
DetailedPVCResourceSetter: detailedPVCResourceSetter,
podResources: make(map[string]resources),
containerResources: make(map[string]resources),
nodeInfo: nodeInfo,
}
if podsMetadata != nil {
for _, pod := range podsMetadata.Items {
var podResource resources
allContainersCPULimitsDefined := true
allContainersCPURequestsDefined := true
allContainersMemoryLimitsDefined := true
allContainersMemoryRequestsDefined := true
for i := range pod.Spec.Containers {
container := pod.Spec.Containers[i]
containerResource := getContainerResources(&container.Resources)
if allContainersCPULimitsDefined && containerResource.cpuLimit == 0 {
allContainersCPULimitsDefined = false
podResource.cpuLimit = 0
}
if allContainersCPURequestsDefined && containerResource.cpuRequest == 0 {
allContainersCPURequestsDefined = false
podResource.cpuRequest = 0
}
if allContainersMemoryLimitsDefined && containerResource.memoryLimit == 0 {
allContainersMemoryLimitsDefined = false
podResource.memoryLimit = 0
}
if allContainersMemoryRequestsDefined && containerResource.memoryRequest == 0 {
allContainersMemoryRequestsDefined = false
podResource.memoryRequest = 0
}
if allContainersCPULimitsDefined {
podResource.cpuLimit += containerResource.cpuLimit
}
if allContainersCPURequestsDefined {
podResource.cpuRequest += containerResource.cpuRequest
}
if allContainersMemoryLimitsDefined {
podResource.memoryLimit += containerResource.memoryLimit
}
if allContainersMemoryRequestsDefined {
podResource.memoryRequest += containerResource.memoryRequest
}
m.containerResources[string(pod.UID)+container.Name] = containerResource
}
m.podResources[string(pod.UID)] = podResource
}
}
return m
}
func getLabelsMap(metadataLabels []MetadataLabel) map[MetadataLabel]bool {
out := make(map[MetadataLabel]bool, len(metadataLabels))
for _, l := range metadataLabels {
out[l] = true
}
return out
}
// getExtraResources gets extra resources based on provided metadata label.
func (m *Metadata) setExtraResources(rb *metadata.ResourceBuilder, podRef stats.PodReference,
extraMetadataLabel MetadataLabel, extraMetadataFrom string,
) error {
// Ensure MetadataLabel exists before proceeding.
if !m.Labels[extraMetadataLabel] || len(m.Labels) == 0 {
return nil
}
// Cannot proceed, if metadata is unavailable.
if m.PodsMetadata == nil {
return errors.New("pods metadata were not fetched")
}
switch extraMetadataLabel {
case MetadataLabelContainerID:
containerID, err := m.getContainerID(podRef.UID, extraMetadataFrom)
if err != nil {
return err
}
rb.SetContainerID(containerID)
case MetadataLabelVolumeType:
volume, err := m.getPodVolume(podRef.UID, extraMetadataFrom)
if err != nil {
return err
}
setResourcesFromVolume(rb, volume)
// Get more labels from PersistentVolumeClaim volume type.
if volume.PersistentVolumeClaim != nil {
volCacheID := fmt.Sprintf("%s/%s", podRef.UID, extraMetadataFrom)
err := m.DetailedPVCResourceSetter(rb, volCacheID, volume.PersistentVolumeClaim.ClaimName, podRef.Namespace)
if err != nil {
return fmt.Errorf("failed to set labels from volume claim: %w", err)
}
}
}
return nil
}
// getContainerID retrieves container id from metadata for given pod UID and container name,
// returns an error if no container found in the metadata that matches the requirements
// or if the apiServer returned a newly created container with empty containerID.
func (m *Metadata) getContainerID(podUID string, containerName string) (string, error) {
uid := types.UID(podUID)
for _, pod := range m.PodsMetadata.Items {
if pod.UID == uid {
for _, containerStatus := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) {
if containerName == containerStatus.Name {
if len(strings.TrimSpace(containerStatus.ContainerID)) == 0 {
return "", fmt.Errorf("pod %q with container %q has an empty containerID", podUID, containerName)
}
return stripContainerID(containerStatus.ContainerID), nil
}
}
}
}
return "", fmt.Errorf("pod %q with container %q not found in the fetched metadata", podUID, containerName)
}
var containerSchemeRegexp = regexp.MustCompile(`^[\w_-]+://`)
// stripContainerID returns a pure container id without the runtime scheme://
func stripContainerID(id string) string {
return containerSchemeRegexp.ReplaceAllString(id, "")
}
func (m *Metadata) getPodVolume(podUID string, volumeName string) (v1.Volume, error) {
for _, pod := range m.PodsMetadata.Items {
if pod.UID == types.UID(podUID) {
for _, volume := range pod.Spec.Volumes {
if volumeName == volume.Name {
return volume, nil
}
}
}
}
return v1.Volume{}, fmt.Errorf("pod %q with volume %q not found in the fetched metadata", podUID, volumeName)
}