// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//go:build linux || darwin || windows

package add_kubernetes_metadata

import (
"fmt"
"os"
"sync"
"time"

"k8s.io/apimachinery/pkg/runtime/schema"
k8sclient "k8s.io/client-go/kubernetes"

"github.com/elastic/elastic-agent-autodiscover/kubernetes"
"github.com/elastic/elastic-agent-autodiscover/kubernetes/metadata"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/processors"
)

const (
timeout = time.Second * 5
selector = "kubernetes"
checkNodeReadyAttempts = 10
)

// kubernetesAnnotator holds the watchers, indexers, matchers and metadata
// cache used to enrich events with Kubernetes metadata.
type kubernetesAnnotator struct {
log *logp.Logger
watcher kubernetes.Watcher
nsWatcher kubernetes.Watcher
nodeWatcher kubernetes.Watcher
rsWatcher kubernetes.Watcher
jobWatcher kubernetes.Watcher
indexers *Indexers
matchers *Matchers
cache *cache
kubernetesAvailable bool
initOnce sync.Once
}

func init() {
processors.RegisterPlugin("add_kubernetes_metadata", New)

// Register default indexers and matchers
Indexing.AddIndexer(PodNameIndexerName, NewPodNameIndexer)
Indexing.AddIndexer(PodUIDIndexerName, NewPodUIDIndexer)
Indexing.AddIndexer(ContainerIndexerName, NewContainerIndexer)
Indexing.AddIndexer(IPPortIndexerName, NewIPPortIndexer)
Indexing.AddMatcher(FieldMatcherName, NewFieldMatcher)
Indexing.AddMatcher(FieldFormatMatcherName, NewFieldFormatMatcher)
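// Out-of-tree code can extend the same registry from its own init function;
// a minimal sketch (MyIndexerName and NewMyIndexer are hypothetical names,
// not part of this package):
//
//    Indexing.AddIndexer(MyIndexerName, NewMyIndexer)
//    Indexing.AddMatcher(MyMatcherName, NewMyMatcher)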
}

// isKubernetesAvailable returns true when the configured client can reach the
// API server and read its version.
func isKubernetesAvailable(client k8sclient.Interface) (bool, error) {
server, err := client.Discovery().ServerVersion()
if err != nil {
return false, err
}
logp.Info("%v: kubernetes env detected, with version: %v", "add_kubernetes_metadata", server)
return true, nil
}
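
// isKubernetesAvailableWithRetry polls the API server until it responds,
// sleeping 3 seconds between attempts and giving up after
// checkNodeReadyAttempts further attempts (roughly half a minute in total).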
func isKubernetesAvailableWithRetry(client k8sclient.Interface) bool {
connectionAttempts := 1
for {
kubernetesAvailable, err := isKubernetesAvailable(client)
if kubernetesAvailable {
return true
}
if connectionAttempts > checkNodeReadyAttempts {
logp.Info("%v: could not detect kubernetes env: %v", "add_kubernetes_metadata", err)
return false
}
time.Sleep(3 * time.Second)
connectionAttempts += 1
}
}

// kubernetesMetadataExist checks whether an event is already enriched with Kubernetes metadata.
func kubernetesMetadataExist(event *beat.Event) bool {
if _, err := event.GetValue("kubernetes"); err != nil {
return false
}
return true
}

// New constructs a new add_kubernetes_metadata processor.
func New(cfg *config.C) (beat.Processor, error) {
config, err := newProcessorConfig(cfg, Indexing)
if err != nil {
return nil, err
}
log := logp.NewLogger(selector).With("libbeat.processor", "add_kubernetes_metadata")
processor := &kubernetesAnnotator{
log: log,
cache: newCache(config.CleanupTimeout),
kubernetesAvailable: false,
}
// Complete the processor's initialisation asynchronously, so that the Kubernetes
// client setup is retried while the node is still becoming ready.
go processor.init(config, cfg)
return processor, nil
}
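
// A minimal usage sketch (illustrative values; processors are normally built by
// the publisher pipeline from the `add_kubernetes_metadata` YAML block, but an
// equivalent ad-hoc configuration can be passed directly):
//
//    cfg, _ := config.NewConfigFrom(map[string]interface{}{
//        "matchers": []map[string]interface{}{
//            {"fields": map[string]interface{}{
//                "lookup_fields": []string{"metricset.host"},
//            }},
//        },
//    })
//    p, err := New(cfg)

// newProcessorConfig unpacks the processor configuration and appends the
// registered default indexer and matcher configs when the defaults are enabled.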
func newProcessorConfig(cfg *config.C, register *Register) (kubeAnnotatorConfig, error) {
var config kubeAnnotatorConfig
err := cfg.Unpack(&config)
if err != nil {
return config, fmt.Errorf("fail to unpack the kubernetes configuration: %w", err)
}
// Load and append default indexer configs
if config.DefaultIndexers.Enabled {
config.Indexers = append(config.Indexers, register.GetDefaultIndexerConfigs()...)
}
// Load and append default matcher configs
if config.DefaultMatchers.Enabled {
config.Matchers = append(config.Matchers, register.GetDefaultMatcherConfigs()...)
}
return config, nil
}
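
// init completes the annotator's setup: it creates the Kubernetes client, waits
// for the API server with retries, optionally discovers the local node, builds
// the watchers and indexers, and starts the watchers. The sync.Once guard makes
// repeated calls a no-op.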
func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) {
k.initOnce.Do(func() {
var replicaSetWatcher, jobWatcher, namespaceWatcher, nodeWatcher kubernetes.Watcher
// Initialise the use_kubeadm variable for namespace and node metadata from the
// processor's top-level KubeAdm configuration.
err := config.AddResourceMetadata.Namespace.SetBool("use_kubeadm", -1, config.KubeAdm)
if err != nil {
k.log.Errorf("couldn't set kubeadm variable for namespace due to error %+v", err)
}
err = config.AddResourceMetadata.Node.SetBool("use_kubeadm", -1, config.KubeAdm)
if err != nil {
k.log.Errorf("couldn't set kubeadm variable for node due to error %+v", err)
}
client, err := kubernetes.GetKubernetesClient(config.KubeConfig, config.KubeClientOptions)
if err != nil {
if kubernetes.IsInCluster(config.KubeConfig) {
k.log.Debugf("Could not create kubernetes client using in_cluster config: %+v", err)
} else if config.KubeConfig == "" {
k.log.Debugf("Could not create kubernetes client using config: %v: %+v", os.Getenv("KUBECONFIG"), err)
} else {
k.log.Debugf("Could not create kubernetes client using config: %v: %+v", config.KubeConfig, err)
}
return
}
if !isKubernetesAvailableWithRetry(client) {
return
}
matchers := NewMatchers(config.Matchers)
if matchers.Empty() {
k.log.Debugf("Could not initialize kubernetes plugin with zero matcher plugins")
return
}
k.matchers = matchers
nd := &kubernetes.DiscoverKubernetesNodeParams{
ConfigHost: config.Node,
Client: client,
IsInCluster: kubernetes.IsInCluster(config.KubeConfig),
HostUtils: &kubernetes.DefaultDiscoveryUtils{},
}
if config.Scope == "node" {
config.Node, err = kubernetes.DiscoverKubernetesNode(k.log, nd)
if err != nil {
k.log.Errorf("Couldn't discover Kubernetes node: %w", err)
return
}
k.log.Debugf("Initializing a new Kubernetes watcher using host: %s", config.Node)
}
watcher, err := kubernetes.NewNamedWatcher("add_kubernetes_metadata_pod", client, &kubernetes.Pod{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Node: config.Node,
Namespace: config.Namespace,
HonorReSyncs: true,
}, nil)
if err != nil {
k.log.Errorf("Couldn't create kubernetes watcher for %T", &kubernetes.Pod{})
return
}
metaConf := config.AddResourceMetadata
if metaConf.Node.Enabled() {
nodeWatcher, err = kubernetes.NewNamedWatcher("add_kubernetes_metadata_node", client, &kubernetes.Node{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Node: config.Node,
HonorReSyncs: true,
}, nil)
if err != nil {
k.log.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err)
}
}
if metaConf.Namespace.Enabled() {
namespaceWatcher, err = kubernetes.NewNamedWatcher("add_kubernetes_metadata_namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Namespace: config.Namespace,
HonorReSyncs: true,
}, nil)
if err != nil {
k.log.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
}
}
// The watched resource is a Pod, so we also need watchers for the ReplicaSets and
// Jobs it may belong to, in order to retrieve second-level owner metadata, as in:
// Deployment -> ReplicaSet -> Pod
// CronJob -> Job -> Pod
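// For example (hypothetical names), a Pod owned by ReplicaSet "nginx-7d9fd8b4c9"
// can only get kubernetes.deployment.name "nginx" if the ReplicaSet watcher can
// resolve the ReplicaSet's own ownerReferences.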
if metaConf.Deployment {
metadataClient, err := kubernetes.GetKubernetesMetadataClient(config.KubeConfig, config.KubeClientOptions)
if err != nil {
k.log.Errorf("Error creating metadata client due to error %+v", err)
}
replicaSetWatcher, err = kubernetes.NewNamedMetadataWatcher(
"resource_metadata_enricher_rs",
client,
metadataClient,
schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"},
kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Namespace: config.Namespace,
HonorReSyncs: true,
},
nil,
metadata.RemoveUnnecessaryReplicaSetData,
)
if err != nil {
k.log.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.ReplicaSet{}, err)
}
k.rsWatcher = replicaSetWatcher
}
if metaConf.CronJob {
jobWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Namespace: config.Namespace,
HonorReSyncs: true,
}, nil)
if err != nil {
k.log.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Job{}, err)
}
k.jobWatcher = jobWatcher
}
// TODO: refactor the above section into a common function shared with NewPodEventer.
metaGen := metadata.GetPodMetaGen(cfg, watcher, nodeWatcher, namespaceWatcher, replicaSetWatcher, jobWatcher, metaConf)
k.indexers = NewIndexers(config.Indexers, metaGen)
k.watcher = watcher
k.kubernetesAvailable = true
k.nodeWatcher = nodeWatcher
k.nsWatcher = namespaceWatcher
watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod, _ := obj.(*kubernetes.Pod)
k.addPod(pod)
},
UpdateFunc: func(obj interface{}) {
pod, _ := obj.(*kubernetes.Pod)
k.updatePod(pod)
},
DeleteFunc: func(obj interface{}) {
pod, _ := obj.(*kubernetes.Pod)
k.removePod(pod)
},
})
// NOTE: order is important here, since pod metadata includes node metadata: the node
// watcher's store must be populated before we try to generate metadata for Pods.
if k.nodeWatcher != nil {
if err := k.nodeWatcher.Start(); err != nil {
k.log.Debugf("add_kubernetes_metadata", "Couldn't start node watcher: %v", err)
return
}
}
if k.nsWatcher != nil {
if err := k.nsWatcher.Start(); err != nil {
k.log.Debugf("add_kubernetes_metadata", "Couldn't start namespace watcher: %v", err)
return
}
}
if k.rsWatcher != nil {
if err := k.rsWatcher.Start(); err != nil {
k.log.Debugf("add_kubernetes_metadata", "Couldn't start replicaSet watcher: %v", err)
return
}
}
if k.jobWatcher != nil {
if err := k.jobWatcher.Start(); err != nil {
k.log.Debugf("add_kubernetes_metadata", "Couldn't start job watcher: %v", err)
return
}
}
if err := watcher.Start(); err != nil {
k.log.Debugf("add_kubernetes_metadata", "Couldn't start pod watcher: %v", err)
return
}
})
}

// Run adds a `kubernetes` field to the event fields, containing a map with
// various Kubernetes metadata for the pod the event originates from.
// This processor does not access or modify the `Meta` of the event.
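//
// For example (illustrative values), if the cache holds
//
//    {"kubernetes": {"pod": {"name": "web-0"},
//                    "container": {"name": "web", "id": "abc123",
//                                  "runtime": "containerd", "image": "nginx:1.25"}}}
//
// the matched event gains top-level container.id, container.runtime and
// container.image.name fields, while kubernetes.container retains only the name.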
func (k *kubernetesAnnotator) Run(event *beat.Event) (*beat.Event, error) {
if !k.kubernetesAvailable {
return event, nil
}
if kubernetesMetadataExist(event) {
return event, nil
}
index := k.matchers.MetadataIndex(event.Fields)
if index == "" {
k.log.Debug("No container match string, not adding kubernetes data")
return event, nil
}
metadata := k.cache.get(index)
if metadata == nil {
return event, nil
}
// Copy the runtime-oriented container fields (id, runtime, image) to the
// top-level ECS `container.*` object, converting kubernetes.container.image
// into container.image.name along the way.
metaClone := metadata.Clone()
_ = metaClone.Delete("kubernetes.container.name")
containerImage, err := metadata.GetValue("kubernetes.container.image")
if err == nil {
_ = metaClone.Delete("kubernetes.container.image")
_, _ = metaClone.Put("kubernetes.container.image.name", containerImage)
}
cmeta, err := metaClone.Clone().GetValue("kubernetes.container")
if err == nil {
event.Fields.DeepUpdate(mapstr.M{
"container": cmeta,
})
}
// Under kubernetes.container.* keep only the Kubernetes-specific container
// name; the runtime fields were moved to the top level above.
kubeMeta := metadata.Clone()
_ = kubeMeta.Delete("kubernetes.container.id")
_ = kubeMeta.Delete("kubernetes.container.runtime")
_ = kubeMeta.Delete("kubernetes.container.image")
event.Fields.DeepUpdate(kubeMeta)
return event, nil
}
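
// Close stops all running watchers and the cache's cleanup routine.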
func (k *kubernetesAnnotator) Close() error {
if k.watcher != nil {
k.watcher.Stop()
}
if k.nodeWatcher != nil {
k.nodeWatcher.Stop()
}
if k.nsWatcher != nil {
k.nsWatcher.Stop()
}
if k.rsWatcher != nil {
k.rsWatcher.Stop()
}
if k.jobWatcher != nil {
k.jobWatcher.Stop()
}
if k.cache != nil {
k.cache.stop()
}
return nil
}
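
// addPod stores the pod's metadata in the cache under every index the
// configured indexers produce for it.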
func (k *kubernetesAnnotator) addPod(pod *kubernetes.Pod) {
metadata := k.indexers.GetMetadata(pod)
for _, m := range metadata {
k.cache.set(m.Index, m.Data)
}
}
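
// updatePod refreshes the cached metadata for a pod by removing and re-adding
// it, skipping the re-add when the pod is being deleted.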
func (k *kubernetesAnnotator) updatePod(pod *kubernetes.Pod) {
k.removePod(pod)
// Add it again only if it is not being deleted
if pod.GetObjectMeta().GetDeletionTimestamp() != nil {
return
}
k.addPod(pod)
}
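
// removePod drops all cache entries the indexers derive from the pod.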
func (k *kubernetesAnnotator) removePod(pod *kubernetes.Pod) {
indexes := k.indexers.GetIndexes(pod)
for _, idx := range indexes {
k.cache.delete(idx)
}
}

// String returns the processor name for logging and debug output.
func (*kubernetesAnnotator) String() string {
return "add_kubernetes_metadata"
}