func (k *kubernetesAnnotator) init()

in libbeat/processors/add_kubernetes_metadata/kubernetes.go [148:329]


func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) {
	k.initOnce.Do(func() {
		var replicaSetWatcher, jobWatcher, namespaceWatcher, nodeWatcher kubernetes.Watcher

		// We initialise use_kubeadm for the namespace and node metadata options based on the processor's top-level use_kubeadm setting.
		err := config.AddResourceMetadata.Namespace.SetBool("use_kubeadm", -1, config.KubeAdm)
		if err != nil {
			k.log.Errorf("couldn't set kubeadm variable for namespace due to error %+v", err)
		}
		err = config.AddResourceMetadata.Node.SetBool("use_kubeadm", -1, config.KubeAdm)
		if err != nil {
			k.log.Errorf("couldn't set kubeadm variable for node due to error %+v", err)
		}
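		// Build the Kubernetes API client; if no usable configuration is found the processor quietly disables itself.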
		client, err := kubernetes.GetKubernetesClient(config.KubeConfig, config.KubeClientOptions)
		if err != nil {
			if kubernetes.IsInCluster(config.KubeConfig) {
				k.log.Debugf("Could not create kubernetes client using in_cluster config: %+v", err)
			} else if config.KubeConfig == "" {
				k.log.Debugf("Could not create kubernetes client using config: %v: %+v", os.Getenv("KUBECONFIG"), err)
			} else {
				k.log.Debugf("Could not create kubernetes client using config: %v: %+v", config.KubeConfig, err)
			}
			return
		}

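		// Make sure the API server is actually reachable before wiring up any watchers.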
		if !isKubernetesAvailableWithRetry(client) {
			return
		}

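		// Matchers extract a lookup key from each event (e.g. the container ID or log path); without any matchers there is nothing to correlate events against, so bail out.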
		matchers := NewMatchers(config.Matchers)

		if matchers.Empty() {
			k.log.Debugf("Could not initialize kubernetes plugin with zero matcher plugins")
			return
		}

		k.matchers = matchers
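		// Parameters for discovering the Kubernetes node this Beat is running on.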
		nd := &kubernetes.DiscoverKubernetesNodeParams{
			ConfigHost:  config.Node,
			Client:      client,
			IsInCluster: kubernetes.IsInCluster(config.KubeConfig),
			HostUtils:   &kubernetes.DefaultDiscoveryUtils{},
		}
		if config.Scope == "node" {
			config.Node, err = kubernetes.DiscoverKubernetesNode(k.log, nd)
			if err != nil {
				k.log.Errorf("Couldn't discover Kubernetes node: %w", err)
				return
			}
			k.log.Debugf("Initializing a new Kubernetes watcher using host: %s", config.Node)
		}

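		// Main pod watcher: maintains the local cache of Pods (optionally restricted to this node/namespace) that the indexers use to build metadata.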
		watcher, err := kubernetes.NewNamedWatcher("add_kubernetes_metadata_pod", client, &kubernetes.Pod{}, kubernetes.WatchOptions{
			SyncTimeout:  config.SyncPeriod,
			Node:         config.Node,
			Namespace:    config.Namespace,
			HonorReSyncs: true,
		}, nil)
		if err != nil {
			k.log.Errorf("Couldn't create kubernetes watcher for %T", &kubernetes.Pod{})
			return
		}

		metaConf := config.AddResourceMetadata

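		// Optional watchers used to enrich pod metadata with node and namespace labels and annotations.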
		if metaConf.Node.Enabled() {
			nodeWatcher, err = kubernetes.NewNamedWatcher("add_kubernetes_metadata_node", client, &kubernetes.Node{}, kubernetes.WatchOptions{
				SyncTimeout:  config.SyncPeriod,
				Node:         config.Node,
				HonorReSyncs: true,
			}, nil)
			if err != nil {
				k.log.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err)
			}
		}

		if metaConf.Namespace.Enabled() {
			namespaceWatcher, err = kubernetes.NewNamedWatcher("add_kubernetes_metadata_namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{
				SyncTimeout:  config.SyncPeriod,
				Namespace:    config.Namespace,
				HonorReSyncs: true,
			}, nil)
			if err != nil {
				k.log.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
			}
		}

		// The resource is a Pod, so we also need watchers for the ReplicaSets and Jobs it might belong to,
		// in order to retrieve second-level owner metadata, as in:
		// Deployment -> ReplicaSet -> Pod
		// CronJob -> Job -> Pod
		if metaConf.Deployment {
			metadataClient, err := kubernetes.GetKubernetesMetadataClient(config.KubeConfig, config.KubeClientOptions)
			if err != nil {
				k.log.Errorf("Error creating metadata client due to error %+v", err)
			}
			replicaSetWatcher, err = kubernetes.NewNamedMetadataWatcher(
				"resource_metadata_enricher_rs",
				client,
				metadataClient,
				schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"},
				kubernetes.WatchOptions{
					SyncTimeout:  config.SyncPeriod,
					Namespace:    config.Namespace,
					HonorReSyncs: true,
				},
				nil,
				metadata.RemoveUnnecessaryReplicaSetData,
			)
			if err != nil {
				k.log.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.ReplicaSet{}, err)
			}
			k.rsWatcher = replicaSetWatcher
		}
		if metaConf.CronJob {
			jobWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, kubernetes.WatchOptions{
				SyncTimeout:  config.SyncPeriod,
				Namespace:    config.Namespace,
				HonorReSyncs: true,
			}, nil)
			if err != nil {
				k.log.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Job{}, err)
			}
			k.jobWatcher = jobWatcher
		}

		// TODO: refactor the above section to a common function to be used by NewPodEventer too
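		// The metadata generator combines the pod, node, namespace, replicaset and job caches into the final pod metadata.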
		metaGen := metadata.GetPodMetaGen(cfg, watcher, nodeWatcher, namespaceWatcher, replicaSetWatcher, jobWatcher, metaConf)

		k.indexers = NewIndexers(config.Indexers, metaGen)
		k.watcher = watcher
		k.kubernetesAvailable = true
		k.nodeWatcher = nodeWatcher
		k.nsWatcher = namespaceWatcher

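		// Keep the annotator's cache in sync with the watcher: pods are indexed on add/update and dropped on delete.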
		watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				pod, _ := obj.(*kubernetes.Pod)
				k.addPod(pod)
			},
			UpdateFunc: func(obj interface{}) {
				pod, _ := obj.(*kubernetes.Pod)
				k.updatePod(pod)
			},
			DeleteFunc: func(obj interface{}) {
				pod, _ := obj.(*kubernetes.Pod)
				k.removePod(pod)
			},
		})

		// NOTE: order is important here since pod metadata includes node metadata, so the node watcher's store
		// must be populated before metadata is generated for Pods.
		if k.nodeWatcher != nil {
			if err := k.nodeWatcher.Start(); err != nil {
				k.log.Debugf("add_kubernetes_metadata", "Couldn't start node watcher: %v", err)
				return
			}
		}
		if k.nsWatcher != nil {
			if err := k.nsWatcher.Start(); err != nil {
				k.log.Debugf("add_kubernetes_metadata", "Couldn't start namespace watcher: %v", err)
				return
			}
		}
		if k.rsWatcher != nil {
			if err := k.rsWatcher.Start(); err != nil {
				k.log.Debugf("add_kubernetes_metadata", "Couldn't start replicaSet watcher: %v", err)
				return
			}
		}
		if k.jobWatcher != nil {
			if err := k.jobWatcher.Start(); err != nil {
				k.log.Debugf("add_kubernetes_metadata", "Couldn't start job watcher: %v", err)
				return
			}
		}
		if err := watcher.Start(); err != nil {
			k.log.Debugf("add_kubernetes_metadata", "Couldn't start pod watcher: %v", err)
			return
		}
	})
}
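
For reference, this init path is driven by the add_kubernetes_metadata processor block in the Beat's YAML configuration. A minimal, illustrative sketch follows; the NODE_NAME environment variable and the logs_path matcher values are typical of a Filebeat DaemonSet deployment and are assumptions, not taken from the code above:

processors:
  - add_kubernetes_metadata:
      # Node to scope the processor to; assumes NODE_NAME is injected (e.g. via the downward API).
      host: ${NODE_NAME}
      # kube_config is only needed when running outside the cluster; otherwise in-cluster configuration is used.
      kube_config: ${HOME}/.kube/config
      matchers:
        - logs_path:
            logs_path: "/var/log/containers/"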