processor/k8sattributesprocessor/internal/kube/client.go

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package kube // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/kube"

import (
	"context"
	"errors"
	"fmt"
	"regexp"
	"strings"
	"sync"
	"time"

	"go.opentelemetry.io/collector/component"
	conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
	"go.uber.org/zap"
	apps_v1 "k8s.io/api/apps/v1"
	api_v1 "k8s.io/api/core/v1"
	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/selection"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"

	dcommon "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/docker"
	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/metadata"
)

// WatchClient is the main client provided by this package for watching a kubernetes cluster.
type WatchClient struct {
	m                      sync.RWMutex
	deleteMut              sync.Mutex
	logger                 *zap.Logger
	kc                     kubernetes.Interface
	informer               cache.SharedInformer
	namespaceInformer      cache.SharedInformer
	nodeInformer           cache.SharedInformer
	replicasetInformer     cache.SharedInformer
	replicasetRegex        *regexp.Regexp
	cronJobRegex           *regexp.Regexp
	deleteQueue            []deleteRequest
	stopCh                 chan struct{}
	waitForMetadata        bool
	waitForMetadataTimeout time.Duration

	// A map containing Pod related data, used to associate them with resources.
	// Key can be either an IP address or Pod UID
	Pods         map[PodIdentifier]*Pod
	Rules        ExtractionRules
	Filters      Filters
	Associations []Association
	Exclude      Excludes

	// A map containing Namespace related data, used to associate them with resources.
	// Key is namespace name
	Namespaces map[string]*Namespace

	// A map containing Node related data, used to associate them with resources.
	// Key is node name
	Nodes map[string]*Node

	// A map containing ReplicaSets related data, used to associate them with resources.
	// Key is replicaset uid
	ReplicaSets map[string]*ReplicaSet

	telemetryBuilder *metadata.TelemetryBuilder
}

// Extract replicaset name from the pod name. Pod name is created using
// format: [deployment-name]-[Random-String-For-ReplicaSet]
var rRegex = regexp.MustCompile(`^(.*)-[0-9a-zA-Z]+$`)

// Extract CronJob name from the Job name. Job name is created using
// format: [cronjob-name]-[time-hash-int]
var cronJobRegex = regexp.MustCompile(`^(.*)-[0-9]+$`)
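
// For illustration (hypothetical names): a Job created by a CronJob named
// "hourly-report" is typically named like "hourly-report-28934520", so
// cronJobRegex captures "hourly-report". Similarly, rRegex strips the trailing
// alphanumeric suffix, e.g. "web-7d4b9c6f8d-abcde" -> "web-7d4b9c6f8d".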
// New initializes a new k8s Client.
func New(
	set component.TelemetrySettings,
	apiCfg k8sconfig.APIConfig,
	rules ExtractionRules,
	filters Filters,
	associations []Association,
	exclude Excludes,
	newClientSet APIClientsetProvider,
	newInformer InformerProvider,
	newNamespaceInformer InformerProviderNamespace,
	newReplicaSetInformer InformerProviderReplicaSet,
	waitForMetadata bool,
	waitForMetadataTimeout time.Duration,
) (Client, error) {
	telemetryBuilder, err := metadata.NewTelemetryBuilder(set)
	if err != nil {
		return nil, err
	}
	c := &WatchClient{
		logger:                 set.Logger,
		Rules:                  rules,
		Filters:                filters,
		Associations:           associations,
		Exclude:                exclude,
		replicasetRegex:        rRegex,
		cronJobRegex:           cronJobRegex,
		stopCh:                 make(chan struct{}),
		telemetryBuilder:       telemetryBuilder,
		waitForMetadata:        waitForMetadata,
		waitForMetadataTimeout: waitForMetadataTimeout,
	}
	go c.deleteLoop(time.Second*30, defaultPodDeleteGracePeriod)

	c.Pods = map[PodIdentifier]*Pod{}
	c.Namespaces = map[string]*Namespace{}
	c.Nodes = map[string]*Node{}
	c.ReplicaSets = map[string]*ReplicaSet{}
	if newClientSet == nil {
		newClientSet = k8sconfig.MakeClient
	}

	kc, err := newClientSet(apiCfg)
	if err != nil {
		return nil, err
	}
	c.kc = kc

	labelSelector, fieldSelector, err := selectorsFromFilters(c.Filters)
	if err != nil {
		return nil, err
	}
	set.Logger.Info(
		"k8s filtering",
		zap.String("labelSelector", labelSelector.String()),
		zap.String("fieldSelector", fieldSelector.String()),
	)
	if newInformer == nil {
		newInformer = newSharedInformer
	}

	if newNamespaceInformer == nil {
		switch {
		case c.extractNamespaceLabelsAnnotations():
			// if rules to extract metadata from namespaces are configured, use a namespace
			// shared informer containing all namespaces, including kube-system, which
			// contains the cluster uid information (kube-system-uid)
			newNamespaceInformer = newNamespaceSharedInformer
		case rules.ClusterUID:
			// use the kube-system shared informer to watch only the kube-system namespace,
			// reducing the overhead of watching all namespaces
			newNamespaceInformer = newKubeSystemSharedInformer
		default:
			newNamespaceInformer = NewNoOpInformer
		}
	}

	c.informer = newInformer(c.kc, c.Filters.Namespace, labelSelector, fieldSelector)
	err = c.informer.SetTransform(
		func(object any) (any, error) {
			originalPod, success := object.(*api_v1.Pod)
			if !success { // means this is a cache.DeletedFinalStateUnknown, in which case we do nothing
				return object, nil
			}

			return removeUnnecessaryPodData(originalPod, c.Rules), nil
		},
	)
	if err != nil {
		return nil, err
	}

	c.namespaceInformer = newNamespaceInformer(c.kc)

	if rules.DeploymentName || rules.DeploymentUID {
		if newReplicaSetInformer == nil {
			newReplicaSetInformer = newReplicaSetSharedInformer
		}
		c.replicasetInformer = newReplicaSetInformer(c.kc, c.Filters.Namespace)
		err = c.replicasetInformer.SetTransform(
			func(object any) (any, error) {
				originalReplicaset, success := object.(*apps_v1.ReplicaSet)
				if !success { // means this is a cache.DeletedFinalStateUnknown, in which case we do nothing
					return object, nil
				}

				return removeUnnecessaryReplicaSetData(originalReplicaset), nil
			},
		)
		if err != nil {
			return nil, err
		}
	}

	if c.extractNodeLabelsAnnotations() || c.extractNodeUID() {
		c.nodeInformer = k8sconfig.NewNodeSharedInformer(c.kc, c.Filters.Node, 5*time.Minute)
	}

	return c, err
}
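
// A minimal wiring sketch (caller-provided values like set, apiCfg, rules,
// filters, associations and excludes are assumptions; nil providers select
// the production defaults inside New):
//
//	client, err := New(set, apiCfg, rules, filters, associations, excludes,
//		nil, nil, nil, nil, false, 10*time.Second)
//	if err != nil {
//		return err
//	}
//	if err := client.Start(); err != nil {
//		return err
//	}
//	defer client.Stop()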
// Start registers pod event handlers and starts watching the kubernetes cluster for pod changes.
func (c *WatchClient) Start() error {
	synced := make([]cache.InformerSynced, 0)
	// start the replicaSet informer first, as the replica sets need to be
	// present at the time the pods are handled, to correctly establish the connection between pods and deployments
	if c.Rules.DeploymentName || c.Rules.DeploymentUID {
		reg, err := c.replicasetInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
			AddFunc:    c.handleReplicaSetAdd,
			UpdateFunc: c.handleReplicaSetUpdate,
			DeleteFunc: c.handleReplicaSetDelete,
		})
		if err != nil {
			return err
		}
		synced = append(synced, reg.HasSynced)
		go c.replicasetInformer.Run(c.stopCh)
	}

	reg, err := c.namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    c.handleNamespaceAdd,
		UpdateFunc: c.handleNamespaceUpdate,
		DeleteFunc: c.handleNamespaceDelete,
	})
	if err != nil {
		return err
	}
	synced = append(synced, reg.HasSynced)
	go c.namespaceInformer.Run(c.stopCh)

	if c.nodeInformer != nil {
		reg, err = c.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
			AddFunc:    c.handleNodeAdd,
			UpdateFunc: c.handleNodeUpdate,
			DeleteFunc: c.handleNodeDelete,
		})
		if err != nil {
			return err
		}
		synced = append(synced, reg.HasSynced)
		go c.nodeInformer.Run(c.stopCh)
	}

	reg, err = c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    c.handlePodAdd,
		UpdateFunc: c.handlePodUpdate,
		DeleteFunc: c.handlePodDelete,
	})
	if err != nil {
		return err
	}

	// start the pod informer only after the other informers have finished
	go c.runInformerWithDependencies(c.informer, synced)

	if c.waitForMetadata {
		timeoutCh := make(chan struct{})
		t := time.AfterFunc(c.waitForMetadataTimeout, func() {
			close(timeoutCh)
		})
		defer t.Stop()
		// Wait for the Pod informer to sync. The other informers are already
		// synced at this point, as the pod informer waits for them to finish
		// before it can run.
		if !cache.WaitForCacheSync(timeoutCh, reg.HasSynced) {
			return errors.New("failed to wait for caches to sync")
		}
	}
	return nil
}

// Stop signals the k8s watcher/informer to stop watching for new events.
func (c *WatchClient) Stop() {
	close(c.stopCh)
}
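
// The handlers below follow a common shape: increment the matching telemetry
// counter, type-assert the object delivered by the informer, and delegate to
// the add/update/forget logic. Delete handlers additionally unwrap
// cache.DeletedFinalStateUnknown via ignoreDeletedFinalStateUnknown.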
func (c *WatchClient) handlePodAdd(obj any) {
	c.telemetryBuilder.OtelsvcK8sPodAdded.Add(context.Background(), 1)
	if pod, ok := obj.(*api_v1.Pod); ok {
		c.addOrUpdatePod(pod)
	} else {
		c.logger.Error("object received was not of type api_v1.Pod", zap.Any("received", obj))
	}
	podTableSize := len(c.Pods)
	c.telemetryBuilder.OtelsvcK8sPodTableSize.Record(context.Background(), int64(podTableSize))
}

func (c *WatchClient) handlePodUpdate(_, newPod any) {
	c.telemetryBuilder.OtelsvcK8sPodUpdated.Add(context.Background(), 1)
	if pod, ok := newPod.(*api_v1.Pod); ok {
		// TODO: update or remove based on whether container is ready/unready?
		c.addOrUpdatePod(pod)
	} else {
		c.logger.Error("object received was not of type api_v1.Pod", zap.Any("received", newPod))
	}
	podTableSize := len(c.Pods)
	c.telemetryBuilder.OtelsvcK8sPodTableSize.Record(context.Background(), int64(podTableSize))
}

func (c *WatchClient) handlePodDelete(obj any) {
	c.telemetryBuilder.OtelsvcK8sPodDeleted.Add(context.Background(), 1)
	if pod, ok := ignoreDeletedFinalStateUnknown(obj).(*api_v1.Pod); ok {
		c.forgetPod(pod)
	} else {
		c.logger.Error("object received was not of type api_v1.Pod", zap.Any("received", obj))
	}
	podTableSize := len(c.Pods)
	c.telemetryBuilder.OtelsvcK8sPodTableSize.Record(context.Background(), int64(podTableSize))
}

func (c *WatchClient) handleNamespaceAdd(obj any) {
	c.telemetryBuilder.OtelsvcK8sNamespaceAdded.Add(context.Background(), 1)
	if namespace, ok := obj.(*api_v1.Namespace); ok {
		c.addOrUpdateNamespace(namespace)
	} else {
		c.logger.Error("object received was not of type api_v1.Namespace", zap.Any("received", obj))
	}
}

func (c *WatchClient) handleNamespaceUpdate(_, newNamespace any) {
	c.telemetryBuilder.OtelsvcK8sNamespaceUpdated.Add(context.Background(), 1)
	if namespace, ok := newNamespace.(*api_v1.Namespace); ok {
		c.addOrUpdateNamespace(namespace)
	} else {
		c.logger.Error("object received was not of type api_v1.Namespace", zap.Any("received", newNamespace))
	}
}

func (c *WatchClient) handleNamespaceDelete(obj any) {
	c.telemetryBuilder.OtelsvcK8sNamespaceDeleted.Add(context.Background(), 1)
	if namespace, ok := ignoreDeletedFinalStateUnknown(obj).(*api_v1.Namespace); ok {
		c.m.Lock()
		if ns, ok := c.Namespaces[namespace.Name]; ok {
			// When a namespace is deleted, all the pods (and other k8s objects) in that
			// namespace are deleted before it. So we won't have any spans that might need
			// namespace annotations and labels. That's why we don't need a deleteQueue and
			// gracePeriod implementation for namespaces.
			delete(c.Namespaces, ns.Name)
		}
		c.m.Unlock()
	} else {
		c.logger.Error("object received was not of type api_v1.Namespace", zap.Any("received", obj))
	}
}

func (c *WatchClient) handleNodeAdd(obj any) {
	c.telemetryBuilder.OtelsvcK8sNodeAdded.Add(context.Background(), 1)
	if node, ok := obj.(*api_v1.Node); ok {
		c.addOrUpdateNode(node)
	} else {
		c.logger.Error("object received was not of type api_v1.Node", zap.Any("received", obj))
	}
}

func (c *WatchClient) handleNodeUpdate(_, newNode any) {
	c.telemetryBuilder.OtelsvcK8sNodeUpdated.Add(context.Background(), 1)
	if node, ok := newNode.(*api_v1.Node); ok {
		c.addOrUpdateNode(node)
	} else {
		c.logger.Error("object received was not of type api_v1.Node", zap.Any("received", newNode))
	}
}

func (c *WatchClient) handleNodeDelete(obj any) {
	c.telemetryBuilder.OtelsvcK8sNodeDeleted.Add(context.Background(), 1)
	if node, ok := ignoreDeletedFinalStateUnknown(obj).(*api_v1.Node); ok {
		c.m.Lock()
		if n, ok := c.Nodes[node.Name]; ok {
			delete(c.Nodes, n.Name)
		}
		c.m.Unlock()
	} else {
		c.logger.Error("object received was not of type api_v1.Node", zap.Any("received", obj))
	}
}
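
// Note: deleted pods are not evicted immediately; forgetPod queues them and
// deleteLoop removes them once the grace period has elapsed, so telemetry
// that arrives shortly after a pod is deleted can still be associated.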
func (c *WatchClient) deleteLoop(interval time.Duration, gracePeriod time.Duration) {
	// This loop wakes up every `interval` and deletes pods from the cache.
	// It iterates over the delete queue and deletes all entries that are
	// no longer within the grace period.
	for {
		select {
		case <-time.After(interval):
			var cutoff int
			now := time.Now()
			c.deleteMut.Lock()
			for i, d := range c.deleteQueue {
				if d.ts.Add(gracePeriod).After(now) {
					break
				}
				cutoff = i + 1
			}
			toDelete := c.deleteQueue[:cutoff]
			c.deleteQueue = c.deleteQueue[cutoff:]
			c.deleteMut.Unlock()

			c.m.Lock()
			for _, d := range toDelete {
				if p, ok := c.Pods[d.id]; ok {
					// Sanity check: make sure we are deleting the same pod
					// and the underlying state (ip<>pod mapping) has not changed.
					if p.Name == d.podName {
						delete(c.Pods, d.id)
					}
				}
			}
			podTableSize := len(c.Pods)
			c.telemetryBuilder.OtelsvcK8sPodTableSize.Record(context.Background(), int64(podTableSize))
			c.m.Unlock()

		case <-c.stopCh:
			return
		}
	}
}

// GetPod takes an IP address or Pod UID and returns the pod the identifier is associated with.
func (c *WatchClient) GetPod(identifier PodIdentifier) (*Pod, bool) {
	c.m.RLock()
	pod, ok := c.Pods[identifier]
	c.m.RUnlock()
	if ok {
		if pod.Ignore {
			return nil, false
		}
		return pod, ok
	}
	c.telemetryBuilder.OtelsvcK8sIPLookupMiss.Add(context.Background(), 1)
	return nil, false
}

// GetNamespace takes a namespace name and returns the namespace object it is associated with.
func (c *WatchClient) GetNamespace(namespace string) (*Namespace, bool) {
	c.m.RLock()
	ns, ok := c.Namespaces[namespace]
	c.m.RUnlock()
	if ok {
		return ns, ok
	}
	return nil, false
}

// GetNode takes a node name and returns the node object it is associated with.
func (c *WatchClient) GetNode(nodeName string) (*Node, bool) {
	c.m.RLock()
	node, ok := c.Nodes[nodeName]
	c.m.RUnlock()
	if ok {
		return node, ok
	}
	return nil, false
}
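
// Lookup sketch (the IP value is illustrative): a caller resolving a pod by
// connection address can build an identifier the same way forgetPod does:
//
//	id := PodIdentifier{PodIdentifierAttributeFromConnection("10.0.0.5")}
//	if pod, ok := client.GetPod(id); ok {
//		_ = pod.Attributes
//	}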
func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
	tags := map[string]string{}
	if c.Rules.PodName {
		tags[conventions.AttributeK8SPodName] = pod.Name
	}

	if c.Rules.PodHostName {
		tags[tagHostName] = pod.Spec.Hostname
	}

	if c.Rules.PodIP {
		tags[K8sIPLabelName] = pod.Status.PodIP
	}

	if c.Rules.Namespace {
		tags[conventions.AttributeK8SNamespaceName] = pod.GetNamespace()
	}

	if c.Rules.StartTime {
		ts := pod.GetCreationTimestamp()
		if !ts.IsZero() {
			if rfc3339ts, err := ts.MarshalText(); err != nil {
				c.logger.Error("failed to marshal pod creation timestamp", zap.Error(err))
			} else {
				tags[tagStartTime] = string(rfc3339ts)
			}
		}
	}

	if c.Rules.PodUID {
		uid := pod.GetUID()
		tags[conventions.AttributeK8SPodUID] = string(uid)
	}

	if c.Rules.ReplicaSetID || c.Rules.ReplicaSetName ||
		c.Rules.DaemonSetUID || c.Rules.DaemonSetName ||
		c.Rules.JobUID || c.Rules.JobName ||
		c.Rules.StatefulSetUID || c.Rules.StatefulSetName ||
		c.Rules.DeploymentName || c.Rules.DeploymentUID ||
		c.Rules.CronJobName {
		for _, ref := range pod.OwnerReferences {
			switch ref.Kind {
			case "ReplicaSet":
				if c.Rules.ReplicaSetID {
					tags[conventions.AttributeK8SReplicaSetUID] = string(ref.UID)
				}
				if c.Rules.ReplicaSetName {
					tags[conventions.AttributeK8SReplicaSetName] = ref.Name
				}
				if c.Rules.DeploymentName {
					if replicaset, ok := c.getReplicaSet(string(ref.UID)); ok {
						if replicaset.Deployment.Name != "" {
							tags[conventions.AttributeK8SDeploymentName] = replicaset.Deployment.Name
						}
					}
				}
				if c.Rules.DeploymentUID {
					if replicaset, ok := c.getReplicaSet(string(ref.UID)); ok {
						if replicaset.Deployment.Name != "" {
							tags[conventions.AttributeK8SDeploymentUID] = replicaset.Deployment.UID
						}
					}
				}
			case "DaemonSet":
				if c.Rules.DaemonSetUID {
					tags[conventions.AttributeK8SDaemonSetUID] = string(ref.UID)
				}
				if c.Rules.DaemonSetName {
					tags[conventions.AttributeK8SDaemonSetName] = ref.Name
				}
			case "StatefulSet":
				if c.Rules.StatefulSetUID {
					tags[conventions.AttributeK8SStatefulSetUID] = string(ref.UID)
				}
				if c.Rules.StatefulSetName {
					tags[conventions.AttributeK8SStatefulSetName] = ref.Name
				}
			case "Job":
				if c.Rules.CronJobName {
					parts := c.cronJobRegex.FindStringSubmatch(ref.Name)
					if len(parts) == 2 {
						tags[conventions.AttributeK8SCronJobName] = parts[1]
					}
				}
				if c.Rules.JobUID {
					tags[conventions.AttributeK8SJobUID] = string(ref.UID)
				}
				if c.Rules.JobName {
					tags[conventions.AttributeK8SJobName] = ref.Name
				}
			}
		}
	}

	if c.Rules.Node {
		tags[tagNodeName] = pod.Spec.NodeName
	}

	if c.Rules.ClusterUID {
		if val, ok := c.Namespaces["kube-system"]; ok {
			tags[tagClusterUID] = val.NamespaceUID
		} else {
			c.logger.Debug("unable to find kube-system namespace, cluster uid will not be available")
		}
	}

	for _, r := range c.Rules.Labels {
		r.extractFromPodMetadata(pod.Labels, tags, "k8s.pod.labels.%s")
	}

	for _, r := range c.Rules.Annotations {
		r.extractFromPodMetadata(pod.Annotations, tags, "k8s.pod.annotations.%s")
	}
	return tags
}
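
// For example, with the PodName, Namespace and DeploymentName rules enabled, a
// pod "web-7d4b9c6f8d-abcde" in namespace "prod" owned by deployment "web"
// would yield tags like (illustrative values):
//
//	k8s.pod.name        = "web-7d4b9c6f8d-abcde"
//	k8s.namespace.name  = "prod"
//	k8s.deployment.name = "web"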
// This function removes all data from the Pod except what is required by extraction rules and pod association
func removeUnnecessaryPodData(pod *api_v1.Pod, rules ExtractionRules) *api_v1.Pod {
	// name, namespace, uid, start time and ip are needed for identifying Pods
	// there's room to optimize this further, it's kept this way for simplicity
	transformedPod := api_v1.Pod{
		ObjectMeta: meta_v1.ObjectMeta{
			Name:      pod.GetName(),
			Namespace: pod.GetNamespace(),
			UID:       pod.GetUID(),
		},
		Status: api_v1.PodStatus{
			PodIP:     pod.Status.PodIP,
			StartTime: pod.Status.StartTime,
		},
		Spec: api_v1.PodSpec{
			HostNetwork: pod.Spec.HostNetwork,
		},
	}

	if rules.StartTime {
		transformedPod.SetCreationTimestamp(pod.GetCreationTimestamp())
	}

	if rules.PodUID {
		transformedPod.SetUID(pod.GetUID())
	}

	if rules.Node {
		transformedPod.Spec.NodeName = pod.Spec.NodeName
	}

	if rules.PodHostName {
		transformedPod.Spec.Hostname = pod.Spec.Hostname
	}

	if needContainerAttributes(rules) {
		removeUnnecessaryContainerStatus := func(c api_v1.ContainerStatus) api_v1.ContainerStatus {
			transformedContainerStatus := api_v1.ContainerStatus{
				Name:         c.Name,
				ContainerID:  c.ContainerID,
				RestartCount: c.RestartCount,
			}
			if rules.ContainerImageRepoDigests {
				transformedContainerStatus.ImageID = c.ImageID
			}
			return transformedContainerStatus
		}

		for _, containerStatus := range pod.Status.ContainerStatuses {
			transformedPod.Status.ContainerStatuses = append(
				transformedPod.Status.ContainerStatuses, removeUnnecessaryContainerStatus(containerStatus),
			)
		}
		for _, containerStatus := range pod.Status.InitContainerStatuses {
			transformedPod.Status.InitContainerStatuses = append(
				transformedPod.Status.InitContainerStatuses, removeUnnecessaryContainerStatus(containerStatus),
			)
		}

		removeUnnecessaryContainerData := func(c api_v1.Container) api_v1.Container {
			transformedContainer := api_v1.Container{}
			transformedContainer.Name = c.Name // we always need the name, it's used for identification
			if rules.ContainerImageName || rules.ContainerImageTag {
				transformedContainer.Image = c.Image
			}
			return transformedContainer
		}

		for _, container := range pod.Spec.Containers {
			transformedPod.Spec.Containers = append(
				transformedPod.Spec.Containers, removeUnnecessaryContainerData(container),
			)
		}
		for _, container := range pod.Spec.InitContainers {
			transformedPod.Spec.InitContainers = append(
				transformedPod.Spec.InitContainers, removeUnnecessaryContainerData(container),
			)
		}
	}

	if len(rules.Labels) > 0 {
		transformedPod.Labels = pod.Labels
	}

	if len(rules.Annotations) > 0 {
		transformedPod.Annotations = pod.Annotations
	}

	if rules.IncludesOwnerMetadata() {
		transformedPod.SetOwnerReferences(pod.GetOwnerReferences())
	}

	return &transformedPod
}

func (c *WatchClient) extractPodContainersAttributes(pod *api_v1.Pod) PodContainers {
	containers := PodContainers{
		ByID:   map[string]*Container{},
		ByName: map[string]*Container{},
	}
	if !needContainerAttributes(c.Rules) {
		return containers
	}
	if c.Rules.ContainerImageName || c.Rules.ContainerImageTag {
		for _, spec := range append(pod.Spec.Containers, pod.Spec.InitContainers...) {
			container := &Container{}
			imageRef, err := dcommon.ParseImageName(spec.Image)
			if err == nil {
				if c.Rules.ContainerImageName {
					container.ImageName = imageRef.Repository
				}
				if c.Rules.ContainerImageTag {
					container.ImageTag = imageRef.Tag
				}
			}
			containers.ByName[spec.Name] = container
		}
	}
	for _, apiStatus := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) {
		container, ok := containers.ByName[apiStatus.Name]
		if !ok {
			container = &Container{}
			containers.ByName[apiStatus.Name] = container
		}
		if c.Rules.ContainerName {
			container.Name = apiStatus.Name
		}
		containerID := apiStatus.ContainerID
		// Remove container runtime prefix
		parts := strings.Split(containerID, "://")
		if len(parts) == 2 {
			containerID = parts[1]
		}
		containers.ByID[containerID] = container
		if c.Rules.ContainerID || c.Rules.ContainerImageRepoDigests {
			if container.Statuses == nil {
				container.Statuses = map[int]ContainerStatus{}
			}
			containerStatus := ContainerStatus{}
			if c.Rules.ContainerID {
				containerStatus.ContainerID = containerID
			}

			if c.Rules.ContainerImageRepoDigests {
				if canonicalRef, err := dcommon.CanonicalImageRef(apiStatus.ImageID); err == nil {
					containerStatus.ImageRepoDigest = canonicalRef
				}
			}

			container.Statuses[int(apiStatus.RestartCount)] = containerStatus
		}
	}
	return containers
}
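
// Container IDs reported by the kubelet carry a runtime prefix, e.g.
// "containerd://8f2a..." (illustrative value); extractPodContainersAttributes
// strips the prefix so containers can be looked up by the bare ID ("8f2a...").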
func (c *WatchClient) extractNamespaceAttributes(namespace *api_v1.Namespace) map[string]string {
	tags := map[string]string{}

	for _, r := range c.Rules.Labels {
		r.extractFromNamespaceMetadata(namespace.Labels, tags, "k8s.namespace.labels.%s")
	}

	for _, r := range c.Rules.Annotations {
		r.extractFromNamespaceMetadata(namespace.Annotations, tags, "k8s.namespace.annotations.%s")
	}

	return tags
}

func (c *WatchClient) extractNodeAttributes(node *api_v1.Node) map[string]string {
	tags := map[string]string{}

	for _, r := range c.Rules.Labels {
		r.extractFromNodeMetadata(node.Labels, tags, "k8s.node.labels.%s")
	}

	for _, r := range c.Rules.Annotations {
		r.extractFromNodeMetadata(node.Annotations, tags, "k8s.node.annotations.%s")
	}

	return tags
}

func (c *WatchClient) podFromAPI(pod *api_v1.Pod) *Pod {
	newPod := &Pod{
		Name:        pod.Name,
		Namespace:   pod.GetNamespace(),
		NodeName:    pod.Spec.NodeName,
		Address:     pod.Status.PodIP,
		HostNetwork: pod.Spec.HostNetwork,
		PodUID:      string(pod.UID),
		StartTime:   pod.Status.StartTime,
	}

	if c.shouldIgnorePod(pod) {
		newPod.Ignore = true
	} else {
		newPod.Attributes = c.extractPodAttributes(pod)
		if needContainerAttributes(c.Rules) {
			newPod.Containers = c.extractPodContainersAttributes(pod)
		}
	}

	return newPod
}

// getIdentifiersFromAssoc returns a list of PodIdentifiers for the given pod
func (c *WatchClient) getIdentifiersFromAssoc(pod *Pod) []PodIdentifier {
	var ids []PodIdentifier
	for _, assoc := range c.Associations {
		ret := PodIdentifier{}
		skip := false
		for i, source := range assoc.Sources {
			// If association configured to take IP address from connection
			switch source.From {
			case ConnectionSource:
				if pod.Address == "" {
					skip = true
					break
				}
				// Host network mode is not supported right now with IP based
				// tagging, as all pods in host network mode share the host's IP
				// addresses. Such pods are very rare and are usually used to
				// monitor or control host traffic (e.g., linkerd, flannel)
				// rather than to serve business needs.
				if pod.HostNetwork {
					skip = true
					break
				}
				ret[i] = PodIdentifierAttributeFromSource(source, pod.Address)
			case ResourceSource:
				attr := ""
				switch source.Name {
				case conventions.AttributeK8SNamespaceName:
					attr = pod.Namespace
				case conventions.AttributeK8SPodName:
					attr = pod.Name
				case conventions.AttributeK8SPodUID:
					attr = pod.PodUID
				case conventions.AttributeHostName:
					attr = pod.Address
				// k8s.pod.ip is set by passthrough mode
				case K8sIPLabelName:
					attr = pod.Address
				default:
					if v, ok := pod.Attributes[source.Name]; ok {
						attr = v
					}
				}

				if attr == "" {
					skip = true
					break
				}
				ret[i] = PodIdentifierAttributeFromSource(source, attr)
			}
		}

		if !skip {
			ids = append(ids, ret)
		}
	}

	// Ensure backward compatibility
	if pod.PodUID != "" {
		ids = append(ids, PodIdentifier{
			PodIdentifierAttributeFromResourceAttribute(conventions.AttributeK8SPodUID, pod.PodUID),
		})
	}

	if pod.Address != "" && !pod.HostNetwork {
		ids = append(ids, PodIdentifier{
			PodIdentifierAttributeFromConnection(pod.Address),
		})
		// k8s.pod.ip is set by passthrough mode
		ids = append(ids, PodIdentifier{
			PodIdentifierAttributeFromResourceAttribute(K8sIPLabelName, pod.Address),
		})
	}

	return ids
}
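
// A single pod therefore typically maps to several identifiers: one per
// matching association, plus the backward-compatible defaults (pod UID,
// connection IP, and the k8s.pod.ip resource attribute for non-host-network
// pods).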
func (c *WatchClient) addOrUpdatePod(pod *api_v1.Pod) {
	newPod := c.podFromAPI(pod)

	c.m.Lock()
	defer c.m.Unlock()

	for _, id := range c.getIdentifiersFromAssoc(newPod) {
		// Compare the initial scheduled timestamps of the existing pod and the new pod
		// with the same identifier, and only replace the old pod if the scheduled time
		// of the new pod is newer or equal. This should fix the case where the scheduler
		// has assigned the same attributes (like the IP address) to a new pod, but the
		// update event for the old pod came in later.
		if p, ok := c.Pods[id]; ok {
			if pod.Status.StartTime.Before(p.StartTime) {
				continue
			}
		}
		c.Pods[id] = newPod
	}
}

func (c *WatchClient) forgetPod(pod *api_v1.Pod) {
	podToRemove := c.podFromAPI(pod)
	for _, id := range c.getIdentifiersFromAssoc(podToRemove) {
		p, ok := c.GetPod(id)

		if ok && p.Name == pod.Name {
			c.appendDeleteQueue(id, pod.Name)
		}
	}
}

func (c *WatchClient) appendDeleteQueue(podID PodIdentifier, podName string) {
	c.deleteMut.Lock()
	c.deleteQueue = append(c.deleteQueue, deleteRequest{
		id:      podID,
		podName: podName,
		ts:      time.Now(),
	})
	c.deleteMut.Unlock()
}

func (c *WatchClient) shouldIgnorePod(pod *api_v1.Pod) bool {
	// Check if user requested the pod to be ignored through annotations
	if v, ok := pod.Annotations[ignoreAnnotation]; ok {
		if strings.ToLower(strings.TrimSpace(v)) == "true" {
			return true
		}
	}

	// Check if user requested the pod to be ignored through configuration
	for _, excludedPod := range c.Exclude.Pods {
		if excludedPod.Name.MatchString(pod.Name) {
			return true
		}
	}

	return false
}

var singleValueOperators = map[selection.Operator]int{
	selection.Equals:       1,
	selection.DoubleEquals: 1,
	selection.NotEquals:    1,
	selection.GreaterThan:  1,
	selection.LessThan:     1,
}

func selectorsFromFilters(filters Filters) (labels.Selector, fields.Selector, error) {
	labelSelector := labels.Everything()
	for _, f := range filters.Labels {
		if f.Op == selection.In || f.Op == selection.NotIn {
			return nil, nil, fmt.Errorf("label filters don't support operator: '%s'", f.Op)
		}
		var vals []string
		if _, ok := singleValueOperators[f.Op]; ok {
			vals = []string{f.Value}
		}
		r, err := labels.NewRequirement(f.Key, f.Op, vals)
		if err != nil {
			return nil, nil, err
		}
		labelSelector = labelSelector.Add(*r)
	}

	var selectors []fields.Selector
	for _, f := range filters.Fields {
		switch f.Op {
		case selection.Equals:
			selectors = append(selectors, fields.OneTermEqualSelector(f.Key, f.Value))
		case selection.NotEquals:
			selectors = append(selectors, fields.OneTermNotEqualSelector(f.Key, f.Value))
		case selection.DoesNotExist, selection.DoubleEquals, selection.In, selection.NotIn, selection.Exists, selection.GreaterThan, selection.LessThan:
			fallthrough
		default:
			return nil, nil, fmt.Errorf("field filters don't support operator: '%s'", f.Op)
		}
	}

	if filters.Node != "" {
		selectors = append(selectors, fields.OneTermEqualSelector(podNodeField, filters.Node))
	}
	return labelSelector, fields.AndSelectors(selectors...), nil
}
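
// Translation sketch (hypothetical filter): a label filter
// {Key: "app", Op: selection.Equals, Value: "web"} becomes the label selector
// "app=web", and a non-empty Filters.Node is added as a field selector on
// podNodeField, restricting the watch to pods scheduled on that node.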
func (c *WatchClient) addOrUpdateNamespace(namespace *api_v1.Namespace) {
	newNamespace := &Namespace{
		Name:         namespace.Name,
		NamespaceUID: string(namespace.UID),
		StartTime:    namespace.GetCreationTimestamp(),
	}
	newNamespace.Attributes = c.extractNamespaceAttributes(namespace)

	c.m.Lock()
	if namespace.Name != "" {
		c.Namespaces[namespace.Name] = newNamespace
	}
	c.m.Unlock()
}

func (c *WatchClient) extractNamespaceLabelsAnnotations() bool {
	for _, r := range c.Rules.Labels {
		if r.From == MetadataFromNamespace {
			return true
		}
	}

	for _, r := range c.Rules.Annotations {
		if r.From == MetadataFromNamespace {
			return true
		}
	}

	return false
}

func (c *WatchClient) extractNodeLabelsAnnotations() bool {
	for _, r := range c.Rules.Labels {
		if r.From == MetadataFromNode {
			return true
		}
	}

	for _, r := range c.Rules.Annotations {
		if r.From == MetadataFromNode {
			return true
		}
	}

	return false
}

func (c *WatchClient) extractNodeUID() bool {
	return c.Rules.NodeUID
}

func (c *WatchClient) addOrUpdateNode(node *api_v1.Node) {
	newNode := &Node{
		Name:    node.Name,
		NodeUID: string(node.UID),
	}
	newNode.Attributes = c.extractNodeAttributes(node)

	c.m.Lock()
	if node.Name != "" {
		c.Nodes[node.Name] = newNode
	}
	c.m.Unlock()
}

func needContainerAttributes(rules ExtractionRules) bool {
	return rules.ContainerImageName ||
		rules.ContainerName ||
		rules.ContainerImageTag ||
		rules.ContainerImageRepoDigests ||
		rules.ContainerID
}

func (c *WatchClient) handleReplicaSetAdd(obj any) {
	c.telemetryBuilder.OtelsvcK8sReplicasetAdded.Add(context.Background(), 1)
	if replicaset, ok := obj.(*apps_v1.ReplicaSet); ok {
		c.addOrUpdateReplicaSet(replicaset)
	} else {
		c.logger.Error("object received was not of type apps_v1.ReplicaSet", zap.Any("received", obj))
	}
}

func (c *WatchClient) handleReplicaSetUpdate(_, newRS any) {
	c.telemetryBuilder.OtelsvcK8sReplicasetUpdated.Add(context.Background(), 1)
	if replicaset, ok := newRS.(*apps_v1.ReplicaSet); ok {
		c.addOrUpdateReplicaSet(replicaset)
	} else {
		c.logger.Error("object received was not of type apps_v1.ReplicaSet", zap.Any("received", newRS))
	}
}

func (c *WatchClient) handleReplicaSetDelete(obj any) {
	c.telemetryBuilder.OtelsvcK8sReplicasetDeleted.Add(context.Background(), 1)
	if replicaset, ok := ignoreDeletedFinalStateUnknown(obj).(*apps_v1.ReplicaSet); ok {
		c.m.Lock()
		key := string(replicaset.UID)
		delete(c.ReplicaSets, key)
		c.m.Unlock()
	} else {
		c.logger.Error("object received was not of type apps_v1.ReplicaSet", zap.Any("received", obj))
	}
}

func (c *WatchClient) addOrUpdateReplicaSet(replicaset *apps_v1.ReplicaSet) {
	newReplicaSet := &ReplicaSet{
		Name:      replicaset.Name,
		Namespace: replicaset.Namespace,
		UID:       string(replicaset.UID),
	}

	for _, ownerReference := range replicaset.OwnerReferences {
		if ownerReference.Kind == "Deployment" && ownerReference.Controller != nil && *ownerReference.Controller {
			newReplicaSet.Deployment = Deployment{
				Name: ownerReference.Name,
				UID:  string(ownerReference.UID),
			}
			break
		}
	}

	c.m.Lock()
	if replicaset.UID != "" {
		c.ReplicaSets[string(replicaset.UID)] = newReplicaSet
	}
	c.m.Unlock()
}

// This function removes all data from the ReplicaSet except what is required by extraction rules
func removeUnnecessaryReplicaSetData(replicaset *apps_v1.ReplicaSet) *apps_v1.ReplicaSet {
	transformedReplicaset := apps_v1.ReplicaSet{
		ObjectMeta: meta_v1.ObjectMeta{
			Name:      replicaset.GetName(),
			Namespace: replicaset.GetNamespace(),
			UID:       replicaset.GetUID(),
		},
	}
	transformedReplicaset.SetOwnerReferences(replicaset.GetOwnerReferences())
	return &transformedReplicaset
}

func (c *WatchClient) getReplicaSet(uid string) (*ReplicaSet, bool) {
	c.m.RLock()
	replicaset, ok := c.ReplicaSets[uid]
	c.m.RUnlock()
	if ok {
		return replicaset, ok
	}
	return nil, false
}
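
// Deployment attributes on pods are resolved indirectly: a pod's ReplicaSet
// owner reference is looked up via getReplicaSet, and the Deployment name/UID
// recorded by addOrUpdateReplicaSet from the ReplicaSet's controlling owner
// reference is what extractPodAttributes uses.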
// runInformerWithDependencies starts the given informer. The second argument is a list of other informers that should complete
// before the informer is started. This is necessary e.g. for the pod informer, which requires the replica set informer
// to have synced to correctly establish the connection to the replicaset/deployment it belongs to.
func (c *WatchClient) runInformerWithDependencies(informer cache.SharedInformer, dependencies []cache.InformerSynced) {
	if len(dependencies) > 0 {
		timeoutCh := make(chan struct{})
		// TODO hard coding the timeout for now, check if we should make this configurable
		t := time.AfterFunc(5*time.Second, func() {
			close(timeoutCh)
		})
		defer t.Stop()
		cache.WaitForCacheSync(timeoutCh, dependencies...)
	}
	informer.Run(c.stopCh)
}

// ignoreDeletedFinalStateUnknown returns the object wrapped in
// DeletedFinalStateUnknown. Useful in OnDelete resource event handlers that do
// not need the additional context.
func ignoreDeletedFinalStateUnknown(obj any) any {
	if obj, ok := obj.(cache.DeletedFinalStateUnknown); ok {
		return obj.Obj
	}
	return obj
}