pkg/exporter/nettop/cache.go (410 lines of code) (raw):

package nettop

import (
	"context"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"os"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/vishvananda/netlink"
	"golang.org/x/sys/unix"
	v1 "k8s.io/cri-api/pkg/apis/runtime/v1"

	log "github.com/sirupsen/logrus"

	"github.com/patrickmn/go-cache"
	"github.com/vishvananda/netns"
)

var (
	// cacheUpdateInterval is both the period of the refresh loop and the
	// timeout budget of a single refresh (see StartCache / cacheDaemonLoop).
	cacheUpdateInterval = 10 * time.Second

	// entities holds the entity list published by the most recent refresh.
	entities atomic.Pointer[[]*Entity]

	// nsCache is keyed by the decimal netns inode, pidCache by the decimal
	// pid and ipCache by the IP string; all values are *Entity
	// (see addEntityToCache).
	nsCache  = cache.New(20*cacheUpdateInterval, 20*cacheUpdateInterval)
	pidCache = cache.New(20*cacheUpdateInterval, 20*cacheUpdateInterval)
	ipCache  = cache.New(20*cacheUpdateInterval, 20*cacheUpdateInterval)

	// control is closed by StopCache to terminate cacheDaemonLoop.
	control = make(chan struct{})
	// lock serializes cacheNetTopology and cacheNetTopologyNoCri.
	lock sync.Mutex

	// defaultEntity describes the network namespace of this process; it is
	// populated by initDefaultEntity.
	defaultEntity = &Entity{}
)

// podNameFromServiceAccountToken extracts the pod name from the
// "kubernetes.io" -> "pod" -> "name" claim of the mounted serviceaccount
// token.
//
// It returns an error when the token cannot be read, is not a three-segment
// JWT, cannot be decoded, or carries no pod name, so that callers can fall
// back to another source.
func podNameFromServiceAccountToken() (string, error) {
	token, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token")
	if err != nil {
		return "", fmt.Errorf("failed get pod token, err: %w", err)
	}

	arr := strings.Split(string(token), ".")
	if len(arr) != 3 {
		return "", fmt.Errorf("invalid serviceaccount token format")
	}

	// JWT segments are base64url encoded without padding, so the standard
	// alphabet fails on payloads containing '-' or '_'. Trailing '=' is
	// stripped to tolerate producers that keep the padding.
	data, err := base64.RawURLEncoding.DecodeString(strings.TrimRight(arr[1], "="))
	if err != nil {
		return "", fmt.Errorf("failed decode serviceaccount token: %w", err)
	}

	s := struct {
		K8s struct {
			Pod struct {
				Name string `json:"name"`
			} `json:"pod"`
		} `json:"kubernetes.io"`
	}{}
	if err := json.Unmarshal(data, &s); err != nil {
		return "", fmt.Errorf("failed unmarshal serviceaccount token: %w", err)
	}

	// A token without the pod claim must not yield an empty pod name;
	// report it so the caller can use its fallback.
	if s.K8s.Pod.Name == "" {
		return "", fmt.Errorf("serviceaccount token contains no pod name")
	}
	return s.K8s.Pod.Name, nil
}

// currentPodInfo returns the namespace and name of the pod this process runs
// in (sidecar mode).
//
// The namespace comes from KUBESKOOP_POD_NAMESPACE, falling back to the
// serviceaccount namespace file. The name comes from KUBESKOOP_POD_NAME,
// falling back to the serviceaccount token and finally to /etc/hostname.
// Values read from files are trimmed of surrounding whitespace.
func currentPodInfo() (string, string, error) {
	namespace := os.Getenv("KUBESKOOP_POD_NAMESPACE")
	name := os.Getenv("KUBESKOOP_POD_NAME")

	if namespace == "" {
		log.Infof("failed get pod namespace for sidecar mode from env KUBESKOOP_POD_NAMESPACE, try from k8s serviceaccount")
		namespaceBytes, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
		if err != nil {
			return "", "", fmt.Errorf("failed get namespace in sidecar mode, err: %w", err)
		}
		namespace = strings.TrimSpace(string(namespaceBytes))
	}

	if name != "" {
		return namespace, name, nil
	}

	log.Infof("failed get pod name for sidecar mode from env KUBESKOOP_POD_NAME, try from k8s serviceaccount")
	name, err := podNameFromServiceAccountToken()
	if err == nil {
		return namespace, name, nil
	}

	log.Warnf("failed get pod name from /var/run/secrets/kubernetes.io/serviceaccount/token, fallback to hostname: %v", err)
	nameBytes, err := os.ReadFile("/etc/hostname")
	if err != nil {
		return "", "", fmt.Errorf("failed get pod name in sidecar mode, err: %w", err)
	}
	// /etc/hostname normally ends with a newline that must not become part
	// of the pod name.
	return namespace, strings.TrimSpace(string(nameBytes)), nil
}

