func NewPodEventer()

in internal/pkg/composable/providers/kubernetes/pod.go [63:175]


// NewPodEventer builds the pod Eventer for the Kubernetes dynamic provider.
//
// It always creates the primary pod watcher; failing to do so (or failing to
// convert cfg into a raw config) is returned as an error. The auxiliary
// watchers are optional and best-effort: a failure to create one of them is
// logged and the eventer is still returned without it.
//
//   - Node and namespace watchers are created when the matching resource
//     metadata is enabled or when hints are enabled. Their pod-updater event
//     handlers are attached only when the matching metadata is enabled.
//   - The ReplicaSet metadata watcher is created when Deployment metadata is
//     enabled, and the Job watcher when CronJob metadata is enabled, so that
//     second-level owner metadata can be resolved.
//
// comm, scope and managed are stored on the returned eventer as-is; cfg
// supplies the watch options (sync period, node, namespace), the cleanup
// timeout and the metadata settings.
func NewPodEventer(
	comm composable.DynamicProviderComm,
	cfg *Config,
	logger *logp.Logger,
	client k8s.Interface,
	scope string,
	managed bool) (Eventer, error) {
	watcher, err := kubernetes.NewNamedWatcher("agent-pod", client, &kubernetes.Pod{}, kubernetes.WatchOptions{
		SyncTimeout:  cfg.SyncPeriod,
		Node:         cfg.Node,
		Namespace:    cfg.Namespace,
		HonorReSyncs: true,
	}, nil)
	if err != nil {
		return nil, errors.New(err, "couldn't create kubernetes watcher")
	}

	var replicaSetWatcher, jobWatcher, namespaceWatcher, nodeWatcher kubernetes.Watcher

	metaConf := cfg.AddResourceMetadata

	if metaConf.Node.Enabled() || cfg.Hints.Enabled {
		nodeWatcher, err = kubernetes.NewNamedWatcher("agent-node", client, &kubernetes.Node{}, kubernetes.WatchOptions{
			SyncTimeout:  cfg.SyncPeriod,
			Node:         cfg.Node,
			HonorReSyncs: true,
		}, nil)
		if err != nil {
			logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err)
		}
	}

	if metaConf.Namespace.Enabled() || cfg.Hints.Enabled {
		namespaceWatcher, err = kubernetes.NewNamedWatcher("agent-namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{
			SyncTimeout:  cfg.SyncPeriod,
			Namespace:    cfg.Namespace,
			HonorReSyncs: true,
		}, nil)
		if err != nil {
			logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
		}
	}

	// Resource is Pod, so we need to create watchers for Replicasets and Jobs that it might belong to
	// in order to be able to retrieve 2nd layer Owner metadata like in case of:
	// Deployment -> Replicaset -> Pod
	// CronJob -> job -> Pod
	if metaConf.Deployment {
		// A dedicated error variable avoids shadowing the function-level err.
		metadataClient, clientErr := kubernetes.GetKubernetesMetadataClient(cfg.KubeConfig, cfg.KubeClientOptions)
		if clientErr != nil {
			// Without a metadata client the ReplicaSet watcher cannot work, so
			// leave it nil, like any other optional watcher that failed.
			logger.Errorf("Error creating metadata client for %T due to error %+v", &kubernetes.ReplicaSet{}, clientErr)
		} else {
			// use a custom watcher here, so we can provide a transform function and limit the data we're storing
			replicaSetWatcher, err = kubernetes.NewNamedMetadataWatcher(
				"resource_metadata_enricher_rs",
				client,
				metadataClient,
				schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"},
				kubernetes.WatchOptions{
					SyncTimeout:  cfg.SyncPeriod,
					Namespace:    cfg.Namespace,
					HonorReSyncs: true,
				}, nil, metadata.RemoveUnnecessaryReplicaSetData)
			if err != nil {
				logger.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.ReplicaSet{}, err)
			}
		}
	}
	if metaConf.CronJob {
		jobWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, kubernetes.WatchOptions{
			SyncTimeout:  cfg.SyncPeriod,
			Namespace:    cfg.Namespace,
			HonorReSyncs: true,
		}, nil)
		if err != nil {
			logger.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Job{}, err)
		}
	}

	rawConfig, err := c.NewConfigFrom(cfg)
	if err != nil {
		return nil, errors.New(err, "failed to unpack configuration")
	}
	metaGen := metadata.GetPodMetaGen(rawConfig, watcher, nodeWatcher, namespaceWatcher, replicaSetWatcher, jobWatcher, metaConf)

	p := &pod{
		logger:            logger,
		cleanupTimeout:    cfg.CleanupTimeout,
		comm:              comm,
		scope:             scope,
		config:            cfg,
		metagen:           metaGen,
		watcher:           watcher,
		nodeWatcher:       nodeWatcher,
		namespaceWatcher:  namespaceWatcher,
		replicasetWatcher: replicaSetWatcher,
		jobWatcher:        jobWatcher,
		managed:           managed,
	}

	watcher.AddEventHandler(p)

	// The node/namespace watchers may exist only because hints are enabled;
	// the pod updaters are attached only when the matching metadata is enabled.
	if nodeWatcher != nil && metaConf.Node.Enabled() {
		updater := kubernetes.NewNodePodUpdater(p.unlockedUpdate, watcher.Store(), p.nodeWatcher, &p.crossUpdate)
		nodeWatcher.AddEventHandler(updater)
	}

	if namespaceWatcher != nil && metaConf.Namespace.Enabled() {
		updater := kubernetes.NewNamespacePodUpdater(p.unlockedUpdate, watcher.Store(), p.namespaceWatcher, &p.crossUpdate)
		namespaceWatcher.AddEventHandler(updater)
	}

	return p, nil
}