in internal/pkg/composable/providers/kubernetes/pod.go [63:175]
func NewPodEventer(
	comm composable.DynamicProviderComm,
	cfg *Config,
	logger *logp.Logger,
	client k8s.Interface,
	scope string,
	managed bool) (Eventer, error) {
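	// Main watcher: follows Pod resources; the eventer built below is registered as its event handler.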
	watcher, err := kubernetes.NewNamedWatcher("agent-pod", client, &kubernetes.Pod{}, kubernetes.WatchOptions{
		SyncTimeout:  cfg.SyncPeriod,
		Node:         cfg.Node,
		Namespace:    cfg.Namespace,
		HonorReSyncs: true,
	}, nil)
	if err != nil {
		return nil, errors.New(err, "couldn't create kubernetes watcher")
	}

	var replicaSetWatcher, jobWatcher, namespaceWatcher, nodeWatcher kubernetes.Watcher

	metaConf := cfg.AddResourceMetadata
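	// Node and namespace watchers are only created when the corresponding metadata
	// enrichment (or hints-based autodiscovery) is enabled.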
	if metaConf.Node.Enabled() || cfg.Hints.Enabled {
		nodeWatcher, err = kubernetes.NewNamedWatcher("agent-node", client, &kubernetes.Node{}, kubernetes.WatchOptions{
			SyncTimeout:  cfg.SyncPeriod,
			Node:         cfg.Node,
			HonorReSyncs: true,
		}, nil)
		if err != nil {
			logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err)
		}
	}
	if metaConf.Namespace.Enabled() || cfg.Hints.Enabled {
		namespaceWatcher, err = kubernetes.NewNamedWatcher("agent-namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{
			SyncTimeout:  cfg.SyncPeriod,
			Namespace:    cfg.Namespace,
			HonorReSyncs: true,
		}, nil)
		if err != nil {
			logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
		}
	}
	// The resource is a Pod, so watchers for the ReplicaSets and Jobs it might belong to
	// are needed in order to retrieve second-layer owner metadata, as in:
	// Deployment -> ReplicaSet -> Pod
	// CronJob -> Job -> Pod
	if metaConf.Deployment {
		metadataClient, err := kubernetes.GetKubernetesMetadataClient(cfg.KubeConfig, cfg.KubeClientOptions)
		if err != nil {
			logger.Errorf("Error creating metadata client due to error %+v", err)
		}
		// Use a custom metadata watcher here so that a transform function can limit the data being stored.
		replicaSetWatcher, err = kubernetes.NewNamedMetadataWatcher(
			"resource_metadata_enricher_rs",
			client,
			metadataClient,
			schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"},
			kubernetes.WatchOptions{
				SyncTimeout:  cfg.SyncPeriod,
				Namespace:    cfg.Namespace,
				HonorReSyncs: true,
			}, nil, metadata.RemoveUnnecessaryReplicaSetData)
		if err != nil {
			logger.Errorf("Error creating watcher for replicasets due to error %+v", err)
		}
	}
	if metaConf.CronJob {
		jobWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, kubernetes.WatchOptions{
			SyncTimeout:  cfg.SyncPeriod,
			Namespace:    cfg.Namespace,
			HonorReSyncs: true,
		}, nil)
		if err != nil {
			logger.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Job{}, err)
		}
	}
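	// Repack the provider configuration and build the pod metadata generator from it
	// and the watchers created above.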
	rawConfig, err := c.NewConfigFrom(cfg)
	if err != nil {
		return nil, errors.New(err, "failed to unpack configuration")
	}

	metaGen := metadata.GetPodMetaGen(rawConfig, watcher, nodeWatcher, namespaceWatcher, replicaSetWatcher, jobWatcher, metaConf)

	p := &pod{
		logger:            logger,
		cleanupTimeout:    cfg.CleanupTimeout,
		comm:              comm,
		scope:             scope,
		config:            cfg,
		metagen:           metaGen,
		watcher:           watcher,
		nodeWatcher:       nodeWatcher,
		namespaceWatcher:  namespaceWatcher,
		replicasetWatcher: replicaSetWatcher,
		jobWatcher:        jobWatcher,
		managed:           managed,
	}
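	// The eventer itself handles pod add/update/delete events; node and namespace
	// changes are propagated to the affected pods through the updaters below so that
	// their metadata is re-evaluated.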
	watcher.AddEventHandler(p)

	if nodeWatcher != nil && metaConf.Node.Enabled() {
		updater := kubernetes.NewNodePodUpdater(p.unlockedUpdate, watcher.Store(), p.nodeWatcher, &p.crossUpdate)
		nodeWatcher.AddEventHandler(updater)
	}

	if namespaceWatcher != nil && metaConf.Namespace.Enabled() {
		updater := kubernetes.NewNamespacePodUpdater(p.unlockedUpdate, watcher.Store(), p.namespaceWatcher, &p.crossUpdate)
		namespaceWatcher.AddEventHandler(updater)
	}

	return p, nil
}
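
// Illustrative only (not part of pod.go [63:175]): a minimal sketch of how a caller
// in this package might wire up the eventer, assuming a client-go client obtained
// through the elastic-agent-autodiscover helper and a comm value supplied by the
// composable controller. The function name startPodEventer, the "cluster" scope
// value, and the call to eventer.Start() are assumptions for illustration.
func startPodEventer(comm composable.DynamicProviderComm, cfg *Config, logger *logp.Logger) (Eventer, error) {
	// Build a Kubernetes client from the configured kubeconfig and client options (assumed helper).
	client, err := kubernetes.GetKubernetesClient(cfg.KubeConfig, cfg.KubeClientOptions)
	if err != nil {
		return nil, errors.New(err, "failed to create kubernetes client")
	}
	// Construct the pod eventer with cluster scope, running in managed mode.
	eventer, err := NewPodEventer(comm, cfg, logger, client, "cluster", true)
	if err != nil {
		return nil, err
	}
	// Start watching; assumes the Eventer interface exposes Start() error.
	return eventer, eventer.Start()
}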