pkg/exporter/nettop/k8s.go (381 lines of code) (raw):

package nettop import ( "context" "fmt" "os" "path/filepath" "strconv" "strings" "sync" "syscall" "time" lru "github.com/hashicorp/golang-lru/v2" "github.com/fsnotify/fsnotify" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" "k8s.io/klog/v2/textlogger" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" ) var ( logger = log.Log.WithName("nettop-k8s") ) type PodCacheInfo struct { UID string IP string Name string Namespace string Labels map[string]string CgroupPath string SandboxPID int NetNSPath string NetNSInode uint64 IsHostNetwork bool } type cgroupCacheKey struct { path string inode uint64 } type PodCache struct { // Pod basic info cache podInfoCache map[string]PodCacheInfo podInfoCacheLock sync.RWMutex // Cgroup path cache cgroupPathCache map[string]string cgroupPathCacheLock sync.RWMutex cgroupCache *lru.Cache[cgroupCacheKey, interface{}] } func NewPodCache() *PodCache { cgroupCache, _ := lru.New[cgroupCacheKey, interface{}](1000) return &PodCache{ podInfoCache: make(map[string]PodCacheInfo), cgroupPathCache: make(map[string]string), cgroupCache: cgroupCache, } } // isCgroupV2 checks if system is using cgroup v2 func isCgroupV2() bool { _, err := os.Stat("/sys/fs/cgroup/cgroup.controllers") return err == nil } // getSandboxPID finds sandbox container PID from cgroup tasks func (pc *PodCache) getSandboxPID(cgroupPath string) (int, error) { var earliestTime time.Time var sandboxPID int err := filepath.Walk(cgroupPath, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.IsDir() { return nil } tasksFile := filepath.Join(path, "tasks") content, err := os.ReadFile(tasksFile) if err != nil { return nil } pids := strings.Split(strings.TrimSpace(string(content)), "\n") for _, pidStr := range pids { pid, err := strconv.Atoi(pidStr) if err != nil { continue } procPath := filepath.Join("/proc", pidStr) info, err := os.Stat(procPath) if err != nil { continue } cmdlinePath := filepath.Join(procPath, "cmdline") cmdline, err := os.ReadFile(cmdlinePath) if err != nil { continue } if !strings.Contains(string(cmdline), "pause") { continue } if earliestTime.IsZero() || info.ModTime().Before(earliestTime) { earliestTime = info.ModTime() sandboxPID = pid } } return nil }) if err != nil { return 0, err } if sandboxPID == 0 { return 0, fmt.Errorf("sandbox PID not found") } return sandboxPID, nil } // getNetNSInfo gets network namespace path and inode number for a PID func (pc *PodCache) getNetNSInfo(pid int) (string, uint64, error) { nsPath := fmt.Sprintf("/proc/%d/ns/net", pid) fi, err := os.Stat(nsPath) if err != nil { return "", 0, err } stat := fi.Sys().(*syscall.Stat_t) return nsPath, stat.Ino, nil } // GetAllLocalPods returns all pods in the cache func (pc *PodCache) GetAllLocalPods() []PodCacheInfo { pc.podInfoCacheLock.RLock() defer pc.podInfoCacheLock.RUnlock() pods := make([]PodCacheInfo, 0, len(pc.podInfoCache)) for _, pod := range pc.podInfoCache { pods = append(pods, pod) } return pods } // updatePodBasicInfo updates basic pod information in cache func (pc *PodCache) updatePodBasicInfo(pod *v1.Pod) { logger.Info("update pod basic info", "ns", pod.GetNamespace(), "name", pod.Name, "uid", pod.GetUID()) pc.podInfoCacheLock.Lock() defer pc.podInfoCacheLock.Unlock() uid := string(pod.UID) // Get existing pod info if available existingInfo, exists := pc.podInfoCache[uid] // Update basic info podInfo := PodCacheInfo{ UID: uid, IP: pod.Status.PodIP, Name: pod.Name, Namespace: pod.Namespace, Labels: pod.Labels, IsHostNetwork: pod.Spec.HostNetwork, } // Preserve existing cgroup and network namespace info if exists { podInfo.CgroupPath = existingInfo.CgroupPath podInfo.SandboxPID = existingInfo.SandboxPID podInfo.NetNSPath = existingInfo.NetNSPath podInfo.NetNSInode = existingInfo.NetNSInode } pc.podInfoCache[uid] = podInfo } // updatePodCgroupInfo updates pod's cgroup related information func (pc *PodCache) updatePodCgroupInfo(uid, cgroupPath string) { pathInfo, err := os.Stat(cgroupPath) if err != nil { logger.Error(err, "error stating cgroup path", "path", cgroupPath) return } pathInode := pathInfo.Sys().(*syscall.Stat_t).Ino if pathInode == 0 { logger.Error(fmt.Errorf("error stating cgroup path"), "inode is 0") return } _, ok := pc.cgroupCache.Get(cgroupCacheKey{ path: cgroupPath, inode: pathInode, }) if ok { // already updated return } logger.Info("update pod cgroup info", "uid", uid, "cgroupPath", cgroupPath) err = wait.PollUntilContextTimeout(context.Background(), 2*time.Second, 1*time.Minute, true, func(_ context.Context) (done bool, err error) { task := tasksInsidePodCgroup(cgroupPath, true) if len(task) > 0 { return true, nil } return false, nil }) if err != nil { logger.Error(err, "wait pod cgroup info timeout", "cgroupPath", cgroupPath) } pc.podInfoCacheLock.Lock() defer pc.podInfoCacheLock.Unlock() // Get existing pod info podInfo, exists := pc.podInfoCache[uid] if !exists { podInfo = PodCacheInfo{ UID: uid, } } // Update cgroup path podInfo.CgroupPath = cgroupPath // Try to get sandbox PID and network namespace info if sandboxPID, err := pc.getSandboxPID(cgroupPath); err == nil { podInfo.SandboxPID = sandboxPID if netNSPath, netNSIno, err := pc.getNetNSInfo(sandboxPID); err == nil { podInfo.NetNSPath = netNSPath podInfo.NetNSInode = netNSIno pc.podInfoCache[uid] = podInfo pc.cgroupCache.Add(cgroupCacheKey{ path: cgroupPath, inode: pathInode, }, struct{}{}) } else { logger.Error(err, "error getting netns info", "uid", uid, "cgroupPath", cgroupPath) } } } // deletePodFromCache removes a pod from the cache func (pc *PodCache) deletePodFromCache(uid string) { pc.podInfoCacheLock.Lock() defer pc.podInfoCacheLock.Unlock() logger.Info("delete pod from cache", "uid", uid) delete(pc.podInfoCache, uid) } // initCgroupWatch initializes cgroup watcher func (pc *PodCache) initCgroupWatch() { var basePath string if isCgroupV2() { basePath = "/sys/fs/cgroup/kubepods.slice" } else { basePath = "/sys/fs/cgroup/cpu/kubepods.slice" } go pc.watchCgroupPath(basePath) go pc.watchCgroupPath(filepath.Join(basePath, "kubepods-burstable.slice")) go pc.watchCgroupPath(filepath.Join(basePath, "kubepods-besteffort.slice")) } // watchCgroupPath watches for changes in cgroup directory func (pc *PodCache) watchCgroupPath(basePath string) { watcher, err := fsnotify.NewWatcher() if err != nil { return } defer watcher.Close() pc.updateCgroupPathCache(basePath) err = watcher.Add(basePath) if err != nil { return } ticker := time.NewTicker(2 * time.Minute) for { select { case event := <-watcher.Events: if event.Op&fsnotify.Create == fsnotify.Create { pc.updateCgroupPathCache(basePath) } case <-ticker.C: pc.updateCgroupPathCache(basePath) case <-watcher.Errors: continue } } } // updateCgroupPathCache updates the mapping of pod UID to cgroup path func (pc *PodCache) updateCgroupPathCache(basePath string) { entries, err := os.ReadDir(basePath) if err != nil { return } pc.cgroupPathCacheLock.Lock() defer pc.cgroupPathCacheLock.Unlock() for _, entry := range entries { name := entry.Name() var uid string switch { case strings.HasPrefix(name, "kubepods-burstable-pod"): uid = strings.TrimSuffix(strings.TrimPrefix(name, "kubepods-burstable-pod"), ".slice") case strings.HasPrefix(name, "kubepods-besteffort-pod"): uid = strings.TrimSuffix(strings.TrimPrefix(name, "kubepods-besteffort-pod"), ".slice") case strings.HasPrefix(name, "kubepods-pod"): uid = strings.TrimSuffix(strings.TrimPrefix(name, "kubepods-pod"), ".slice") default: continue } if uid != "" { uid = strings.ReplaceAll(uid, "_", "-") cgroupPath := filepath.Join(basePath, name) pc.cgroupPathCache[uid] = cgroupPath // Update pod info with new cgroup path go pc.updatePodCgroupInfo(uid, cgroupPath) } } } // StartPodCacheWatch starts watching for pod changes and maintains the cache func StartPodCacheWatch(ctx context.Context) (*PodCache, error) { nodeName := os.Getenv("INSPECTOR_NODENAME") if nodeName == "" { return nil, fmt.Errorf("INSPECTOR_NODENAME environment variable not set") } log.SetLogger(textlogger.NewLogger(textlogger.NewConfig())) podCache := NewPodCache() // Initialize cgroup watcher podCache.initCgroupWatch() scheme := runtime.NewScheme() utilruntime.Must(v1.AddToScheme(scheme)) // Create manager mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ MetricsBindAddress: "0", Cache: cache.Options{ Scheme: scheme, ByObject: map[client.Object]cache.ByObject{ &v1.Pod{}: { Field: client.MatchingFieldsSelector{ Selector: fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName}), }, Transform: func(i interface{}) (interface{}, error) { if pod, ok := i.(*v1.Pod); ok { pod.Spec.Volumes = nil pod.Spec.EphemeralContainers = nil pod.Spec.SecurityContext = nil pod.Spec.ImagePullSecrets = nil pod.Spec.Tolerations = nil pod.Spec.ReadinessGates = nil pod.Spec.PreemptionPolicy = nil pod.Status.InitContainerStatuses = nil pod.Status.ContainerStatuses = nil pod.Status.EphemeralContainerStatuses = nil return pod, nil } return nil, fmt.Errorf("unexpected type %T", i) }, }, }, }, }) if err != nil { return nil, fmt.Errorf("unable to create manager: %v", err) } controller := &PodReconciler{PodCache: podCache, Client: mgr.GetClient()} // Create pod controller err = ctrl.NewControllerManagedBy(mgr). For(&v1.Pod{}). WithEventFilter(predicate.NewPredicateFuncs(func(object client.Object) bool { pod := object.(*v1.Pod) return pod.Spec.NodeName == nodeName })). Complete(controller) if err != nil { return nil, fmt.Errorf("unable to create controller: %v", err) } // Start manager go func() { if err := mgr.Start(ctx); err != nil { klog.Errorf("Failed to start manager: %v", err) } }() go controller.GC(ctx) return podCache, nil } // fixme: add period check // PodReconciler reconciles Pod objects type PodReconciler struct { client.Client PodCache *PodCache } func (r *PodReconciler) GC(ctx context.Context) { ticker := time.NewTicker(1 * time.Minute) for { select { case <-ctx.Done(): return case <-ticker.C: podList := &v1.PodList{} err := r.List(ctx, podList) if err != nil { logger.Error(err, "unable to list pods") continue } podMap := make(map[string]interface{}) for _, pod := range podList.Items { podMap[string(pod.UID)] = struct{}{} } for _, pod := range r.PodCache.GetAllLocalPods() { if _, ok := podMap[pod.UID]; !ok { logger.Info("[GC]removing pod from cache", "pod", pod.UID) r.PodCache.deletePodFromCache(pod.UID) } } } } } // Reconcile handles Pod events func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { pod := &v1.Pod{} err := r.Get(ctx, req.NamespacedName, pod) if err != nil { if errors.IsNotFound(err) { r.PodCache.deletePodFromCache(req.Name) return ctrl.Result{}, nil } return ctrl.Result{}, err } r.PodCache.updatePodBasicInfo(pod) return ctrl.Result{}, nil }