// metricbeat/module/kubernetes/util/kubernetes.go

// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package util

import (
	"errors"
	"fmt"
	"maps"
	"strings"
	"sync"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	k8sclient "k8s.io/client-go/kubernetes"
	k8sclientmeta "k8s.io/client-go/metadata"

	"k8s.io/apimachinery/pkg/api/meta"
	k8sresource "k8s.io/apimachinery/pkg/api/resource"

	"github.com/elastic/elastic-agent-autodiscover/kubernetes"
	"github.com/elastic/elastic-agent-autodiscover/kubernetes/metadata"
	conf "github.com/elastic/elastic-agent-libs/config"
	"github.com/elastic/elastic-agent-libs/logp"
	"github.com/elastic/elastic-agent-libs/mapstr"

	kubernetes2 "github.com/elastic/beats/v7/libbeat/autodiscover/providers/kubernetes"
	"github.com/elastic/beats/v7/metricbeat/mb"
)

// Resource metadata keys are composed of multiple parts - usually just the namespace and name. This string is the
// separator between the parts when treating the key as a single string.
const resourceMetadataKeySeparator = "/"

type kubernetesConfig struct {
	KubeConfig        string                       `config:"kube_config"`
	KubeAdm           bool                         `config:"use_kubeadm"`
	KubeClientOptions kubernetes.KubeClientOptions `config:"kube_client_options"`

	Node       string        `config:"node"`
	SyncPeriod time.Duration `config:"sync_period"`

	// AddMetadata enables enriching metricset events with metadata from the API server
	AddMetadata         bool                                `config:"add_metadata"`
	AddResourceMetadata *metadata.AddResourceMetadataConfig `config:"add_resource_metadata"`
	Namespace           string                              `config:"namespace"`
}

// Enricher takes Kubernetes events and enriches them with k8s metadata.
type Enricher interface {
	// Start will start the Kubernetes watcher on the first call and does nothing on subsequent calls.
	// Errors are logged as warnings.
	Start(*Watchers)

	// Stop will stop the Kubernetes watcher.
	Stop(*Watchers)

	// Enrich the given list of events.
	Enrich([]mapstr.M)
}

type enricher struct {
	sync.RWMutex
	metadataCache map[string]mapstr.M
	index         func(mapstr.M) string
	updateFunc    func(kubernetes.Resource) map[string]mapstr.M
	deleteFunc    func(kubernetes.Resource) []string
	metricsetName string
	resourceName  string
	watcher       *metaWatcher
	isPod         bool
	config        *kubernetesConfig
	log           *logp.Logger
}

type nilEnricher struct{}

func (*nilEnricher) Start(*Watchers)   {}
func (*nilEnricher) Stop(*Watchers)    {}
func (*nilEnricher) Enrich([]mapstr.M) {}

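// Illustrative sketch (added commentary, not from the original file): a
// metricset would typically obtain an enricher once and drive it through its
// lifecycle, roughly like
//
//	enricher := NewResourceMetadataEnricher(base, metricsRepo, watchers, nodeScope)
//	enricher.Start(watchers)
//	defer enricher.Stop(watchers)
//	// for each batch of events about to be published:
//	enricher.Enrich(events)
//
// A nilEnricher is returned whenever setup fails, so callers can use the
// returned value unconditionally.
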
type metaWatcher struct {
	watcher kubernetes.Watcher // watcher responsible for watching a specific resource
	started bool               // true if the watcher has started, false otherwise

	metricsetsUsing []string // list of metricsets using this shared watcher (e.g. pod, container, state_pod)

	enrichers   map[string]*enricher // map of enrichers using this watcher. The key is the metricset name. Each metricset has its own enricher.
	metricsRepo *MetricsRepo         // used to update container metrics derived from metadata, like resource limits

	nodeScope      bool               // whether this watcher should watch for resources in the current node or in the whole cluster
	restartWatcher kubernetes.Watcher // whether this watcher needs a restart. Only relevant on leader nodes due to metricsets with different nodeScope (pod, state_pod)
}

type Watchers struct {
	metaWatchersMap map[string]*metaWatcher
	lock            sync.RWMutex
}

const selector = "kubernetes"

const StateMetricsetPrefix = "state_"

const (
	PodResource                     = "pod"
	ServiceResource                 = "service"
	DeploymentResource              = "deployment"
	ReplicaSetResource              = "replicaset"
	StatefulSetResource             = "statefulset"
	DaemonSetResource               = "daemonset"
	JobResource                     = "job"
	NodeResource                    = "node"
	CronJobResource                 = "cronjob"
	PersistentVolumeResource        = "persistentvolume"
	PersistentVolumeClaimResource   = "persistentvolumeclaim"
	StorageClassResource            = "storageclass"
	NamespaceResource               = "state_namespace"
	HorizontalPodAutoscalerResource = "horizontalpodautoscaler"
)

func NewWatchers() *Watchers {
	watchers := &Watchers{
		metaWatchersMap: make(map[string]*metaWatcher),
	}
	return watchers
}

func getResource(resourceName string) kubernetes.Resource {
	switch resourceName {
	case PodResource:
		return &kubernetes.Pod{}
	case ServiceResource:
		return &kubernetes.Service{}
	case DeploymentResource:
		return &kubernetes.Deployment{}
	case ReplicaSetResource:
		return &kubernetes.ReplicaSet{}
	case StatefulSetResource:
		return &kubernetes.StatefulSet{}
	case DaemonSetResource:
		return &kubernetes.DaemonSet{}
	case JobResource:
		return &kubernetes.Job{}
	case CronJobResource:
		return &kubernetes.CronJob{}
	case PersistentVolumeResource:
		return &kubernetes.PersistentVolume{}
	case PersistentVolumeClaimResource:
		return &kubernetes.PersistentVolumeClaim{}
	case StorageClassResource:
		return &kubernetes.StorageClass{}
	case NodeResource:
		return &kubernetes.Node{}
	case NamespaceResource:
		return &kubernetes.Namespace{}
	default:
		return nil
	}
}

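// For illustration (added commentary): getResource returns nil for unknown
// names, so callers are expected to nil-check the result before handing it to
// a watcher, e.g.
//
//	if res := getResource(PodResource); res != nil {
//		// res is an empty *kubernetes.Pod ready to be passed to a watcher
//	}
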
// getExtraWatchers returns a list of the extra resources to watch based on some resource.
// The full list can be seen in https://github.com/elastic/beats/issues/37243, in the Expected Watchers section.
func getExtraWatchers(resourceName string, addResourceMetadata *metadata.AddResourceMetadataConfig) []string {
	switch resourceName {
	case PodResource:
		extra := []string{}
		if addResourceMetadata.Node.Enabled() {
			extra = append(extra, NodeResource)
		}
		if addResourceMetadata.Namespace.Enabled() {
			extra = append(extra, NamespaceResource)
		}
		// We need to create watchers for the ReplicaSets and Jobs a Pod might belong to,
		// in order to be able to retrieve 2nd-layer owner metadata, as in:
		// Deployment -> ReplicaSet -> Pod
		// CronJob -> Job -> Pod
		if addResourceMetadata.Deployment {
			extra = append(extra, ReplicaSetResource)
		}
		if addResourceMetadata.CronJob {
			extra = append(extra, JobResource)
		}
		return extra
	case ServiceResource:
		extra := []string{}
		if addResourceMetadata.Namespace.Enabled() {
			extra = append(extra, NamespaceResource)
		}
		return extra
	case DeploymentResource:
		extra := []string{}
		if addResourceMetadata.Namespace.Enabled() {
			extra = append(extra, NamespaceResource)
		}
		return extra
	case ReplicaSetResource:
		extra := []string{}
		if addResourceMetadata.Namespace.Enabled() {
			extra = append(extra, NamespaceResource)
		}
		return extra
	case StatefulSetResource:
		extra := []string{}
		if addResourceMetadata.Namespace.Enabled() {
			extra = append(extra, NamespaceResource)
		}
		return extra
	case DaemonSetResource:
		extra := []string{}
		if addResourceMetadata.Namespace.Enabled() {
			extra = append(extra, NamespaceResource)
		}
		return extra
	case JobResource:
		extra := []string{}
		if addResourceMetadata.Namespace.Enabled() {
			extra = append(extra, NamespaceResource)
		}
		return extra
	case CronJobResource:
		extra := []string{}
		if addResourceMetadata.Namespace.Enabled() {
			extra = append(extra, NamespaceResource)
		}
		return extra
	case PersistentVolumeResource:
		return []string{}
	case PersistentVolumeClaimResource:
		extra := []string{}
		if addResourceMetadata.Namespace.Enabled() {
			extra = append(extra, NamespaceResource)
		}
		return extra
	case StorageClassResource:
		return []string{}
	case NodeResource:
		return []string{}
	case NamespaceResource:
		return []string{}
	case HorizontalPodAutoscalerResource:
		extra := []string{}
		if addResourceMetadata.Namespace.Enabled() {
			extra = append(extra, NamespaceResource)
		}
		return extra
	default:
		return []string{}
	}
}

// getResourceName returns the name of the resource for a metricset.
// Example: the state_pod metricset uses the pod resource.
// The exception is state_namespace.
func getResourceName(metricsetName string) string {
	resourceName := metricsetName
	if resourceName != NamespaceResource {
		resourceName = strings.ReplaceAll(resourceName, StateMetricsetPrefix, "")
	}
	return resourceName
}

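// Worked example (added commentary): getResourceName("state_pod") yields "pod",
// while getResourceName("state_namespace") stays "state_namespace", because
// NamespaceResource deliberately keeps its "state_" prefix.
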
// getWatchOptions builds the kubernetes.WatchOptions{} needed for the watcher based on the config and nodeScope.
func getWatchOptions(config *kubernetesConfig, nodeScope bool, client k8sclient.Interface, log *logp.Logger) (*kubernetes.WatchOptions, error) {
	var err error
	options := kubernetes.WatchOptions{
		SyncTimeout: config.SyncPeriod,
	}

	// Watch objects in the node only.
	if nodeScope {
		nd := &kubernetes.DiscoverKubernetesNodeParams{
			ConfigHost:  config.Node,
			Client:      client,
			IsInCluster: kubernetes.IsInCluster(config.KubeConfig),
			HostUtils:   &kubernetes.DefaultDiscoveryUtils{},
		}
		options.Node, err = kubernetes.DiscoverKubernetesNode(log, nd)
		if err != nil {
			return nil, fmt.Errorf("couldn't discover kubernetes node: %w", err)
		}
	}
	return &options, err
}

func isNamespaced(resourceName string) bool {
	if resourceName == NodeResource || resourceName == PersistentVolumeResource || resourceName == StorageClassResource {
		return false
	}
	return true
}

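// For illustration (added commentary): isNamespaced reports false only for the
// cluster-scoped resources handled in this file, e.g.
//
//	isNamespaced(NodeResource)             // false
//	isNamespaced(PersistentVolumeResource) // false
//	isNamespaced(PodResource)              // true
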
// createWatcher creates a watcher for a specific resource, if not already created, and stores it in the resourceWatchers map.
// resourceName is the key in the resourceWatchers map under which the created watcher gets stored.
// options are the watch options for a specific watcher.
// For example, a watcher can be configured through options to watch only for resources on a specific node/namespace, or in the whole cluster.
// resourceWatchers is the store for all created watchers.
// extraWatcher sets apart the watchers created as the main watcher for a resource from the ones created as extra watchers.
func createWatcher(
	resourceName string,
	resource kubernetes.Resource,
	options kubernetes.WatchOptions,
	client k8sclient.Interface,
	metadataClient k8sclientmeta.Interface,
	resourceWatchers *Watchers,
	metricsRepo *MetricsRepo,
	namespace string,
	extraWatcher bool) (bool, error) {

	// We need to check the node scope to decide on whether a watcher should be updated or not.
	nodeScope := false
	if options.Node != "" {
		nodeScope = true
	}
	// The nodeScope for the extra watchers node, namespace, replicaset and job should always be false.
	if extraWatcher {
		nodeScope = false
		options.Node = ""
	}

	resourceWatchers.lock.Lock()
	defer resourceWatchers.lock.Unlock()

	// Check if a watcher for the specific resource already exists.
	resourceMetaWatcher, ok := resourceWatchers.metaWatchersMap[resourceName]

	// If the watcher exists, exit.
	if ok {
		if resourceMetaWatcher.nodeScope != nodeScope && resourceMetaWatcher.nodeScope {
			// It might happen that the watcher already exists, but is only being used to monitor the resources
			// of a single node (e.g. created by the pod metricset). In that case, we need to check if we are trying
			// to create a new watcher that will track the resources of the whole cluster (e.g. for the state_pod metricset).
			// If so, we need to update the watcher by changing its watch options (removing options.Node).
			// A running watcher cannot be updated directly. Instead, we must create a new one with the correct watch options.
			// The new restartWatcher must be identical to the old watcher, including the same handler function, with the
			// only difference being the watch options.

			if isNamespaced(resourceName) {
				options.Namespace = namespace
			}
			restartWatcher, err := kubernetes.NewNamedWatcher(resourceName, client, resource, options, nil)
			if err != nil {
				return false, err
			}
			// Update the handler of the restartWatcher to match the current watcher's handler.
			restartWatcher.AddEventHandler(resourceMetaWatcher.watcher.GetEventHandler())
			resourceMetaWatcher.restartWatcher = restartWatcher
			resourceMetaWatcher.nodeScope = nodeScope
		}
		return false, nil
	}

	// The watcher doesn't exist, so create it.
	// Check if we need to add a namespace to the watcher's options.
	if isNamespaced(resourceName) {
		options.Namespace = namespace
	}
	var (
		watcher kubernetes.Watcher
		err     error
	)
	switch resource.(type) {
	// use a metadata informer for ReplicaSets, as we only need their metadata
	case *kubernetes.ReplicaSet:
		watcher, err = kubernetes.NewNamedMetadataWatcher(
			"resource_metadata_enricher_rs",
			client,
			metadataClient,
			schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"},
			options,
			nil,
			transformReplicaSetMetadata,
		)
	default:
		watcher, err = kubernetes.NewNamedWatcher(resourceName, client, resource, options, nil)
	}
	if err != nil {
		return false, fmt.Errorf("error creating watcher for %T: %w", resource, err)
	}

	resourceMetaWatcher = &metaWatcher{
		watcher:         watcher,
		started:         false, // not started yet
		enrichers:       make(map[string]*enricher),
		metricsRepo:     metricsRepo,
		metricsetsUsing: make([]string, 0),
		restartWatcher:  nil,
		nodeScope:       nodeScope,
	}
	resourceWatchers.metaWatchersMap[resourceName] = resourceMetaWatcher

	// Add event handlers to the watcher. The only action we need to do here is invalidate the enricher cache.
	addEventHandlersToWatcher(resourceMetaWatcher, resourceWatchers)

	return true, nil
}

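// Worked example (added commentary): the handlers below derive float64 metrics
// from Kubernetes resource quantities, e.g.
//
//	q, _ := k8sresource.ParseQuantity("500m") // half a core
//	cores := float64(q.MilliValue()) / 1000   // 0.5
//	m, _ := k8sresource.ParseQuantity("128Mi")
//	bytes := float64(m.Value())               // 134217728
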
// addEventHandlersToWatcher adds event handlers to the watcher that invalidate the cache of the enrichers
// attached to the watcher and update container metrics on Pod change events.
func addEventHandlersToWatcher(
	metaWatcher *metaWatcher,
	resourceWatchers *Watchers,
) {
	containerMetricsUpdateFunc := func(pod *kubernetes.Pod) {
		nodeStore, _ := metaWatcher.metricsRepo.AddNodeStore(pod.Spec.NodeName)
		podId := NewPodId(pod.Namespace, pod.Name)
		podStore, _ := nodeStore.AddPodStore(podId)

		for _, container := range append(pod.Spec.Containers, pod.Spec.InitContainers...) {
			metrics := NewContainerMetrics()

			if cpu, ok := container.Resources.Limits["cpu"]; ok {
				if q, err := k8sresource.ParseQuantity(cpu.String()); err == nil {
					metrics.CoresLimit = NewFloat64Metric(float64(q.MilliValue()) / 1000)
				}
			}
			if memory, ok := container.Resources.Limits["memory"]; ok {
				if q, err := k8sresource.ParseQuantity(memory.String()); err == nil {
					metrics.MemoryLimit = NewFloat64Metric(float64(q.Value()))
				}
			}

			containerStore, _ := podStore.AddContainerStore(container.Name)
			containerStore.SetContainerMetrics(metrics)
		}
	}

	containerMetricsDeleteFunc := func(pod *kubernetes.Pod) {
		podId := NewPodId(pod.Namespace, pod.Name)
		nodeStore := metaWatcher.metricsRepo.GetNodeStore(pod.Spec.NodeName)
		nodeStore.DeletePodStore(podId)
	}

	nodeMetricsUpdateFunc := func(node *kubernetes.Node) {
		nodeName := node.GetObjectMeta().GetName()
		metrics := NewNodeMetrics()
		if cpu, ok := node.Status.Capacity["cpu"]; ok {
			if q, err := k8sresource.ParseQuantity(cpu.String()); err == nil {
				metrics.CoresAllocatable = NewFloat64Metric(float64(q.MilliValue()) / 1000)
			}
		}
		if memory, ok := node.Status.Capacity["memory"]; ok {
			if q, err := k8sresource.ParseQuantity(memory.String()); err == nil {
				metrics.MemoryAllocatable = NewFloat64Metric(float64(q.Value()))
			}
		}
		nodeStore, _ := metaWatcher.metricsRepo.AddNodeStore(nodeName)
		nodeStore.SetNodeMetrics(metrics)
	}

	clearMetadataCacheFunc := func(obj interface{}) {
		enrichers := make(map[string]*enricher, len(metaWatcher.enrichers))

		resourceWatchers.lock.Lock()
		maps.Copy(enrichers, metaWatcher.enrichers)
		resourceWatchers.lock.Unlock()

		for _, enricher := range enrichers {
			enricher.Lock()
			ids := enricher.deleteFunc(obj.(kubernetes.Resource))
			// update this watcher's events by removing all the metadata[id]
			for _, id := range ids {
				delete(enricher.metadataCache, id)
			}
			enricher.Unlock()
		}
	}

	metaWatcher.watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			switch res := obj.(type) {
			case *kubernetes.Pod:
				containerMetricsUpdateFunc(res)
			case *kubernetes.Node:
				nodeMetricsUpdateFunc(res)
			}
		},
		UpdateFunc: func(obj interface{}) {
			clearMetadataCacheFunc(obj)
			switch res := obj.(type) {
			case *kubernetes.Pod:
				containerMetricsUpdateFunc(res)
			case *kubernetes.Node:
				nodeMetricsUpdateFunc(res)
			}
		},
		DeleteFunc: func(obj interface{}) {
			clearMetadataCacheFunc(obj)
			switch res := obj.(type) {
			case *kubernetes.Pod:
				containerMetricsDeleteFunc(res)
			case *kubernetes.Node:
				nodeName := res.GetObjectMeta().GetName()
				metaWatcher.metricsRepo.DeleteNodeStore(nodeName)
			}
		},
	})
}

// addToMetricsetsUsing adds the metricset identified by metricsetUsing to the list of metricsets using the
// shared watcher identified by resourceName. The caller of this function should not be holding the lock.
func addToMetricsetsUsing(resourceName string, metricsetUsing string, resourceWatchers *Watchers) {
	resourceWatchers.lock.Lock()
	defer resourceWatchers.lock.Unlock()

	data, ok := resourceWatchers.metaWatchersMap[resourceName]
	if ok {
		contains := false
		for _, which := range data.metricsetsUsing {
			if which == metricsetUsing {
				contains = true
				break
			}
		}
		// add this metricset to the list of metricsets using this watcher
		if !contains {
			data.metricsetsUsing = append(data.metricsetsUsing, metricsetUsing)
		}
	}
}

// removeFromMetricsetsUsing removes the metricset from the list of metricsets using the shared watcher.
// It returns true if the element was removed, along with the new size of the list.
// The cache should be locked when called.
func removeFromMetricsetsUsing(resourceName string, notUsingName string, resourceWatchers *Watchers) (bool, int) {
	data, ok := resourceWatchers.metaWatchersMap[resourceName]
	removed := false
	if ok {
		newIndex := 0
		for i, which := range data.metricsetsUsing {
			if which == notUsingName {
				removed = true
			} else {
				data.metricsetsUsing[newIndex] = data.metricsetsUsing[i]
				newIndex++
			}
		}
		data.metricsetsUsing = data.metricsetsUsing[:newIndex]
		return removed, len(data.metricsetsUsing)
	}
	return removed, 0
}

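// For illustration (added commentary): the two helpers above implement simple
// reference counting on shared watchers. If both the pod and state_pod
// metricsets register against the shared pod watcher, the watcher is only
// stopped once removeFromMetricsetsUsing has run for both and the remaining
// count reaches zero.
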
// createAllWatchers creates all the watchers required by a metricset.
func createAllWatchers(
	client k8sclient.Interface,
	metadataClient k8sclientmeta.Interface,
	metricsetName string,
	resourceName string,
	nodeScope bool,
	config *kubernetesConfig,
	log *logp.Logger,
	resourceWatchers *Watchers,
	metricsRepo *MetricsRepo,
) error {
	res := getResource(resourceName)
	if res == nil {
		return fmt.Errorf("resource for name %s does not exist. Watcher cannot be created", resourceName)
	}

	options, err := getWatchOptions(config, nodeScope, client, log)
	if err != nil {
		return err
	}

	// Create the main watcher for the given resource.
	// For example, the pod metricset's main watcher will be the pod watcher.
	// If it fails, we return an error, so that the extra watchers are not created.
	created, err := createWatcher(resourceName, res, *options, client, metadataClient, resourceWatchers, metricsRepo, config.Namespace, false)
	if err != nil {
		return fmt.Errorf("error initializing Kubernetes watcher %s, required by %s: %w", resourceName, metricsetName, err)
	} else if created {
		log.Debugf("Created watcher %s successfully, created by %s.", resourceName, metricsetName)
	}
	// add this metricset to the ones using the watcher
	addToMetricsetsUsing(resourceName, metricsetName, resourceWatchers)

	// Create any extra watchers required by this resource.
	// For example, pod also requires the namespace and node watchers, and possibly the replicaset and job watchers.
	extraWatchers := getExtraWatchers(resourceName, config.AddResourceMetadata)
	for _, extra := range extraWatchers {
		extraRes := getResource(extra)
		if extraRes != nil {
			created, err = createWatcher(extra, extraRes, *options, client, metadataClient, resourceWatchers, metricsRepo, config.Namespace, true)
			if err != nil {
				log.Errorf("Error initializing Kubernetes watcher %s, required by %s: %s", extra, metricsetName, err)
			} else {
				if created {
					log.Debugf("Created watcher %s successfully, created by %s.", extra, metricsetName)
				}
				// add this metricset to the ones using the extra watchers
				addToMetricsetsUsing(extra, metricsetName, resourceWatchers)
			}
		} else {
			log.Errorf("Resource for name %s does not exist. Watcher cannot be created.", extra)
		}
	}

	return nil
}

// createMetadataGen creates and returns the metadata generator for resources other than pod and service.
// metaGen is a struct of type Resource and implements the Generate method for metadata generation for a given resource kind.
func createMetadataGen(client k8sclient.Interface, commonConfig *conf.C, addResourceMetadata *metadata.AddResourceMetadataConfig,
	resourceName string, resourceWatchers *Watchers) (*metadata.Resource, error) {

	resourceWatchers.lock.RLock()
	defer resourceWatchers.lock.RUnlock()

	resourceMetaWatcher := resourceWatchers.metaWatchersMap[resourceName]
	// This should not be possible, since the watchers should have been created earlier.
	if resourceMetaWatcher == nil {
		return nil, fmt.Errorf("could not create the metadata generator, as the watcher for %s does not exist", resourceName)
	}

	var metaGen *metadata.Resource

	namespaceMetaWatcher := resourceWatchers.metaWatchersMap[NamespaceResource]
	if namespaceMetaWatcher != nil {
		n := metadata.NewNamespaceMetadataGenerator(addResourceMetadata.Namespace,
			(*namespaceMetaWatcher).watcher.Store(), client)
		metaGen = metadata.NewNamespaceAwareResourceMetadataGenerator(commonConfig, client, n)
	} else {
		metaGen = metadata.NewResourceMetadataGenerator(commonConfig, client)
	}

	return metaGen, nil
}

// createMetadataGenSpecific creates and returns the metadata generator for a specific resource - pod or service.
// A metaGen struct implements the MetaGen interface and is designed to utilize the necessary watchers to collect
// (Generate) the metadata for a specific resource.
func createMetadataGenSpecific(client k8sclient.Interface, commonConfig *conf.C, addResourceMetadata *metadata.AddResourceMetadataConfig,
	resourceName string, resourceWatchers *Watchers) (metadata.MetaGen, error) {

	resourceWatchers.lock.RLock()
	defer resourceWatchers.lock.RUnlock()

	// The watcher for the resource needs to exist.
	resourceMetaWatcher := resourceWatchers.metaWatchersMap[resourceName]
	if resourceMetaWatcher == nil {
		return nil, fmt.Errorf("could not create the metadata generator, as the watcher for %s does not exist", resourceName)
	}
	mainWatcher := (*resourceMetaWatcher).watcher
	if (*resourceMetaWatcher).restartWatcher != nil {
		mainWatcher = (*resourceMetaWatcher).restartWatcher
	}

	var metaGen metadata.MetaGen
	if resourceName == PodResource {
		var nodeWatcher kubernetes.Watcher
		if nodeMetaWatcher := resourceWatchers.metaWatchersMap[NodeResource]; nodeMetaWatcher != nil {
			nodeWatcher = (*nodeMetaWatcher).watcher
		}
		var namespaceWatcher kubernetes.Watcher
		if namespaceMetaWatcher := resourceWatchers.metaWatchersMap[NamespaceResource]; namespaceMetaWatcher != nil {
			namespaceWatcher = (*namespaceMetaWatcher).watcher
		}
		var replicaSetWatcher kubernetes.Watcher
		if replicasetMetaWatcher := resourceWatchers.metaWatchersMap[ReplicaSetResource]; replicasetMetaWatcher != nil {
			replicaSetWatcher = (*replicasetMetaWatcher).watcher
		}
		var jobWatcher kubernetes.Watcher
		if jobMetaWatcher := resourceWatchers.metaWatchersMap[JobResource]; jobMetaWatcher != nil {
			jobWatcher = (*jobMetaWatcher).watcher
		}
		// For example, for a pod named redis in namespace default, the generator uses the pod watcher for pod metadata,
		// collects all node metadata using the node watcher's store and all namespace metadata using the namespace watcher's store.
		metaGen = metadata.GetPodMetaGen(commonConfig, mainWatcher, nodeWatcher, namespaceWatcher, replicaSetWatcher,
			jobWatcher, addResourceMetadata)
		return metaGen, nil
	} else if resourceName == ServiceResource {
		namespaceMetaWatcher := resourceWatchers.metaWatchersMap[NamespaceResource]
		if namespaceMetaWatcher == nil {
			return nil, fmt.Errorf("could not create the metadata generator, as the watcher for namespace does not exist")
		}
		namespaceMeta := metadata.NewNamespaceMetadataGenerator(addResourceMetadata.Namespace,
			(*namespaceMetaWatcher).watcher.Store(), client)
		metaGen = metadata.NewServiceMetadataGenerator(commonConfig, (*resourceMetaWatcher).watcher.Store(),
			namespaceMeta, client)
		return metaGen, nil
	}

	// Should never reach this part, as this function is only for service or pod resources.
	return metaGen, fmt.Errorf("failed to create a metadata generator for resource %s", resourceName)
}

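// Note (added commentary): mainWatcher above deliberately prefers a pending
// restartWatcher when one is set, so the generator binds to the store that
// will actually be running once enricher.Start swaps the watchers over.
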
// NewResourceMetadataEnricher returns a metadata enricher for a given resource.
// For the metadata enrichment, resource watchers are used, which are shared between
// the different metricsets. For example, for the pod metricset, a pod watcher, a namespace watcher and
// a node watcher are needed by default, in addition to the job and replicaset watchers, depending
// on the configuration. These watchers will also be used by other metricsets that require them,
// like state_pod, state_container, node etc.
func NewResourceMetadataEnricher(
	base mb.BaseMetricSet,
	metricsRepo *MetricsRepo,
	resourceWatchers *Watchers,
	nodeScope bool) Enricher {

	log := base.Logger().Named(selector)

	// metricset configuration
	config, err := GetValidatedConfig(base)
	if err != nil {
		log.Info("Kubernetes metricset enriching is disabled")
		return &nilEnricher{}
	}

	// This type of config is needed for the metadata generator
	// and includes detailed settings for metadata enrichment.
	commonMetaConfig := metadata.Config{}
	if err := base.Module().UnpackConfig(&commonMetaConfig); err != nil {
		log.Errorf("Error initializing Kubernetes metadata enricher: %s", err)
		return &nilEnricher{}
	}
	commonConfig, _ := conf.NewConfigFrom(&commonMetaConfig)

	client, err := kubernetes.GetKubernetesClient(config.KubeConfig, config.KubeClientOptions)
	if err != nil {
		log.Errorf("Error creating Kubernetes client: %s", err)
		return &nilEnricher{}
	}
	metadataClient, err := kubernetes.GetKubernetesMetadataClient(config.KubeConfig, config.KubeClientOptions)
	if err != nil {
		log.Errorf("Error creating Kubernetes metadata client: %s", err)
		return &nilEnricher{}
	}

	metricsetName := base.Name()
	resourceName := getResourceName(metricsetName)

	// Create all watchers needed for this metricset.
	err = createAllWatchers(client, metadataClient, metricsetName, resourceName, nodeScope, config, log, resourceWatchers, metricsRepo)
	if err != nil {
		log.Errorf("Error starting the watchers: %s", err)
		return &nilEnricher{}
	}

	var specificMetaGen metadata.MetaGen
	var generalMetaGen *metadata.Resource

	// We initialise the use_kubeadm variable based on the module's KubeAdm base configuration.
	err = config.AddResourceMetadata.Namespace.SetBool("use_kubeadm", -1, commonMetaConfig.KubeAdm)
	if err != nil {
		log.Errorf("couldn't set kubeadm variable for namespace due to error %+v", err)
	}
	err = config.AddResourceMetadata.Node.SetBool("use_kubeadm", -1, commonMetaConfig.KubeAdm)
	if err != nil {
		log.Errorf("couldn't set kubeadm variable for node due to error %+v", err)
	}

	// Create the metadata generator to be used in the watcher's event handler.
	// Both specificMetaGen and generalMetaGen implement the Generate method for metadata collection.
	if resourceName == ServiceResource || resourceName == PodResource {
		specificMetaGen, err = createMetadataGenSpecific(client, commonConfig, config.AddResourceMetadata, resourceName, resourceWatchers)
	} else {
		generalMetaGen, err = createMetadataGen(client, commonConfig, config.AddResourceMetadata, resourceName, resourceWatchers)
	}
	if err != nil {
		log.Errorf("Error trying to create the metadata generators: %s", err)
		return &nilEnricher{}
	}
	_, _ = specificMetaGen, generalMetaGen // necessary for earlier versions of golangci-lint

	// updateFunc is used as the resource watchers' add and update handler.
	// The handler function is executed when a watcher is triggered (i.e. a new or updated resource).
	// It is responsible for generating the metadata for a detected resource by executing the metadata generator's Generate method.
	// It is a common handler for all resource watchers. The kind of resource (e.g. pod or deployment) is checked inside the function.
	// It returns a map with a resource identifier (i.e. namespace/resource_name) as key and the metadata as value.
	updateFunc := getEventMetadataFunc(log, generalMetaGen, specificMetaGen)

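	// Worked example (added commentary): for a pod named redis in namespace
	// default, updateFunc produces the cache entry
	//
	//	map[string]mapstr.M{"default/redis": { /* generated metadata */ }}
	//
	// while cluster-scoped resources such as nodes are keyed by bare name.
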
	// deleteFunc is used as the resource watchers' delete handler.
	// The deleteFunc is executed when a watcher is triggered for a resource deletion (e.g. a pod is deleted).
	// It returns the identifier of the resource.
	deleteFunc := func(r kubernetes.Resource) []string {
		accessor, _ := meta.Accessor(r)

		id := accessor.GetName()
		namespace := accessor.GetNamespace()
		if namespace != "" {
			id = join(namespace, id)
		}
		return []string{id}
	}

	// indexFunc constructs and returns the resource identifier from a given event.
	// If a resource is namespaced (e.g. pod), the identifier is in the form of namespace/resource_name.
	// If it is not namespaced (e.g. node), the identifier is the resource's name.
	indexFunc := func(e mapstr.M) string {
		name := getString(e, "name")
		namespace := getString(e, mb.ModuleDataKey+".namespace")
		var id string
		if name != "" && namespace != "" {
			id = join(namespace, name)
		} else if namespace != "" {
			id = namespace
		} else {
			id = name
		}
		return id
	}

	// create a metadata enricher for this metricset
	enricher := buildMetadataEnricher(
		metricsetName,
		resourceName,
		resourceWatchers,
		config,
		updateFunc,
		deleteFunc,
		indexFunc,
		log)
	if resourceName == PodResource {
		enricher.isPod = true
	}

	return enricher
}

// NewContainerMetadataEnricher returns an Enricher configured for container events.
func NewContainerMetadataEnricher(
	base mb.BaseMetricSet,
	metricsRepo *MetricsRepo,
	resourceWatchers *Watchers,
	nodeScope bool) Enricher {

	log := base.Logger().Named(selector)

	config, err := GetValidatedConfig(base)
	if err != nil {
		log.Info("Kubernetes metricset enriching is disabled")
		return &nilEnricher{}
	}

	// This type of config is needed for the metadata generator.
	commonMetaConfig := metadata.Config{}
	if err := base.Module().UnpackConfig(&commonMetaConfig); err != nil {
		log.Errorf("Error initializing Kubernetes metadata enricher: %s", err)
		return &nilEnricher{}
	}
	commonConfig, _ := conf.NewConfigFrom(&commonMetaConfig)

	client, err := kubernetes.GetKubernetesClient(config.KubeConfig, config.KubeClientOptions)
	if err != nil {
		log.Errorf("Error creating Kubernetes client: %s", err)
		return &nilEnricher{}
	}
	metadataClient, err := kubernetes.GetKubernetesMetadataClient(config.KubeConfig, config.KubeClientOptions)
	if err != nil {
		log.Errorf("Error creating Kubernetes metadata client: %s", err)
		return &nilEnricher{}
	}

	metricsetName := base.Name()

	err = createAllWatchers(client, metadataClient, metricsetName, PodResource, nodeScope, config, log, resourceWatchers, metricsRepo)
	if err != nil {
		log.Errorf("Error starting the watchers: %s", err)
		return &nilEnricher{}
	}

	// We initialise the use_kubeadm variable based on the module's KubeAdm base configuration.
	err = config.AddResourceMetadata.Namespace.SetBool("use_kubeadm", -1, commonMetaConfig.KubeAdm)
	if err != nil {
		log.Errorf("couldn't set kubeadm variable for namespace due to error %+v", err)
	}
	err = config.AddResourceMetadata.Node.SetBool("use_kubeadm", -1, commonMetaConfig.KubeAdm)
	if err != nil {
		log.Errorf("couldn't set kubeadm variable for node due to error %+v", err)
	}

	metaGen, err := createMetadataGenSpecific(client, commonConfig, config.AddResourceMetadata, PodResource, resourceWatchers)
	if err != nil {
		log.Errorf("Error trying to create the metadata generators: %s", err)
		return &nilEnricher{}
	}

	updateFunc := func(r kubernetes.Resource) map[string]mapstr.M {
		metadataEvents := make(map[string]mapstr.M)

		pod, ok := r.(*kubernetes.Pod)
		if !ok {
			base.Logger().Debugf("Error while casting event: got type %T, expected *kubernetes.Pod", r)
			return metadataEvents
		}
		pmeta := metaGen.Generate(pod)

		statuses := make(map[string]*kubernetes.PodContainerStatus)
		mapStatuses := func(s []kubernetes.PodContainerStatus) {
			for i := range s {
				statuses[s[i].Name] = &s[i]
			}
		}
		mapStatuses(pod.Status.ContainerStatuses)
		mapStatuses(pod.Status.InitContainerStatuses)

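		// Worked example (added commentary): a ContainerID such as
		// "containerd://3c6b..." splits on "://" into the runtime "containerd"
		// and the id "3c6b...", which the loop below maps onto the ECS fields
		// container.runtime and container.id.
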
		for _, container := range append(pod.Spec.Containers, pod.Spec.InitContainers...) {
			cmeta := mapstr.M{}

			if s, ok := statuses[container.Name]; ok {
				// Extracting the id and runtime ECS fields from the ContainerID,
				// which is in the form <container.runtime>://<container.id>.
				split := strings.Index(s.ContainerID, "://")
				if split != -1 {
					kubernetes2.ShouldPut(cmeta, "container.id", s.ContainerID[split+3:], base.Logger())
					kubernetes2.ShouldPut(cmeta, "container.runtime", s.ContainerID[:split], base.Logger())
				}
			}

			id := join(pod.GetObjectMeta().GetNamespace(), pod.GetObjectMeta().GetName(), container.Name)
			cmeta.DeepUpdate(pmeta)

			metadataEvents[id] = cmeta
		}
		return metadataEvents
	}

	deleteFunc := func(r kubernetes.Resource) []string {
		ids := make([]string, 0)
		pod, ok := r.(*kubernetes.Pod)
		if !ok {
			base.Logger().Debugf("Error while casting event: got type %T, expected *kubernetes.Pod", r)
			return ids
		}

		for _, container := range append(pod.Spec.Containers, pod.Spec.InitContainers...) {
			id := join(pod.ObjectMeta.GetNamespace(), pod.GetObjectMeta().GetName(), container.Name)
			ids = append(ids, id)
		}

		return ids
	}

	indexFunc := func(e mapstr.M) string {
		return join(getString(e, mb.ModuleDataKey+".namespace"), getString(e, mb.ModuleDataKey+".pod.name"), getString(e, "name"))
	}

	enricher := buildMetadataEnricher(
		metricsetName,
		PodResource,
		resourceWatchers,
		config,
		updateFunc,
		deleteFunc,
		indexFunc,
		log,
	)

	return enricher
}

func GetValidatedConfig(base mb.BaseMetricSet) (*kubernetesConfig, error) {
	config, err := GetConfig(base)
	if err != nil {
		logp.Err("Error while getting config: %v", err)
		return nil, err
	}

	config, err = validateConfig(config)
	if err != nil {
		logp.Err("Error while validating config: %v", err)
		return nil, err
	}
	return config, nil
}

func validateConfig(config *kubernetesConfig) (*kubernetesConfig, error) {
	if !config.AddMetadata {
		return nil, errors.New("metadata enriching is disabled")
	}
	return config, nil
}

func GetConfig(base mb.BaseMetricSet) (*kubernetesConfig, error) {
	config := &kubernetesConfig{
		AddMetadata:         true,
		SyncPeriod:          time.Minute * 10,
		AddResourceMetadata: metadata.GetDefaultResourceMetadataConfig(),
	}

	if err := base.Module().UnpackConfig(&config); err != nil {
		return nil, errors.New("error unpacking configs")
	}

	return config, nil
}

func getString(m mapstr.M, key string) string {
	val, err := m.GetValue(key)
	if err != nil {
		return ""
	}

	str, _ := val.(string)
	return str
}

func join(fields ...string) string {
	return strings.Join(fields, resourceMetadataKeySeparator)
}

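// Worked example (added commentary): join concatenates key parts with the "/"
// separator defined by resourceMetadataKeySeparator, so
//
//	join("default", "redis")        == "default/redis"     // pod key
//	join("default", "redis", "app") == "default/redis/app" // container key
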
// buildMetadataEnricher builds and returns a metadata enricher for a given metricset.
// It appends the new enricher to the watcher.enrichers map for the given resource watcher.
// It also updates the add, update and delete event handlers of the watcher in order to retrieve
// the metadata of all enrichers associated with that watcher.
func buildMetadataEnricher(
	metricsetName string,
	resourceName string,
	resourceWatchers *Watchers,
	config *kubernetesConfig,
	updateFunc func(kubernetes.Resource) map[string]mapstr.M,
	deleteFunc func(kubernetes.Resource) []string,
	indexFunc func(e mapstr.M) string,
	log *logp.Logger) *enricher {

	enricher := &enricher{
		metadataCache: map[string]mapstr.M{},
		index:         indexFunc,
		updateFunc:    updateFunc,
		deleteFunc:    deleteFunc,
		resourceName:  resourceName,
		metricsetName: metricsetName,
		config:        config,
		log:           log,
	}

	resourceWatchers.lock.Lock()
	defer resourceWatchers.lock.Unlock()

	// Check if a watcher for this resource already exists.
	resourceMetaWatcher := resourceWatchers.metaWatchersMap[resourceName]
	if resourceMetaWatcher != nil {
		// Append the new enricher to the watcher's enrichers map.
		resourceMetaWatcher.enrichers[metricsetName] = enricher
		enricher.watcher = resourceMetaWatcher
	}

	return enricher
}

// Start starts all the watchers associated with a given enricher's resource.
func (e *enricher) Start(resourceWatchers *Watchers) {
	resourceWatchers.lock.Lock()
	defer resourceWatchers.lock.Unlock()

	// Each resource may require multiple watchers. We first start the
	// extra watchers, as they are a dependency for the main resource watcher.
	// For example, a pod watcher requires the namespace and node watchers to be
	// started first.
	extras := getExtraWatchers(e.resourceName, e.config.AddResourceMetadata)
	for _, extra := range extras {
		extraWatcherMeta := resourceWatchers.metaWatchersMap[extra]
		if extraWatcherMeta != nil && !extraWatcherMeta.started {
			if err := extraWatcherMeta.watcher.Start(); err != nil {
				e.log.Warnf("Error starting %s watcher: %s", extra, err)
			} else {
				extraWatcherMeta.started = true
			}
		}
	}

	// Start the main watcher if not already started.
	// If a restartWatcher is defined, stop the old watcher if started and start the restartWatcher instead.
	// The restartWatcher replaces the old watcher and resourceMetaWatcher.restartWatcher is set to nil.
	resourceMetaWatcher := resourceWatchers.metaWatchersMap[e.resourceName]
	if resourceMetaWatcher != nil {
		if resourceMetaWatcher.restartWatcher != nil {
			if resourceMetaWatcher.started {
				resourceMetaWatcher.watcher.Stop()
			}
			if err := resourceMetaWatcher.restartWatcher.Start(); err != nil {
				e.log.Warnf("Error restarting %s watcher: %s", e.resourceName, err)
			} else {
				resourceMetaWatcher.watcher = resourceMetaWatcher.restartWatcher
				resourceMetaWatcher.restartWatcher = nil
				resourceMetaWatcher.started = true
			}
		} else {
			if !resourceMetaWatcher.started {
				if err := resourceMetaWatcher.watcher.Start(); err != nil {
					e.log.Warnf("Error starting %s watcher: %s", e.resourceName, err)
				} else {
					resourceMetaWatcher.started = true
				}
			}
		}
	}
}

// Stop removes the enricher's metricset as a user of the associated watchers.
// If no metricset is using a watcher anymore, the watcher gets stopped.
func (e *enricher) Stop(resourceWatchers *Watchers) {
	resourceWatchers.lock.Lock()
	defer resourceWatchers.lock.Unlock()

	resourceMetaWatcher := resourceWatchers.metaWatchersMap[e.resourceName]
	if resourceMetaWatcher != nil && resourceMetaWatcher.started {
		_, size := removeFromMetricsetsUsing(e.resourceName, e.metricsetName, resourceWatchers)
		if size == 0 {
			resourceMetaWatcher.watcher.Stop()
			resourceMetaWatcher.started = false
		}
	}

	extras := getExtraWatchers(e.resourceName, e.config.AddResourceMetadata)
	for _, extra := range extras {
		extraMetaWatcher := resourceWatchers.metaWatchersMap[extra]
		if extraMetaWatcher != nil && extraMetaWatcher.started {
			_, size := removeFromMetricsetsUsing(extra, e.metricsetName, resourceWatchers)
			if size == 0 {
				extraMetaWatcher.watcher.Stop()
				extraMetaWatcher.started = false
			}
		}
	}
}

// Enrich enriches events with metadata saved in the enricher.metadataCache map.
// This method is executed whenever a new event is created and about to be published.
// The enricher's index method is used to retrieve the resource identifier from each event.
func (e *enricher) Enrich(events []mapstr.M) {
	for _, event := range events {
		if meta := e.getMetadata(event); meta != nil {
			k8s, err := meta.GetValue("kubernetes")
			if err != nil {
				continue
			}
			k8sMeta, ok := k8s.(mapstr.M)
			if !ok {
				continue
			}

			if e.isPod {
				// apply pod meta at metricset level
				if podMeta, ok := k8sMeta["pod"].(mapstr.M); ok {
					event.DeepUpdate(podMeta)
				}

				// don't apply pod metadata to module level
				delete(k8sMeta, "pod")
			}
			ecsMeta := meta
			err = ecsMeta.Delete("kubernetes")
			if err != nil {
				logp.Debug("kubernetes", "Failed to delete field '%s': %s", "kubernetes", err)
			}

			event.DeepUpdate(mapstr.M{
				mb.ModuleDataKey: k8sMeta,
				"meta":           ecsMeta,
			})
		}
	}
}

// getMetadata returns the metadata for the given event. If the metadata doesn't exist in the cache, we try to get it
// from the watcher store.
// The returned map is a copy, owned by the caller.
func (e *enricher) getMetadata(event mapstr.M) mapstr.M {
	e.Lock()
	defer e.Unlock()
	metaKey := e.index(event)
	eventMeta := e.metadataCache[metaKey]
	if eventMeta == nil {
		e.updateMetadataCacheFromWatcher(metaKey)
		eventMeta = e.metadataCache[metaKey]
	}
	if eventMeta != nil {
		eventMeta = eventMeta.Clone()
	}
	return eventMeta
}

// updateMetadataCacheFromWatcher updates the metadata cache for the given key with data from the watcher.
func (e *enricher) updateMetadataCacheFromWatcher(key string) {
	storeKey := getWatcherStoreKeyFromMetadataKey(key)
	if res, exists, _ := e.watcher.watcher.Store().GetByKey(storeKey); exists {
		eventMetaMap := e.updateFunc(res.(kubernetes.Resource))
		for k, v := range eventMetaMap {
			e.metadataCache[k] = v
		}
	}
}

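// Worked example (added commentary): on a cache miss for the container key
// "default/redis/app", the enricher looks up the pod "default/redis" in the
// watcher store, regenerates the metadata for all of that pod's containers and
// re-populates the cache before answering.
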
// getWatcherStoreKeyFromMetadataKey returns a watcher store key for a given metadata cache key. These are identical
// for nearly all resources, and have the form `{namespace}/{name}`, with the exception of containers, where it's
// `{namespace}/{pod_name}/{container_name}`. In that case, we want the Pod key, so we drop the final part.
func getWatcherStoreKeyFromMetadataKey(metaKey string) string {
	parts := strings.Split(metaKey, resourceMetadataKeySeparator)
	if len(parts) <= 2 { // normal K8s resource
		return metaKey
	}

	// container, we need to remove the final part to get the Pod key
	return strings.Join(parts[:2], resourceMetadataKeySeparator)
}

func CreateEvent(event mapstr.M, namespace string) (mb.Event, error) {
	var moduleFieldsMapStr mapstr.M
	moduleFields, ok := event[mb.ModuleDataKey]
	var err error
	if ok {
		moduleFieldsMapStr, ok = moduleFields.(mapstr.M)
		if !ok {
			err = fmt.Errorf("error trying to convert '%s' from event to mapstr.M", mb.ModuleDataKey)
		}
	}
	delete(event, mb.ModuleDataKey)

	e := mb.Event{
		MetricSetFields: event,
		ModuleFields:    moduleFieldsMapStr,
		Namespace:       namespace,
	}

	// add root-level fields like ECS fields
	var metaFieldsMapStr mapstr.M
	metaFields, ok := event["meta"]
	if ok {
		metaFieldsMapStr, ok = metaFields.(mapstr.M)
		if !ok {
			err = fmt.Errorf("error trying to convert '%s' from event to mapstr.M", "meta")
		}
		delete(event, "meta")
		if len(metaFieldsMapStr) > 0 {
			e.RootFields = metaFieldsMapStr
		}
	}
	return e, err
}

func GetClusterECSMeta(cfg *conf.C, client k8sclient.Interface, logger *logp.Logger) (mapstr.M, error) {
	clusterInfo, err := metadata.GetKubernetesClusterIdentifier(cfg, client)
	if err != nil {
		return nil, fmt.Errorf("fail to get kubernetes cluster metadata: %w", err)
	}
	ecsClusterMeta := mapstr.M{}
	if clusterInfo.URL != "" {
		kubernetes2.ShouldPut(ecsClusterMeta, "orchestrator.cluster.url", clusterInfo.URL, logger)
	}
	if clusterInfo.Name != "" {
		kubernetes2.ShouldPut(ecsClusterMeta, "orchestrator.cluster.name", clusterInfo.Name, logger)
	}
	return ecsClusterMeta, nil
}

// AddClusterECSMeta adds ECS orchestrator fields
func AddClusterECSMeta(base mb.BaseMetricSet) mapstr.M {
	config, err := GetValidatedConfig(base)
	if err != nil {
		base.Logger().Info("could not retrieve validated config")
		return mapstr.M{}
	}
	client, err := kubernetes.GetKubernetesClient(config.KubeConfig, config.KubeClientOptions)
	if err != nil {
		base.Logger().Errorf("fail to get kubernetes client: %s", err)
		return mapstr.M{}
	}
	cfg, _ := conf.NewConfigFrom(&config)
	ecsClusterMeta, err := GetClusterECSMeta(cfg, client, base.Logger())
	if err != nil {
		base.Logger().Infof("could not retrieve cluster metadata: %s", err)
		return mapstr.M{}
	}
	return ecsClusterMeta
}

// transformReplicaSetMetadata ensures that the PartialObjectMetadata resources we get from a metadata watcher
// can be correctly interpreted by the update function returned by getEventMetadataFunc.
// This really just involves adding missing type information.
func transformReplicaSetMetadata(obj interface{}) (interface{}, error) {
	old, ok := obj.(*metav1.PartialObjectMetadata)
	if !ok {
		return nil, fmt.Errorf("obj of type %T is neither a ReplicaSet nor a PartialObjectMetadata", obj)
	}
	old.TypeMeta = metav1.TypeMeta{
		APIVersion: "apps/v1",
		Kind:       "ReplicaSet",
	}
	return old, nil
}

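// For illustration (added commentary): a metadata informer delivers
// *metav1.PartialObjectMetadata objects with an empty TypeMeta, so after
//
//	rs, _ := transformReplicaSetMetadata(&metav1.PartialObjectMetadata{})
//
// the result reports APIVersion "apps/v1" and Kind "ReplicaSet", which the
// fallback branch of getEventMetadataFunc uses to label the metadata.
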
// getEventMetadataFunc returns a function that takes a kubernetes Resource as an argument and returns metadata
// that can directly be used for event enrichment.
// This function is intended to be used as the resource watchers' add and update handler.
func getEventMetadataFunc(
	logger *logp.Logger,
	generalMetaGen *metadata.Resource,
	specificMetaGen metadata.MetaGen,
) func(r kubernetes.Resource) map[string]mapstr.M {
	return func(r kubernetes.Resource) map[string]mapstr.M {
		accessor, accErr := meta.Accessor(r)
		if accErr != nil {
			logger.Errorf("Error creating accessor: %s", accErr)
		}
		id := accessor.GetName()
		namespace := accessor.GetNamespace()
		if namespace != "" {
			id = join(namespace, id)
		}

		switch r := r.(type) {
		case *kubernetes.Pod:
			return map[string]mapstr.M{id: specificMetaGen.Generate(r)}
		case *kubernetes.Node:
			return map[string]mapstr.M{id: generalMetaGen.Generate(NodeResource, r)}
		case *kubernetes.Deployment:
			return map[string]mapstr.M{id: generalMetaGen.Generate(DeploymentResource, r)}
		case *kubernetes.Job:
			return map[string]mapstr.M{id: generalMetaGen.Generate(JobResource, r)}
		case *kubernetes.CronJob:
			return map[string]mapstr.M{id: generalMetaGen.Generate(CronJobResource, r)}
		case *kubernetes.Service:
			return map[string]mapstr.M{id: specificMetaGen.Generate(r)}
		case *kubernetes.StatefulSet:
			return map[string]mapstr.M{id: generalMetaGen.Generate(StatefulSetResource, r)}
		case *kubernetes.Namespace:
			return map[string]mapstr.M{id: generalMetaGen.Generate(NamespaceResource, r)}
		case *kubernetes.ReplicaSet:
			return map[string]mapstr.M{id: generalMetaGen.Generate(ReplicaSetResource, r)}
		case *kubernetes.DaemonSet:
			return map[string]mapstr.M{id: generalMetaGen.Generate(DaemonSetResource, r)}
		case *kubernetes.PersistentVolume:
			return map[string]mapstr.M{id: generalMetaGen.Generate(PersistentVolumeResource, r)}
		case *kubernetes.PersistentVolumeClaim:
			return map[string]mapstr.M{id: generalMetaGen.Generate(PersistentVolumeClaimResource, r)}
		case *kubernetes.StorageClass:
			return map[string]mapstr.M{id: generalMetaGen.Generate(StorageClassResource, r)}
		default:
			return map[string]mapstr.M{id: generalMetaGen.Generate(r.GetObjectKind().GroupVersionKind().Kind, r)}
		}
	}
}