receiver/k8sclusterreceiver/watcher.go:

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package k8sclusterreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver"

import (
	"context"
	"fmt"
	"reflect"
	"sync/atomic"
	"time"

	quotaclientset "github.com/openshift/client-go/quota/clientset/versioned"
	quotainformersv1 "github.com/openshift/client-go/quota/informers/externalversions"
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/receiver"
	"go.uber.org/zap"
	appsv1 "k8s.io/api/apps/v1"
	autoscalingv2 "k8s.io/api/autoscaling/v2"
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/cronjob"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/daemonset"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/deployment"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/gvk"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/hpa"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/jobs"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/namespace"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/node"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/pod"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/replicaset"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/replicationcontroller"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/statefulset"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/utils"
)

type sharedInformer interface {
	Start(<-chan struct{})
	WaitForCacheSync(<-chan struct{}) map[reflect.Type]bool
}

type resourceWatcher struct {
	client              kubernetes.Interface
	osQuotaClient       quotaclientset.Interface
	informerFactories   []sharedInformer
	metadataStore       *metadata.Store
	logger              *zap.Logger
	metadataConsumers   []metadataConsumer
	initialTimeout      time.Duration
	initialSyncDone     *atomic.Bool
	initialSyncTimedOut *atomic.Bool
	config              *Config
	entityLogConsumer   consumer.Logs

	// For mocking.
	makeClient               func(apiConf k8sconfig.APIConfig) (kubernetes.Interface, error)
	makeOpenShiftQuotaClient func(apiConf k8sconfig.APIConfig) (quotaclientset.Interface, error)
}

type metadataConsumer func(metadata []*experimentalmetricmetadata.MetadataUpdate) error
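// Illustrative note (not part of the upstream implementation): both the client-go
// informers.SharedInformerFactory and the OpenShift quota SharedInformerFactory expose
// Start and WaitForCacheSync, which is why they can both be stored behind the minimal
// sharedInformer interface above. A metadataConsumer is just a function with the same
// shape as MetadataExporter.ConsumeMetadata; a hypothetical debug consumer could look
// like this sketch:
//
//	var debugConsumer metadataConsumer = func(updates []*experimentalmetricmetadata.MetadataUpdate) error {
//		fmt.Printf("received %d metadata updates\n", len(updates))
//		return nil
//	}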
// newResourceWatcher creates a Kubernetes resource watcher.
func newResourceWatcher(set receiver.Settings, cfg *Config, metadataStore *metadata.Store) *resourceWatcher {
	return &resourceWatcher{
		logger:                   set.Logger,
		metadataStore:            metadataStore,
		initialSyncDone:          &atomic.Bool{},
		initialSyncTimedOut:      &atomic.Bool{},
		initialTimeout:           defaultInitialSyncTimeout,
		config:                   cfg,
		makeClient:               k8sconfig.MakeClient,
		makeOpenShiftQuotaClient: k8sconfig.MakeOpenShiftQuotaClient,
	}
}

func (rw *resourceWatcher) initialize() error {
	client, err := rw.makeClient(rw.config.APIConfig)
	if err != nil {
		return fmt.Errorf("Failed to create Kubernetes client: %w", err)
	}
	rw.client = client

	if rw.config.Distribution == distributionOpenShift && rw.config.Namespace == "" {
		rw.osQuotaClient, err = rw.makeOpenShiftQuotaClient(rw.config.APIConfig)
		if err != nil {
			return fmt.Errorf("Failed to create OpenShift quota API client: %w", err)
		}
	}

	err = rw.prepareSharedInformerFactory()
	if err != nil {
		return err
	}

	return nil
}

func (rw *resourceWatcher) prepareSharedInformerFactory() error {
	factory := rw.getInformerFactory()

	// Map of supported group version kinds, keyed by the name of the kind.
	// If none of the group versions are supported by the k8s server for a specific kind,
	// the informer for that kind won't be set up and a warning message is logged.
	// This map should be kept in sync with what can be provided by the supported k8s server versions.
	supportedKinds := map[string][]schema.GroupVersionKind{
		"Pod":                     {gvk.Pod},
		"Node":                    {gvk.Node},
		"Namespace":               {gvk.Namespace},
		"ReplicationController":   {gvk.ReplicationController},
		"ResourceQuota":           {gvk.ResourceQuota},
		"Service":                 {gvk.Service},
		"DaemonSet":               {gvk.DaemonSet},
		"Deployment":              {gvk.Deployment},
		"ReplicaSet":              {gvk.ReplicaSet},
		"StatefulSet":             {gvk.StatefulSet},
		"Job":                     {gvk.Job},
		"CronJob":                 {gvk.CronJob},
		"HorizontalPodAutoscaler": {gvk.HorizontalPodAutoscaler},
	}

	for kind, gvks := range supportedKinds {
		anySupported := false
		for _, gvk := range gvks {
			supported, err := rw.isKindSupported(gvk)
			if err != nil {
				return err
			}
			if supported {
				anySupported = true
				rw.setupInformerForKind(gvk, factory)
			}
		}
		if !anySupported {
			rw.logger.Warn("Server doesn't support any of the group versions defined for the kind", zap.String("kind", kind))
		}
	}

	if rw.osQuotaClient != nil {
		quotaFactory := quotainformersv1.NewSharedInformerFactory(rw.osQuotaClient, 0)
		rw.setupInformer(gvk.ClusterResourceQuota, quotaFactory.Quota().V1().ClusterResourceQuotas().Informer())
		rw.informerFactories = append(rw.informerFactories, quotaFactory)
	}
	rw.informerFactories = append(rw.informerFactories, factory)

	return nil
}
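// Illustrative note: the internal gvk package referenced in supportedKinds above is
// expected to pin each kind to a concrete group and version, e.g. a value equivalent to
//
//	schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}
//
// for gvk.Pod, or Group "autoscaling", Version "v2" for gvk.HorizontalPodAutoscaler.
// Listing several GroupVersionKinds per kind name would let the receiver fall back to an
// older group version when a cluster does not serve the newest one.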
func (rw *resourceWatcher) getInformerFactory() informers.SharedInformerFactory {
	var factory informers.SharedInformerFactory
	if rw.config.Namespace != "" {
		rw.logger.Info("Namespace filter has been enabled. Nodes and namespaces will not be observed.",
			zap.String("namespace", rw.config.Namespace))
		factory = informers.NewSharedInformerFactoryWithOptions(
			rw.client,
			rw.config.MetadataCollectionInterval,
			informers.WithNamespace(rw.config.Namespace),
		)
	} else {
		factory = informers.NewSharedInformerFactoryWithOptions(
			rw.client,
			rw.config.MetadataCollectionInterval,
		)
	}
	return factory
}

func (rw *resourceWatcher) isKindSupported(gvk schema.GroupVersionKind) (bool, error) {
	resources, err := rw.client.Discovery().ServerResourcesForGroupVersion(gvk.GroupVersion().String())
	if err != nil {
		if apierrors.IsNotFound(err) {
			// If the discovery endpoint isn't present, assume the group version is not supported.
			rw.logger.Debug("Group version is not supported", zap.String("group", gvk.GroupVersion().String()))
			return false, nil
		}
		return false, fmt.Errorf("failed to fetch group version details: %w", err)
	}
	for _, r := range resources.APIResources {
		if r.Kind == gvk.Kind {
			return true, nil
		}
	}
	return false, nil
}

func (rw *resourceWatcher) setupInformerForKind(kind schema.GroupVersionKind, factory informers.SharedInformerFactory) {
	switch kind {
	case gvk.Pod:
		rw.setupInformer(kind, factory.Core().V1().Pods().Informer())
	case gvk.Node:
		if rw.config.Namespace == "" {
			rw.setupInformer(kind, factory.Core().V1().Nodes().Informer())
		}
	case gvk.Namespace:
		if rw.config.Namespace == "" {
			rw.setupInformer(kind, factory.Core().V1().Namespaces().Informer())
		}
	case gvk.ReplicationController:
		rw.setupInformer(kind, factory.Core().V1().ReplicationControllers().Informer())
	case gvk.ResourceQuota:
		rw.setupInformer(kind, factory.Core().V1().ResourceQuotas().Informer())
	case gvk.Service:
		rw.setupInformer(kind, factory.Core().V1().Services().Informer())
	case gvk.DaemonSet:
		rw.setupInformer(kind, factory.Apps().V1().DaemonSets().Informer())
	case gvk.Deployment:
		rw.setupInformer(kind, factory.Apps().V1().Deployments().Informer())
	case gvk.ReplicaSet:
		rw.setupInformer(kind, factory.Apps().V1().ReplicaSets().Informer())
	case gvk.StatefulSet:
		rw.setupInformer(kind, factory.Apps().V1().StatefulSets().Informer())
	case gvk.Job:
		rw.setupInformer(kind, factory.Batch().V1().Jobs().Informer())
	case gvk.CronJob:
		rw.setupInformer(kind, factory.Batch().V1().CronJobs().Informer())
	case gvk.HorizontalPodAutoscaler:
		rw.setupInformer(kind, factory.Autoscaling().V2().HorizontalPodAutoscalers().Informer())
	default:
		rw.logger.Error("Could not setup an informer for provided group version kind",
			zap.String("group version kind", kind.String()))
	}
}

// startWatchingResources starts up all informers.
func (rw *resourceWatcher) startWatchingResources(ctx context.Context, inf sharedInformer) context.Context {
	var cancel context.CancelFunc
	timedContextForInitialSync, cancel := context.WithTimeout(ctx, rw.initialTimeout)
	// Start off individual informers in the factory.
	inf.Start(ctx.Done())
	// Ensure the cache is synced with the initial state once the informers are started.
	// Note that the event handlers can start receiving events as soon as the informers
	// are started, so the receiver must not start collecting data before the cache sync,
	// since not all data may be available yet.
	// This call blocks until the initial sync is complete, the timeout set on the context
	// elapses, or the parent context is cancelled.
	inf.WaitForCacheSync(timedContextForInitialSync.Done())
	defer cancel()
	return timedContextForInitialSync
}
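// Illustrative sketch, not the actual receiver wiring: the code that owns this watcher is
// assumed to call startWatchingResources once per entry in informerFactories and to decide
// whether the initial sync timed out by inspecting the returned (already cancelled) context:
//
//	syncCtx := rw.startWatchingResources(ctx, factory)
//	if errors.Is(syncCtx.Err(), context.DeadlineExceeded) {
//		rw.initialSyncTimedOut.Store(true) // hypothetical handling; unblocks waitForInitialInformerSync below
//	} else {
//		rw.initialSyncDone.Store(true)
//	}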
// setupInformer adds event handlers to informers and sets up a metadataStore.
func (rw *resourceWatcher) setupInformer(gvk schema.GroupVersionKind, informer cache.SharedIndexInformer) {
	err := informer.SetTransform(transformObject)
	if err != nil {
		rw.logger.Error("error setting informer transform function", zap.Error(err))
	}
	_, err = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    rw.onAdd,
		UpdateFunc: rw.onUpdate,
	})
	if err != nil {
		rw.logger.Error("error adding event handler to informer", zap.Error(err))
	}
	rw.metadataStore.Setup(gvk, informer.GetStore())
}

func (rw *resourceWatcher) onAdd(obj any) {
	rw.waitForInitialInformerSync()

	// Sync metadata only if there's at least one destination for it to be sent to.
	if !rw.hasDestination() {
		return
	}

	rw.syncMetadataUpdate(map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{}, rw.objMetadata(obj))
}

func (rw *resourceWatcher) hasDestination() bool {
	return len(rw.metadataConsumers) != 0 || rw.entityLogConsumer != nil
}

func (rw *resourceWatcher) onUpdate(oldObj, newObj any) {
	rw.waitForInitialInformerSync()

	// Sync metadata only if there's at least one destination for it to be sent to.
	if !rw.hasDestination() {
		return
	}

	rw.syncMetadataUpdate(rw.objMetadata(oldObj), rw.objMetadata(newObj))
}

// objMetadata returns the metadata for the given object.
func (rw *resourceWatcher) objMetadata(obj any) map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata {
	switch o := obj.(type) {
	case *corev1.Pod:
		return pod.GetMetadata(o, rw.metadataStore, rw.logger)
	case *corev1.Node:
		return node.GetMetadata(o)
	case *corev1.ReplicationController:
		return replicationcontroller.GetMetadata(o)
	case *appsv1.Deployment:
		return deployment.GetMetadata(o)
	case *appsv1.ReplicaSet:
		return replicaset.GetMetadata(o)
	case *appsv1.DaemonSet:
		return daemonset.GetMetadata(o)
	case *appsv1.StatefulSet:
		return statefulset.GetMetadata(o)
	case *batchv1.Job:
		return jobs.GetMetadata(o)
	case *batchv1.CronJob:
		return cronjob.GetMetadata(o)
	case *autoscalingv2.HorizontalPodAutoscaler:
		return hpa.GetMetadata(o)
	case *corev1.Namespace:
		return namespace.GetMetadata(o)
	}
	return nil
}
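// Illustrative note: onAdd and onUpdate are ordinary methods registered as informer
// callbacks, so they can also be exercised directly, for example in a test:
//
//	rw.onAdd(&corev1.Pod{}) // hypothetical call; a no-op unless a metadata destination is configured
//
// objMetadata returns nil for any type it does not recognize, so no metadata update is
// produced for unknown objects.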
func (rw *resourceWatcher) waitForInitialInformerSync() {
	if rw.initialSyncDone.Load() || rw.initialSyncTimedOut.Load() {
		return
	}

	// Wait until the initial sync is complete or it times out.
	for !rw.initialSyncDone.Load() {
		if rw.initialSyncTimedOut.Load() {
			return
		}
		time.Sleep(100 * time.Millisecond)
	}
}

func (rw *resourceWatcher) setupMetadataExporters(
	exporters map[component.ID]component.Component,
	metadataExportersFromConfig []string,
) error {
	var out []metadataConsumer

	metadataExportersSet := utils.StringSliceToMap(metadataExportersFromConfig)

	if err := validateMetadataExporters(metadataExportersSet, exporters); err != nil {
		return fmt.Errorf("failed to configure metadata_exporters: %w", err)
	}

	for cfg, exp := range exporters {
		if !metadataExportersSet[cfg.String()] {
			continue
		}
		kme, ok := exp.(experimentalmetricmetadata.MetadataExporter)
		if !ok {
			return fmt.Errorf("%s exporter does not implement MetadataExporter", cfg.Name())
		}
		out = append(out, kme.ConsumeMetadata)
		rw.logger.Info("Configured Kubernetes MetadataExporter",
			zap.String("exporter_name", cfg.String()),
		)
	}

	rw.metadataConsumers = out
	return nil
}

func validateMetadataExporters(metadataExporters map[string]bool, exporters map[component.ID]component.Component) error {
	configuredExporters := map[string]bool{}
	for cfg := range exporters {
		configuredExporters[cfg.String()] = true
	}

	for e := range metadataExporters {
		if !configuredExporters[e] {
			return fmt.Errorf("%s exporter is not in collector config", e)
		}
	}

	return nil
}

func (rw *resourceWatcher) syncMetadataUpdate(oldMetadata, newMetadata map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata) {
	timestamp := pcommon.NewTimestampFromTime(time.Now())

	metadataUpdate := metadata.GetMetadataUpdate(oldMetadata, newMetadata)
	if len(metadataUpdate) != 0 {
		for _, consume := range rw.metadataConsumers {
			_ = consume(metadataUpdate)
		}
	}

	if rw.entityLogConsumer != nil {
		// Represent the metadata update as entity events.
		entityEvents := metadata.GetEntityEvents(oldMetadata, newMetadata, timestamp, rw.config.MetadataCollectionInterval)

		// Convert entity events to their log representation.
		logs := entityEvents.ConvertAndMoveToLogs()

		if logs.LogRecordCount() != 0 {
			err := rw.entityLogConsumer.ConsumeLogs(context.Background(), logs)
			if err != nil {
				rw.logger.Error("Error sending entity events to the consumer", zap.Error(err))

				// Note: the receiver contract says that we need to retry sending if the
				// returned error is not Permanent. However, we are not doing that here.
				// Instead, we rely on the fact that metadata is collected periodically
				// and the entity events will be delivered on the next cycle. This is
				// fine because we deliver cumulative entity state.
				// This allows us to avoid stressing the Collector or its destination
				// unnecessarily (typically non-Permanent errors happen in stressed conditions).
				// The periodic collection will be implemented later, see
				// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/24413
			}
		}
	}
}
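// Illustrative end-to-end sketch (assumed driver code, not part of this file): a caller
// such as the k8s_cluster receiver is expected to tie the pieces above together roughly
// like this, where metadataStore, exporters, and cfg.MetadataExporters are assumed to be
// provided by the surrounding receiver:
//
//	rw := newResourceWatcher(set, cfg, metadataStore)
//	if err := rw.initialize(); err != nil {
//		return err
//	}
//	if err := rw.setupMetadataExporters(exporters, cfg.MetadataExporters); err != nil {
//		return err
//	}
//	for _, factory := range rw.informerFactories {
//		_ = rw.startWatchingResources(ctx, factory)
//	}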