pkg/skoop/collector/podcollector/collector.go (611 lines of code) (raw):

package podcollector import ( "context" "encoding/json" "fmt" "net" "os" "os/exec" "reflect" "runtime" "strconv" "strings" "syscall" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "github.com/samber/lo" "golang.org/x/sys/unix" "google.golang.org/grpc/credentials/insecure" "github.com/alibaba/kubeskoop/pkg/skoop/collector" "github.com/alibaba/kubeskoop/pkg/skoop/k8s" "github.com/alibaba/kubeskoop/pkg/skoop/model" "github.com/alibaba/kubeskoop/pkg/skoop/netstack" "github.com/bastjan/netstat" "github.com/containerd/containerd/pkg/cri/server" "github.com/docker/docker/client" "github.com/moby/ipvs" "github.com/vishvananda/netlink" "github.com/vishvananda/netns" "golang.org/x/exp/slices" "google.golang.org/grpc" pb "k8s.io/cri-api/pkg/apis/runtime/v1" pbv1alpha2 "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" ) type podCollector struct { runtimeEndpoint string podNamespace string podName string dockerCli client.APIClient runtimeCli pb.RuntimeServiceClient runtimeCliV1Alpha2 pbv1alpha2.RuntimeServiceClient } func (a *podCollector) DumpNodeInfos() (*k8s.NodeNetworkStackDump, error) { dump := &k8s.NodeNetworkStackDump{} var err error dump.Pods, err = a.PodList() if err != nil { return nil, fmt.Errorf("error get pod list, %v", err) } collectHost := true if a.podNamespace == "" && a.podName == "host" { // only collect for host namespace dump.Pods = nil } if a.podNamespace != "" && a.podName != "" { dump.Pods = lo.Filter(dump.Pods, func(item k8s.PodNetInfo, _ int) bool { return item.PodNamespace == a.podNamespace && item.PodName == a.podName }) collectHost = false } dump.Netns, err = a.SandboxInfos(dump.Pods, collectHost) if err != nil { return nil, fmt.Errorf("error get sandboxs info, %v", err) } return dump, nil } func NewCollector(podNamespace, podName, runtimeEndpoint string) (collector.Collector, error) { pc := &podCollector{podNamespace: podNamespace, podName: podName, runtimeEndpoint: runtimeEndpoint} socket := os.Getenv("RUNTIME_SOCK") if runtimeEndpoint != "" { socket = runtimeEndpoint } if socket == "" { socket = "unix:///var/run/dockershim.sock" _, err := os.Stat("/var/run/dockershim.sock") if err != nil { if os.IsNotExist(err) { containerdSockets := []string{ "unix:///run/containerd/containerd.sock", "unix:///run/k3s/containerd/containerd.sock", } for _, containerdAddr := range containerdSockets { if _, err = os.Stat(strings.TrimPrefix(containerdAddr, "unix://")); err == nil { socket = containerdAddr break } } if socket == "" { return nil, fmt.Errorf("cannot found comportable endpoint address for cri-api, please specify cri address by --collector-cri-address") } } else { return nil, err } } else { pc.dockerCli, err = client.NewClientWithOpts(client.WithVersion("1.25")) if err != nil { return nil, err } } } conn, err := grpc.Dial(socket, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) if err != nil { return nil, err } pc.runtimeCli = pb.NewRuntimeServiceClient(conn) // negotiate cri api version _, err = pc.runtimeCli.Version(context.TODO(), &pb.VersionRequest{}) if status.Code(err) == codes.Unimplemented { pc.runtimeCli = nil pc.runtimeCliV1Alpha2 = pbv1alpha2.NewRuntimeServiceClient(conn) } else if err != nil { return nil, err } return pc, nil } func (a *podCollector) PodInfo(sandbox *pb.PodSandbox) (k8s.PodNetInfo, error) { p := k8s.PodNetInfo{} var ( sandboxStatus *pb.PodSandboxStatusResponse err error ) if a.runtimeCli != nil { sandboxStatus, err = a.runtimeCli.PodSandboxStatus(context.TODO(), &pb.PodSandboxStatusRequest{ PodSandboxId: sandbox.Id, Verbose: a.dockerCli == nil, }) if err != nil { return p, err } } else { statusAlpha, err := a.runtimeCliV1Alpha2.PodSandboxStatus(context.TODO(), &pbv1alpha2.PodSandboxStatusRequest{ PodSandboxId: sandbox.Id, Verbose: a.dockerCli == nil, }) if err != nil { return p, err } sandboxStatus = &pb.PodSandboxStatusResponse{} if err = alphaRespTov1Resp(statusAlpha, sandboxStatus); err != nil { return p, fmt.Errorf("error convert alpha resp %v", err) } } p.PodName = sandboxStatus.Status.GetMetadata().GetName() p.PodNamespace = sandboxStatus.Status.GetMetadata().GetNamespace() p.ContainerID = sandboxStatus.Status.GetId() p.PodUID = sandboxStatus.Status.GetMetadata().Uid if a.dockerCli != nil { sandboxInfo, err := a.dockerCli.ContainerInspect(context.TODO(), sandbox.Id) if err != nil { return p, err } p.PID = uint32(sandboxInfo.State.Pid) } else { sandboxInfo := server.SandboxInfo{} err = json.Unmarshal([]byte(sandboxStatus.GetInfo()["info"]), &sandboxInfo) if err != nil { return p, err } p.PID = sandboxInfo.Pid } if sandboxStatus.Status.GetLinux().GetNamespaces().GetOptions().GetNetwork() == pb.NamespaceMode_POD { p.NetworkMode = "none" p.Netns = fmt.Sprintf("/proc/%d/ns/net", p.PID) } else { p.NetworkMode = "host" p.Netns = "" } return p, nil } func alphaRespTov1Resp( alphaRes interface{ Marshal() ([]byte, error) }, v1Res interface{ Unmarshal(_ []byte) error }, ) error { p, err := alphaRes.Marshal() if err != nil { return err } return v1Res.Unmarshal(p) } func (a *podCollector) PodList() ([]k8s.PodNetInfo, error) { var ( pods []k8s.PodNetInfo sandboxs *pb.ListPodSandboxResponse err error ) if a.runtimeCli != nil { sandboxs, err = a.runtimeCli.ListPodSandbox(context.TODO(), &pb.ListPodSandboxRequest{}) if err != nil { return nil, fmt.Errorf("error list pod sandbox: %v", err) } } else { alphaSandboxs, err := a.runtimeCliV1Alpha2.ListPodSandbox(context.TODO(), &pbv1alpha2.ListPodSandboxRequest{}) if err != nil { return nil, fmt.Errorf("error list pod sandbox: %v", err) } sandboxs = &pb.ListPodSandboxResponse{} err = alphaRespTov1Resp(alphaSandboxs, sandboxs) if err != nil { return nil, fmt.Errorf("error convert alpha pod sandbox: %v", err) } } for _, s := range sandboxs.Items { if s.GetState() == pb.PodSandboxState_SANDBOX_READY { podinfo, err := a.PodInfo(s) if err != nil { return nil, err } pods = append(pods, podinfo) } } return pods, nil } func (a *podCollector) SandboxInfos(pods []k8s.PodNetInfo, collectHostNs bool) ([]netstack.NetNSInfo, error) { var sandboxInfos []netstack.NetNSInfo if collectHostNs { hostNsInfo, err := a.SandboxInfo("/proc/1/ns/net", "", 1) if err != nil { return nil, err } sandboxInfos = append(sandboxInfos, hostNsInfo) } for _, p := range pods { if p.NetworkMode == "none" { nsInfo, err := a.SandboxInfo(p.Netns, fmt.Sprintf("%s/%s", p.PodNamespace, p.PodName), p.PID) if err != nil { return nil, err } sandboxInfos = append(sandboxInfos, nsInfo) } } return sandboxInfos, nil } func (a *podCollector) SandboxInfo(path, key string, pid uint32) (netstack.NetNSInfo, error) { sandboxInfo := netstack.NetNSInfo{ Netns: path, Key: key, PID: pid, } var err error sandboxInfo.NetnsID, err = getFileInode(path) if err != nil { return sandboxInfo, err } for _, infoCollector := range []func(*netstack.NetNSInfo) error{ interfaceCollector, sysctlCollector, routeCollector, ruleCollector, iptablesCollector, ipsetCollector, ipvsCollector, sockCollector, } { err = nsDo(path, &sandboxInfo, infoCollector) if err != nil { return sandboxInfo, fmt.Errorf("error run collector %+v, err: %v", runtime.FuncForPC(reflect.ValueOf(infoCollector).Pointer()).Name(), err) } } return sandboxInfo, nil } func nsDo(path string, sandboxInfo *netstack.NetNSInfo, f func(sandboxInfo *netstack.NetNSInfo) error) error { currentHandler, err := netns.Get() if err != nil { return err } defer func() { _ = netns.Set(currentHandler) }() nsHandler, err := netns.GetFromPath(path) if err != nil { return err } err = netns.Set(nsHandler) if err != nil { return err } return f(sandboxInfo) } func getFileInode(path string) (string, error) { fileStat, err := os.Stat(path) if err != nil { return "", err } fileInfo, ok := fileStat.Sys().(*syscall.Stat_t) if !ok { return "", fmt.Errorf("cannot found sysinfo from file stat: %v", path) } return strconv.FormatUint(fileInfo.Ino, 10), nil } // NSExec nsenter {pid} {command} func NSExec(args []string) (string, error) { if len(args) < 3 { return "", fmt.Errorf("command args invalid: %v", args) } fd, err := unix.Open(fmt.Sprintf("/proc/%s/ns/net", args[1]), unix.O_RDONLY|unix.O_CLOEXEC, 0) if err != nil { return "", fmt.Errorf("cannot get pid: %v", err) } defer func() { unix.Close(fd) }() err = unix.Setns(fd, unix.CLONE_NEWNET) if err != nil { return "", fmt.Errorf("cannot get pid from pid: %v", err) } output, err := exec.Command(args[2], args[3:]...).CombinedOutput() if err != nil { return "", fmt.Errorf("err:%v, output: %v", err, string(output)) } return string(output), nil } func namespaceCmd(pid uint32, cmd string) (string, error) { cmdExec := exec.Command("nsenter", strconv.Itoa(int(pid)), "sh", "-c", cmd) cmdExec.Path = "/proc/self/exe" output, err := cmdExec.CombinedOutput() if err != nil { return "", fmt.Errorf("err:%v, output: %v", err, string(output)) } return string(output), nil } func parseSysctls(sysctlsStr string) map[string]string { sysctls := map[string]string{} for _, sysctlStr := range strings.Split(sysctlsStr, "\n") { if sysctlSlice := strings.Split(sysctlStr, "="); len(sysctlSlice) == 2 { sysctls[strings.TrimSpace(sysctlSlice[0])] = strings.TrimSpace(sysctlSlice[1]) } } return sysctls } func interfaceCollector(sandboxInfo *netstack.NetNSInfo) error { links, err := netlink.LinkList() if err != nil { return err } for _, l := range links { attr := netstack.Interface{ Name: l.Attrs().Name, Index: l.Attrs().Index, MTU: l.Attrs().MTU, Driver: l.Type(), Addrs: []netstack.Addr{}, NeighInfo: []netstack.Neigh{}, FdbInfo: []netstack.Neigh{}, MasterIndex: l.Attrs().MasterIndex, PeerIndex: l.Attrs().ParentIndex, } switch l.Attrs().OperState { case netlink.OperUp: attr.State = netstack.LinkUP case netlink.OperDown: attr.State = netstack.LinkDown default: if l.Attrs().Flags&net.FlagUp != 0 { attr.State = netstack.LinkUP } else { attr.State = netstack.LinkUnknown } } addrs, err := netlink.AddrList(l, netlink.FAMILY_ALL) if err != nil { return err } for _, addr := range addrs { attr.Addrs = append(attr.Addrs, netstack.Addr{ IPNet: addr.IPNet, }) } sysctlsStr, err := namespaceCmd(sandboxInfo.PID, fmt.Sprintf("sysctl -a | grep '\\.%s\\.' || true", l.Attrs().Name)) if err != nil { return err } attr.DevSysctls = parseSysctls(sysctlsStr) fdbs, err := netlink.NeighList(l.Attrs().Index, syscall.AF_BRIDGE) if err != nil { return err } for _, fdb := range fdbs { attr.FdbInfo = append(attr.FdbInfo, netstack.Neigh{ Family: syscall.AF_BRIDGE, LinkIndex: l.Attrs().Index, State: fdb.State, Type: fdb.Type, Flags: fdb.Flags, IP: fdb.IP, HardwareAddr: fdb.HardwareAddr, }) } neighs, err := netlink.NeighList(l.Attrs().Index, netlink.FAMILY_V4) if err != nil { return err } for _, neigh := range neighs { attr.NeighInfo = append(attr.NeighInfo, netstack.Neigh{ Family: netlink.FAMILY_V4, LinkIndex: l.Attrs().Index, State: neigh.State, Type: neigh.Type, Flags: neigh.Flags, IP: neigh.IP, HardwareAddr: neigh.HardwareAddr, }) } sandboxInfo.Interfaces = append(sandboxInfo.Interfaces, attr) } return nil } func sysctlCollector(sandboxInfo *netstack.NetNSInfo) error { sysctlsStr, err := namespaceCmd(sandboxInfo.PID, "sysctl -a || true") if err != nil { return err } sandboxInfo.SysctlInfo = parseSysctls(sysctlsStr) return nil } func routeCollector(sandboxInfo *netstack.NetNSInfo) error { rules, err := netlink.RuleList(netlink.FAMILY_V4) if err != nil { return fmt.Errorf("error collector rule list: %v", err) } tableIDSet := map[int]interface{}{} for _, rule := range rules { if _, ok := tableIDSet[rule.Table]; !ok { tableIDSet[rule.Table] = struct{}{} } } for tableID := range tableIDSet { v4Route, err := netlink.RouteListFiltered(netlink.FAMILY_V4, &netlink.Route{Table: tableID}, netlink.RT_FILTER_TABLE) if err != nil { return fmt.Errorf("error collector route list: %v", err) } for _, route := range v4Route { var iif, oif netlink.Link if route.ILinkIndex != 0 { iif, err = netlink.LinkByIndex(route.ILinkIndex) if err != nil { return err } } if route.LinkIndex != 0 { oif, err = netlink.LinkByIndex(route.LinkIndex) if err != nil { return err } } routeInfo := netstack.Route{ Family: netlink.FAMILY_V4, Scope: netstack.Scope(route.Scope), Dst: route.Dst, Src: route.Src, Gw: route.Gw, Protocol: int(route.Protocol), Priority: route.Priority, Table: route.Table, Type: route.Type, Tos: route.Tos, Flags: route.Flags, } // default route if routeInfo.Dst == nil { _, routeInfo.Dst, _ = net.ParseCIDR("0.0.0.0/0") } if iif != nil { routeInfo.IifName = iif.Attrs().Name } if oif != nil { routeInfo.OifName = oif.Attrs().Name } sandboxInfo.RouteInfo = append(sandboxInfo.RouteInfo, routeInfo) } } return nil } func ruleCollector(sandboxInfo *netstack.NetNSInfo) error { v4Rule, err := netlink.RuleList(netlink.FAMILY_V4) if err != nil { return err } sandboxInfo.RuleInfo = []netstack.Rule{} for _, rule := range v4Rule { sandboxInfo.RuleInfo = append(sandboxInfo.RuleInfo, netstack.Rule{ Priority: rule.Priority, Family: rule.Family, Table: rule.Table, Mark: rule.Mark, Mask: rule.Mask, Tos: rule.Tos, TunID: rule.TunID, Goto: rule.Goto, Src: rule.Src, Dst: rule.Dst, Flow: rule.Flow, IifName: rule.IifName, OifName: rule.OifName, }) } return nil } func iptablesCollector(sandboxInfo *netstack.NetNSInfo) error { iptableDump, err := namespaceCmd(sandboxInfo.PID, "iptables-save|iptables-xml") if err != nil { return err } sandboxInfo.IptablesInfo = iptableDump return nil } func ipsetCollector(sandboxInfo *netstack.NetNSInfo) error { var err error info, err := namespaceCmd(sandboxInfo.PID, "ipset list -o xml") if err != nil { return err } sandboxInfo.IpsetInfo, err = netstack.ParseIPSet(info) return err } func ipvsCollector(sandboxInfo *netstack.NetNSInfo) error { path := fmt.Sprintf("/proc/%d/ns/net", sandboxInfo.PID) handler, err := ipvs.New(path) if err != nil { return err } services, err := handler.GetServices() if err != nil { return err } m := map[string]*netstack.IPVSService{} for _, svc := range services { i := &netstack.IPVSService{ Protocol: intToProtocol(svc.Protocol), IP: svc.Address.String(), Port: svc.Port, Scheduler: svc.SchedName, RS: nil, } dsts, err := handler.GetDestinations(svc) if err != nil { return err } for _, dst := range dsts { rs := netstack.RealServer{ IP: dst.Address.String(), Port: dst.Port, Masquerade: dst.ConnectionFlags == ipvs.ConnectionFlagMasq, Weight: dst.Weight, } i.RS = append(i.RS, rs) } m[i.Service()] = i } sandboxInfo.IPVSInfo = m return nil } func sockCollector(sandboxInfo *netstack.NetNSInfo) error { netstat.ProcRoot = fmt.Sprintf("/proc/%d/", sandboxInfo.PID) tcpConns, err := netstat.TCP.Connections() if err != nil { return fmt.Errorf("error get tcp connections: %v", err) } tcp6Conns, err := netstat.TCP6.Connections() if err != nil { return fmt.Errorf("error get tcp6 connections: %v", err) } tcpConns = append(tcpConns, tcp6Conns...) udpConns, err := netstat.UDP.Connections() if err != nil { return fmt.Errorf("error get udp connections: %v", err) } udp6Conns, err := netstat.UDP6.Connections() if err != nil { return fmt.Errorf("error get udp6 connections: %v", err) } udpConns = append(udpConns, udp6Conns...) for _, tc := range tcpConns { conn := netstack.ConnStat{ LocalIP: tc.IP.String(), LocalPort: uint16(tc.Port), RemoteIP: tc.RemoteIP.String(), RemotePort: uint16(tc.RemotePort), Protocol: model.TCP, } conn.State = netstack.SockStatUnknown if tc.State == netstat.TCPEstablished { conn.State = netstack.SockStatEstablish } if tc.State == netstat.TCPListen { conn.State = netstack.SockStatListen } sandboxInfo.ConnStats = append(sandboxInfo.ConnStats, conn) } for _, tc := range udpConns { conn := netstack.ConnStat{ LocalIP: tc.IP.String(), LocalPort: uint16(tc.Port), RemoteIP: tc.RemoteIP.String(), RemotePort: uint16(tc.RemotePort), Protocol: model.UDP, } conn.State = netstack.SockStatUnknown if tc.State == netstat.TCPEstablished { conn.State = netstack.SockStatEstablish } if slices.Contains([]string{"0.0.0.0", "::"}, tc.RemoteIP.String()) { conn.State = netstack.SockStatListen } sandboxInfo.ConnStats = append(sandboxInfo.ConnStats, conn) } return nil } func intToProtocol(proto uint16) model.Protocol { switch proto { case unix.IPPROTO_TCP: return model.TCP case unix.IPPROTO_UDP: return model.UDP } return "unknown" }