in pkg/exporter/nettop/k8s.go [347:418]
// StartPodCacheWatch creates a PodCache and starts the machinery that feeds it
// with the pods scheduled on the local node.
//
// The node name is read from the INSPECTOR_NODENAME environment variable. The
// function installs a text logger, initializes the cache's cgroup watcher,
// builds a controller-runtime manager whose Pod informer is restricted to
// spec.nodeName == nodeName, and registers a PodReconciler for Pod events.
// The manager and the reconciler's GC method are then started in background
// goroutines, both of which receive ctx.
//
// It returns the PodCache on success. An error is returned when the
// environment variable is unset, when the Kubernetes client configuration
// cannot be loaded, or when the manager or controller cannot be created. A
// failure of mgr.Start is only logged, because it happens asynchronously after
// this function has returned.
func StartPodCacheWatch(ctx context.Context) (*PodCache, error) {
	nodeName := os.Getenv("INSPECTOR_NODENAME")
	if nodeName == "" {
		return nil, fmt.Errorf("INSPECTOR_NODENAME environment variable not set")
	}
	log.SetLogger(textlogger.NewLogger(textlogger.NewConfig()))

	// Load the client configuration before creating any state so that a
	// failure is reported to the caller instead of terminating the process,
	// and nothing is left half-initialized.
	restConfig, err := ctrl.GetConfig()
	if err != nil {
		return nil, fmt.Errorf("unable to load kubernetes client config: %w", err)
	}

	podCache := NewPodCache()
	// Initialize cgroup watcher
	podCache.initCgroupWatch()

	scheme := runtime.NewScheme()
	utilruntime.Must(v1.AddToScheme(scheme))

	// Create manager
	mgr, err := ctrl.NewManager(restConfig, ctrl.Options{
		// "0" turns off the manager's metrics endpoint.
		MetricsBindAddress: "0",
		Cache: cache.Options{
			Scheme: scheme,
			ByObject: map[client.Object]cache.ByObject{
				&v1.Pod{}: {
					// Only list/watch pods bound to this node.
					Field: client.MatchingFieldsSelector{
						Selector: fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName}),
					},
					// Drop the listed spec/status fields before the pod is
					// stored in the informer cache.
					Transform: func(i interface{}) (interface{}, error) {
						pod, ok := i.(*v1.Pod)
						if !ok {
							// Informers may pass non-Pod values (for example
							// deletion tombstones) through the transform.
							// Returning an error would drop that event, so
							// hand such values back untouched.
							return i, nil
						}
						pod.Spec.Volumes = nil
						pod.Spec.EphemeralContainers = nil
						pod.Spec.SecurityContext = nil
						pod.Spec.ImagePullSecrets = nil
						pod.Spec.Tolerations = nil
						pod.Spec.ReadinessGates = nil
						pod.Spec.PreemptionPolicy = nil
						pod.Status.InitContainerStatuses = nil
						pod.Status.ContainerStatuses = nil
						pod.Status.EphemeralContainerStatuses = nil
						return pod, nil
					},
				},
			},
		},
	})
	if err != nil {
		return nil, fmt.Errorf("unable to create manager: %w", err)
	}

	controller := &PodReconciler{PodCache: podCache, Client: mgr.GetClient()}

	// Create pod controller
	err = ctrl.NewControllerManagedBy(mgr).
		For(&v1.Pod{}).
		WithEventFilter(predicate.NewPredicateFuncs(func(object client.Object) bool {
			// Second line of defence behind the cache field selector; the
			// checked assertion avoids a panic on an unexpected object type.
			pod, ok := object.(*v1.Pod)
			return ok && pod.Spec.NodeName == nodeName
		})).
		Complete(controller)
	if err != nil {
		return nil, fmt.Errorf("unable to create controller: %w", err)
	}

	// Start manager
	go func() {
		if err := mgr.Start(ctx); err != nil {
			klog.Errorf("Failed to start manager: %v", err)
		}
	}()
	go controller.GC(ctx)

	return podCache, nil
}