in libbeat/processors/add_kubernetes_metadata/kubernetes.go [148:329]
// init performs the one-time setup of the annotator. It is guarded by
// k.initOnce, so only the first call does any work.
//
// The setup proceeds as follows:
//  1. propagate the kubeadm setting into the node/namespace metadata config,
//  2. create the Kubernetes client and check that the API is reachable,
//  3. build the matchers (initialisation is abandoned when there are none),
//  4. create the pod watcher plus the optional node, namespace, ReplicaSet
//     and Job watchers selected by config.AddResourceMetadata,
//  5. build the indexers, register the pod event handlers and start every
//     watcher that was created.
//
// No error is returned: failures are logged and initialisation stops early
// (or, for optional watchers, continues without that watcher).
func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) {
	k.initOnce.Do(func() {
		var replicaSetWatcher, jobWatcher, namespaceWatcher, nodeWatcher kubernetes.Watcher

		// We initialise the use_kubeadm variable based on modules KubeAdm base configuration
		err := config.AddResourceMetadata.Namespace.SetBool("use_kubeadm", -1, config.KubeAdm)
		if err != nil {
			k.log.Errorf("couldn't set kubeadm variable for namespace due to error %+v", err)
		}
		err = config.AddResourceMetadata.Node.SetBool("use_kubeadm", -1, config.KubeAdm)
		if err != nil {
			k.log.Errorf("couldn't set kubeadm variable for node due to error %+v", err)
		}

		client, err := kubernetes.GetKubernetesClient(config.KubeConfig, config.KubeClientOptions)
		if err != nil {
			if kubernetes.IsInCluster(config.KubeConfig) {
				k.log.Debugf("Could not create kubernetes client using in_cluster config: %+v", err)
			} else if config.KubeConfig == "" {
				k.log.Debugf("Could not create kubernetes client using config: %v: %+v", os.Getenv("KUBECONFIG"), err)
			} else {
				k.log.Debugf("Could not create kubernetes client using config: %v: %+v", config.KubeConfig, err)
			}
			return
		}

		if !isKubernetesAvailableWithRetry(client) {
			return
		}

		matchers := NewMatchers(config.Matchers)
		if matchers.Empty() {
			k.log.Debugf("Could not initialize kubernetes plugin with zero matcher plugins")
			return
		}
		k.matchers = matchers

		nd := &kubernetes.DiscoverKubernetesNodeParams{
			ConfigHost:  config.Node,
			Client:      client,
			IsInCluster: kubernetes.IsInCluster(config.KubeConfig),
			HostUtils:   &kubernetes.DefaultDiscoveryUtils{},
		}

		if config.Scope == "node" {
			config.Node, err = kubernetes.DiscoverKubernetesNode(k.log, nd)
			if err != nil {
				// %w is only understood by fmt.Errorf; use a regular verb when logging.
				k.log.Errorf("Couldn't discover Kubernetes node: %+v", err)
				return
			}
			k.log.Debugf("Initializing a new Kubernetes watcher using host: %s", config.Node)
		}

		watcher, err := kubernetes.NewNamedWatcher("add_kubernetes_metadata_pod", client, &kubernetes.Pod{}, kubernetes.WatchOptions{
			SyncTimeout:  config.SyncPeriod,
			Node:         config.Node,
			Namespace:    config.Namespace,
			HonorReSyncs: true,
		}, nil)
		if err != nil {
			k.log.Errorf("Couldn't create kubernetes watcher for %T: %+v", &kubernetes.Pod{}, err)
			return
		}

		metaConf := config.AddResourceMetadata

		// The node and namespace watchers are optional: a creation failure is
		// logged and the corresponding watcher stays nil.
		if metaConf.Node.Enabled() {
			nodeWatcher, err = kubernetes.NewNamedWatcher("add_kubernetes_metadata_node", client, &kubernetes.Node{}, kubernetes.WatchOptions{
				SyncTimeout:  config.SyncPeriod,
				Node:         config.Node,
				HonorReSyncs: true,
			}, nil)
			if err != nil {
				k.log.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err)
			}
		}

		if metaConf.Namespace.Enabled() {
			namespaceWatcher, err = kubernetes.NewNamedWatcher("add_kubernetes_metadata_namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{
				SyncTimeout:  config.SyncPeriod,
				Namespace:    config.Namespace,
				HonorReSyncs: true,
			}, nil)
			if err != nil {
				k.log.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
			}
		}

		// Resource is Pod, so we need to create watchers for Replicasets and Jobs that it might belong to
		// in order to be able to retrieve 2nd layer Owner metadata like in case of:
		// Deployment -> Replicaset -> Pod
		// CronJob -> job -> Pod
		if metaConf.Deployment {
			metadataClient, clientErr := kubernetes.GetKubernetesMetadataClient(config.KubeConfig, config.KubeClientOptions)
			if clientErr != nil {
				// Without a metadata client the ReplicaSet watcher cannot be
				// built; leave it nil rather than handing a nil client on.
				k.log.Errorf("Error creating metadata client due to error %+v", clientErr)
			} else {
				replicaSetWatcher, err = kubernetes.NewNamedMetadataWatcher(
					"resource_metadata_enricher_rs",
					client,
					metadataClient,
					schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"},
					kubernetes.WatchOptions{
						SyncTimeout:  config.SyncPeriod,
						Namespace:    config.Namespace,
						HonorReSyncs: true,
					},
					nil,
					metadata.RemoveUnnecessaryReplicaSetData,
				)
				if err != nil {
					k.log.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.ReplicaSet{}, err)
				}
			}
			k.rsWatcher = replicaSetWatcher
		}

		if metaConf.CronJob {
			jobWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, kubernetes.WatchOptions{
				SyncTimeout:  config.SyncPeriod,
				Namespace:    config.Namespace,
				HonorReSyncs: true,
			}, nil)
			if err != nil {
				k.log.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Job{}, err)
			}
			k.jobWatcher = jobWatcher
		}

		// TODO: refactor the above section to a common function to be used by NewPodEventer too
		metaGen := metadata.GetPodMetaGen(cfg, watcher, nodeWatcher, namespaceWatcher, replicaSetWatcher, jobWatcher, metaConf)

		k.indexers = NewIndexers(config.Indexers, metaGen)
		k.watcher = watcher
		// NOTE: the annotator is flagged as available before the watchers are
		// started; a start failure below returns without resetting this flag.
		k.kubernetesAvailable = true
		k.nodeWatcher = nodeWatcher
		k.nsWatcher = namespaceWatcher

		watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				pod, _ := obj.(*kubernetes.Pod)
				k.addPod(pod)
			},
			UpdateFunc: func(obj interface{}) {
				pod, _ := obj.(*kubernetes.Pod)
				k.updatePod(pod)
			},
			DeleteFunc: func(obj interface{}) {
				pod, _ := obj.(*kubernetes.Pod)
				k.removePod(pod)
			},
		})

		// NOTE: order is important here since pod meta will include node meta and hence node.Store() should
		// be populated before trying to generate metadata for Pods.
		if k.nodeWatcher != nil {
			if err := k.nodeWatcher.Start(); err != nil {
				k.log.Debugf("Couldn't start node watcher: %v", err)
				return
			}
		}

		if k.nsWatcher != nil {
			if err := k.nsWatcher.Start(); err != nil {
				k.log.Debugf("Couldn't start namespace watcher: %v", err)
				return
			}
		}

		if k.rsWatcher != nil {
			if err := k.rsWatcher.Start(); err != nil {
				k.log.Debugf("Couldn't start replicaSet watcher: %v", err)
				return
			}
		}

		if k.jobWatcher != nil {
			if err := k.jobWatcher.Start(); err != nil {
				k.log.Debugf("Couldn't start job watcher: %v", err)
				return
			}
		}

		// The pod watcher is started last, after the watchers it depends on.
		if err := watcher.Start(); err != nil {
			k.log.Debugf("Couldn't start pod watcher: %v", err)
			return
		}
	})
}