// initDefaultEntity builds defaultEntity from the network namespace of the
// current process and registers it in the caches without expiration.
//
// In sidecar mode the entity additionally carries the current pod's
// namespace/name and is published as the only member of entities; otherwise
// it is flagged as the host network.
func initDefaultEntity(sidecarMode bool) error {
	self := os.Getpid()
	hostNetNSId, err := getNsInumByPid(self)
	if err != nil {
		return fmt.Errorf("failed get host nsnum id, err: %w", err)
	}

	ipList, err := hostIPList()
	if err != nil {
		return err
	}

	// add host network
	defaultEntity = &Entity{
		netnsMeta: &netnsMeta{
			inum:          hostNetNSId,
			mountPath:     fmt.Sprintf("/proc/%d/ns/net", self),
			isHostNetwork: !sidecarMode,
			ipList:        ipList,
		},
		initPid: self,
	}

	if sidecarMode {
		namespace, name, err := currentPodInfo()
		if err != nil {
			return fmt.Errorf("failed get current pod info: %w", err)
		}
		defaultEntity.podMeta = podMeta{
			namespace: namespace,
			name:      name,
		}
		ent := []*Entity{defaultEntity}
		entities.Store(&ent)
	}

	addEntityToCache(defaultEntity, false, true)
	return nil
}

// hostIPList returns the global-unicast IPv4 addresses of all links visible
// in the current network namespace. Links whose addresses cannot be listed
// are logged and skipped.
func hostIPList() ([]string, error) {
	links, err := netlink.LinkList()
	if err != nil {
		return nil, fmt.Errorf("failed get host link list: %w", err)
	}

	var ret []string
	for _, link := range links {
		addrs, err := netlink.AddrList(link, unix.AF_INET)
		if err != nil {
			log.Errorf("failed get addr from link %s: %v", link.Attrs().Name, err)
			continue
		}
		for _, addr := range addrs {
			if !addr.IP.IsGlobalUnicast() {
				continue
			}
			ret = append(ret, addr.IP.String())
		}
	}
	return ret, nil
}

// netnsMeta describes one network namespace.
type netnsMeta struct {
	inum          int      // inode number of the netns
	mountPath     string   // path the netns can be opened from
	isHostNetwork bool     // true when the netns is the host network
	ipList        []string // IP addresses associated with the netns
}

// podMeta identifies a pod.
type podMeta struct {
	name      string
	namespace string
}

// Entity ties a pod (or the default/host entity) to its network namespace
// and the processes known for it.
type Entity struct {
	*netnsMeta
	podMeta
	initPid int
	pids    []int
	labels  map[string]string
}

// String returns the entity formatted as "<namespace>/<name>".
func (e *Entity) String() string {
	return fmt.Sprintf("%s/%s", e.GetPodNamespace(), e.GetPodName())
}

// GetPodName returns the pod name of the entity.
func (e *Entity) GetPodName() string {
	return e.podMeta.name
}

// GetPodNamespace returns the pod namespace of the entity.
func (e *Entity) GetPodNamespace() string {
	return e.podMeta.namespace
}

// GetLabels returns the labels recorded for the entity.
func (e *Entity) GetLabels() map[string]string {
	return e.labels
}

// IsHostNetwork reports whether the entity uses the host network namespace.
func (e *Entity) IsHostNetwork() bool {
	return e.netnsMeta.isHostNetwork
}

// GetNetns returns the inode number of the entity's network namespace.
func (e *Entity) GetNetns() int {
	return e.netnsMeta.inum
}

// GetNetnsMountPoint returns the path of the entity's network namespace.
func (e *Entity) GetNetnsMountPoint() string {
	return e.netnsMeta.mountPath
}

// OpenNsHandle opens a handle on the entity's network namespace path.
func (e *Entity) OpenNsHandle() (netns.NsHandle, error) {
	//TODO check whether we should close the opened file
	return netns.GetFromPath(e.netnsMeta.mountPath)
}

// GetPid returns the initPid of the entity.
// NOTE(review): the original comment states 0 is returned when the netns has
// no process; that depends on how initPid is populated — confirm.
func (e *Entity) GetPid() int {
	return e.initPid
}

// GetPids returns the pids recorded for the entity.
func (e *Entity) GetPids() []int {
	return e.pids
}

// StartCache initializes the topology cache.
//
// In node mode (sidecarMode == false) it initializes the CRI client, falling
// back to an apiserver pod watch when that fails, performs one synchronous
// cache refresh and starts the background refresh loop. In sidecar mode only
// the default entity is initialized.
func StartCache(ctx context.Context, sidecarMode bool) error {
	if !sidecarMode {
		if err := initCriClient(runtimeEndpoints); err != nil {
			// fallback to node level apiserver watch
			log.Warnf("failed init cri client, fallback to apiserver pod watch: %v", err)
			apiserverClient, err = StartPodCacheWatch(ctx)
			if err != nil {
				return err
			}
		} else {
			if err := initCriInfo(); err != nil {
				return err
			}
		}
	}

	if err := initDefaultEntity(sidecarMode); err != nil {
		return err
	}

	if sidecarMode {
		return nil
	}

	if err := cachePodsWithTimeout(cacheUpdateInterval); err != nil {
		return fmt.Errorf("failed cache pods, err: %v", err)
	}

	go cacheDaemonLoop(ctx, control)
	return nil
}

// StopCache signals the background refresh loop to exit by closing the
// control channel. It must be called at most once: closing an already closed
// channel panics.
func StopCache() {
	close(control)
}

// cacheDaemonLoop refreshes the cache every cacheUpdateInterval until the
// control channel is closed. The context argument is currently unused.
func cacheDaemonLoop(_ context.Context, control chan struct{}) {
	log.Infof("nettop cache loop start")

	t := time.NewTicker(cacheUpdateInterval)
	defer t.Stop()

	for {
		select {
		case <-control:
			log.Info("cache daemon loop exit of control signal")
			return
		case <-t.C:
			if err := cachePodsWithTimeout(cacheUpdateInterval); err != nil {
				log.Errorf("failed cache pods: %v", err)
			}
		}
	}
}

// cachePodsWithTimeout runs one cache refresh in a goroutine and waits for it
// for at most timeout. The apiserver-based refresh is used when
// apiserverClient is set, the CRI-based one otherwise.
//
// On timeout an error is returned while the refresh goroutine keeps running
// until it finishes on its own.
func cachePodsWithTimeout(timeout time.Duration) error {
	start := time.Now()
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	// Buffered so the worker never blocks on send after a timeout.
	result := make(chan error, 1)
	go func() {
		if apiserverClient != nil {
			result <- cacheNetTopologyNoCri(ctx)
			return
		}
		result <- cacheNetTopology(ctx)
	}()

	select {
	case <-ctx.Done():
		log.Infof("cache process time exceeded, latency: %fs", time.Since(start).Seconds())
		return fmt.Errorf("timeout process pods")
	case err := <-result:
		log.Infof("cache process finished, latency: %fs", time.Since(start).Seconds())
		return err
	}
}

// addEntityToCache indexes e by netns inode, by each of its IPs and by each
// of its pids.
//
// When ignoreHostPod is true, host-network entities are not written to the
// netns index. Entries expire after three refresh intervals unless
// noExpiration is set.
func addEntityToCache(e *Entity, ignoreHostPod, noExpiration bool) {
	expirationTime := 3 * cacheUpdateInterval
	if noExpiration {
		expirationTime = cache.NoExpiration
	}

	if !(ignoreHostPod && e.IsHostNetwork()) {
		nsCache.Set(fmt.Sprintf("%d", e.inum), e, expirationTime)
	}

	for _, ip := range e.ipList {
		ipCache.Set(ip, e, expirationTime)
	}

	for _, pid := range e.pids {
		pidCache.Set(fmt.Sprintf("%d", pid), e, expirationTime)
	}
}

// contextDone reports, without blocking, whether ctx is already done.
func contextDone(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	default:
		return false
	}
}

// CRIInfo carries version information about the container runtime.
type CRIInfo struct {
	Version        string
	RuntimeName    string
	RuntimeVersion string
}

// getSandboxInfoSpec returns the sandbox info spec for a sandbox status.
//
// For the "docker" runtime the spec is resolved from the sandbox id;
// otherwise it is decoded from the JSON stored under the "info" key of the
// status response. The caller must ensure sandboxStatus.Status is non-nil.
func getSandboxInfoSpec(sandboxStatus *v1.PodSandboxStatusResponse) (*sandboxInfoSpec, error) {
	if criInfo.RuntimeName == "docker" {
		return getSandboxInfoSpecForDocker(sandboxStatus.Status.Id)
	}

	infoString := sandboxStatus.Info["info"]
	if infoString == "" {
		return nil, fmt.Errorf("sandbox status does not contains \"info\" field")
	}

	info := &sandboxInfoSpec{}
	if err := json.Unmarshal([]byte(infoString), info); err != nil {
		return nil, fmt.Errorf("failed unmarsh info to struct, err: %v", err)
	}
	return info, nil
}

// cacheNetTopologyNoCri rebuilds the entity list from the pods reported by
// apiserverClient, indexes every entity in the caches and publishes the new
// list. Pods without a name or namespace are skipped. The context argument
// is currently unused.
func cacheNetTopologyNoCri(_ context.Context) error {
	lock.Lock()
	defer lock.Unlock()

	pods := apiserverClient.GetAllLocalPods()

	var newEntities []*Entity
	newEntities = append(newEntities, defaultEntity)

	for _, pod := range pods {
		if pod.Name == "" || pod.Namespace == "" {
			continue
		}

		pids := tasksInsidePodCgroup(pod.CgroupPath, true)
		if len(pids) == 0 {
			log.Warnf("sandbox %s/%s: found 0 pids under cgroup %s", pod.Namespace, pod.Name, pod.CgroupPath)
		}

		e := &Entity{
			netnsMeta: &netnsMeta{
				inum:          int(pod.NetNSInode),
				mountPath:     pod.NetNSPath,
				isHostNetwork: pod.IsHostNetwork,
				ipList:        []string{pod.IP},
			},
			podMeta: podMeta{
				name:      pod.Name,
				namespace: pod.Namespace,
			},
			initPid: pod.SandboxPID,
			labels:  pod.Labels,
			pids:    pids,
		}
		newEntities = append(newEntities, e)
		addEntityToCache(e, true, false)
	}

	entities.Store(&newEntities)
	log.Debug("finished cache process")
	return nil
}

// cacheNetTopology rebuilds the entity list from the ready pod sandboxes
// reported by the CRI client, indexes every entity in the caches and
// publishes the new list.
//
// Sandboxes that lack metadata, status, info, a resolvable netns or (for
// non-host netns) an IP are logged and skipped. The refresh aborts with an
// error, without publishing, once ctx is done.
func cacheNetTopology(ctx context.Context) error {
	lock.Lock()
	defer lock.Unlock()

	var newEntities []*Entity
	newEntities = append(newEntities, defaultEntity)

	sandboxList, err := criClient.ListPodSandbox(&v1.PodSandboxFilter{
		State: &v1.PodSandboxStateValue{
			State: v1.PodSandboxState_SANDBOX_READY,
		},
	})
	if err != nil {
		return fmt.Errorf("failed list pod sandboxes: %w", err)
	}

	for _, sandbox := range sandboxList {
		if contextDone(ctx) {
			return fmt.Errorf("timeout")
		}

		if sandbox.Metadata == nil {
			log.Errorf("invalid sandbox who has no metadata, id %s", sandbox.Id)
			// Metadata is dereferenced below; skip instead of panicking.
			continue
		}

		namespace := sandbox.Metadata.Namespace
		name := sandbox.Metadata.Name
		labels := sandbox.Labels

		sandboxStatus, err := criClient.PodSandboxStatus(sandbox.Id, true)
		if err != nil {
			log.Errorf("sandbox: %s/%s failed get status err: %v", namespace, name, err)
			continue
		}

		if sandboxStatus.Status == nil {
			log.Errorf("sandbox %s/%s: invalid sandbox status", sandbox.Metadata.Namespace, sandbox.Metadata.Name)
			continue
		}

		info, err := getSandboxInfoSpec(sandboxStatus)
		if err != nil {
			log.Errorf("failed get sandbox info: %v", err)
			continue
		}

		netnsNum, err := getNsInumByPid(info.Pid)
		if err != nil {
			log.Errorf("failed get netns for initPid %d, err: %v", info.Pid, err)
			continue
		}

		podCgroupPath := info.Config.Linux.CgroupParent
		var pids []int
		if podCgroupPath != "" {
			pids = tasksInsidePodCgroup(podCgroupPath, false)
			if len(pids) == 0 {
				log.Warnf("sandbox %s/%s: found 0 pids under cgroup %s", namespace, name, podCgroupPath)
			}
		}

		var ns *netnsMeta
		if netnsNum == defaultEntity.inum {
			// The sandbox shares the default entity's netns; reuse its
			// metadata.
			ns = defaultEntity.netnsMeta
		} else {
			status := sandboxStatus.Status
			if status.Network == nil || status.Network.Ip == "" {
				log.Errorf("sanbox %s/%s: invalid sandbox status, no ip", sandbox.Metadata.Namespace, sandbox.Metadata.Name)
				continue
			}

			ns = &netnsMeta{
				inum:          netnsNum,
				mountPath:     fmt.Sprintf("/proc/%d/ns/net", info.Pid),
				isHostNetwork: false,
				ipList:        []string{status.Network.Ip},
			}
		}

		e := &Entity{
			netnsMeta: ns,
			podMeta: podMeta{
				name:      name,
				namespace: namespace,
			},
			initPid: info.Pid,
			labels:  labels,
			pids:    pids,
		}

		newEntities = append(newEntities, e)
		addEntityToCache(e, true, false)
	}

	entities.Store(&newEntities)
	log.Debug("finished cache process")
	return nil
}