pkg/skoop/collector/manager/manager.go (471 lines of code) (raw):

package manager import ( "bytes" "context" "encoding/base64" "fmt" "time" "github.com/alibaba/kubeskoop/pkg/skoop/utils" "github.com/alibaba/kubeskoop/pkg/skoop/collector" ctx "github.com/alibaba/kubeskoop/pkg/skoop/context" "github.com/alibaba/kubeskoop/pkg/skoop/k8s" "github.com/alibaba/kubeskoop/pkg/skoop/netstack" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/json" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "k8s.io/client-go/tools/remotecommand" "k8s.io/klog/v2" "k8s.io/utils/pointer" ) const ( defaultCollectorNamespace = "kubeskoop" defaultWaitInterval = 5 defaultWaitTimeout = 120 hostnsKey = "/proc/1/ns/net" ) type SimplePodCollectorManagerOptions struct { Image string CollectorNamespace string WaitInterval time.Duration WaitTimeout time.Duration } type simplePodCollectorManager struct { image string imagePullPolicy v1.PullPolicy namespace string runtimeAPIAddress string client *kubernetes.Clientset restConfig *rest.Config ipCache *k8s.IPCache cache map[string]*k8s.NodeNetworkStackDump nodeCache map[string]*k8s.NodeInfo podCache map[string]*k8s.Pod waitInterval time.Duration waitTimeout time.Duration preserveCollectorPod bool } func NewSimplePodCollectorManager(ctx *ctx.Context) (collector.Manager, error) { if Config.SimplePodCollectorConfig.Image == "" { return nil, fmt.Errorf("image must be provided") } if Config.SimplePodCollectorConfig.CollectorNamespace == "" { Config.SimplePodCollectorConfig.CollectorNamespace = defaultCollectorNamespace } if Config.SimplePodCollectorConfig.WaitInterval == 0 { Config.SimplePodCollectorConfig.WaitInterval = defaultWaitInterval * time.Second } if Config.SimplePodCollectorConfig.WaitTimeout == 0 { Config.SimplePodCollectorConfig.WaitTimeout = defaultWaitTimeout * time.Second } pullPolicy, err := utils.ConvertToImagePullPolicy(Config.SimplePodCollectorConfig.ImagePullPolicy) if err != nil { return nil, fmt.Errorf("failed to create pod collector manager: %w", err) } return &simplePodCollectorManager{ image: Config.SimplePodCollectorConfig.Image, imagePullPolicy: pullPolicy, namespace: Config.SimplePodCollectorConfig.CollectorNamespace, client: ctx.KubernetesClient(), restConfig: ctx.KubernetesRestClient(), ipCache: ctx.ClusterConfig().IPCache, cache: map[string]*k8s.NodeNetworkStackDump{}, nodeCache: map[string]*k8s.NodeInfo{}, podCache: map[string]*k8s.Pod{}, waitInterval: Config.SimplePodCollectorConfig.WaitInterval, waitTimeout: Config.SimplePodCollectorConfig.WaitTimeout, preserveCollectorPod: Config.SimplePodCollectorConfig.PreserveCollectorPod, runtimeAPIAddress: Config.SimplePodCollectorConfig.RuntimeAPIAddress, }, nil } func (m *simplePodCollectorManager) CollectNode(nodename string) (*k8s.NodeInfo, error) { if node, ok := m.nodeCache[nodename]; ok { return node, nil } node, err := m.ipCache.GetNodeFromName(nodename) if err != nil { return nil, err } os := utils.GetOSFromNode(node) if os != "linux" { return nil, fmt.Errorf("collector not supported for os type %q", os) } err = m.buildCache(nodename) if err != nil { return nil, err } nodeInfo := m.nodeCache[nodename] if nodeInfo == nil { return nil, fmt.Errorf("cannot collect node %s", nodename) } m.nodeCache[nodename] = nodeInfo return nodeInfo, nil } func (m *simplePodCollectorManager) CollectPod(namespace, name string) (*k8s.Pod, error) { podKey := fmt.Sprintf("%s/%s", namespace, name) if pod, ok := m.podCache[podKey]; ok { return pod, nil } pod, err := m.ipCache.GetPodFromName(namespace, name) if err != nil { return nil, err } if pod == nil { return nil, nil } err = m.buildCache(pod.Spec.NodeName) if err != nil { return nil, err } podInfo := m.podCache[podKey] if podInfo == nil { return nil, fmt.Errorf("cannot collect pod %s on node %s", podKey, pod.Spec.NodeName) } m.podCache[podKey] = podInfo return podInfo, nil } func (m *simplePodCollectorManager) buildCache(nodeName string) error { dump, err := m.collectNodeStackDump(nodeName) if err != nil { return err } netnsMap := map[string]netstack.NetNSInfo{} for _, netns := range dump.Netns { netnsMap[netns.Netns] = netns } if _, ok := netnsMap[hostnsKey]; !ok { return fmt.Errorf("cannot get host netns info for node %s", nodeName) } hostNetNS := netnsMap[hostnsKey] nodeInfo := &k8s.NodeInfo{ SubNetNSInfo: dump.Netns, NetNS: netstack.NetNS{NetNSInfo: &hostNetNS}, NodeMeta: k8s.NodeMeta{ NodeName: nodeName, }, } nodeInfo.Router = netstack.NewSimulateRouter(nodeInfo.NetNSInfo.RuleInfo, nodeInfo.NetNSInfo.RouteInfo, nodeInfo.NetNSInfo.Interfaces) nodeInfo.IPVS = netstack.NewIPVS(nodeInfo.NetNSInfo.IPVSInfo) nodeInfo.IPTables = netstack.ParseIPTables(nodeInfo.NetNSInfo.IptablesInfo) if err != nil { return err } nodeInfo.IPSetManager, err = netstack.NewIPSetManager(nodeInfo.NetNSInfo.IpsetInfo) if err != nil { return err } nodeInfo.Interfaces = nodeInfo.NetNSInfo.Interfaces nodeInfo.Neighbour = netstack.NewNeigh(nodeInfo.NetNSInfo.Interfaces) nodeInfo.Netfilter = netstack.NewSimulateNetfilter(netstack.SimulateNetfilterContext{ IPTables: nodeInfo.IPTables, IPSet: nodeInfo.IPSetManager, Router: nodeInfo.Router, IPVS: nodeInfo.IPVS, }) m.nodeCache[nodeName] = nodeInfo for _, p := range dump.Pods { podInfo := &k8s.Pod{ PodMeta: k8s.PodMeta{ Namespace: p.PodNamespace, PodName: p.PodName, HostNetwork: p.NetworkMode == "host", NodeName: nodeName, }, } if podInfo.HostNetwork { podInfo.NetNSInfo = nodeInfo.NetNSInfo } else { podNetNS, ok := netnsMap[p.Netns] if !ok { return fmt.Errorf("cannot get pod netns info for pod %s/%s, node %s", p.PodNamespace, p.PodName, nodeName) } podInfo.NetNSInfo = &podNetNS } podInfo.IPVS = netstack.NewIPVS(podInfo.NetNSInfo.IPVSInfo) podInfo.IPTables = netstack.ParseIPTables(podInfo.NetNSInfo.IptablesInfo) if err != nil { return err } podInfo.IPSetManager, err = netstack.NewIPSetManager(podInfo.NetNSInfo.IpsetInfo) if err != nil { return err } podInfo.Router = netstack.NewSimulateRouter(podInfo.NetNSInfo.RuleInfo, podInfo.NetNSInfo.RouteInfo, podInfo.NetNSInfo.Interfaces) podInfo.Interfaces = podInfo.NetNSInfo.Interfaces podInfo.Neighbour = netstack.NewNeigh(podInfo.NetNSInfo.Interfaces) podInfo.Netfilter = netstack.NewSimulateNetfilter(netstack.SimulateNetfilterContext{ IPTables: podInfo.IPTables, IPSet: podInfo.IPSetManager, Router: podInfo.Router, IPVS: podInfo.IPVS, }) podKey := fmt.Sprintf("%s/%s", p.PodNamespace, p.PodName) m.podCache[podKey] = podInfo } return nil } func (m *simplePodCollectorManager) collectNodeStackDump(nodeName string) (*k8s.NodeNetworkStackDump, error) { if dump, ok := m.cache[nodeName]; ok { return dump, nil } err := m.ensureNamespace() if err != nil { return nil, err } pod, err := m.createCollectorPod(nodeName) if err != nil { return nil, err } defer func() { if m.preserveCollectorPod { return } err := m.deleteCollectorPod(nodeName) if err != nil { klog.Errorf("failed delete collector pod: %s", err) } }() err = m.waitPodRunning(pod) if err != nil { return nil, err } data, err := m.readCollectorData(pod) if err != nil { return nil, err } m.cache[nodeName] = data return data, nil } func (m *simplePodCollectorManager) ensureNamespace() error { _, err := m.client.CoreV1().Namespaces().Get(context.TODO(), m.namespace, metav1.GetOptions{}) if err == nil { return nil } if errors.IsNotFound(err) { ns := &v1.Namespace{ TypeMeta: metav1.TypeMeta{ Kind: "Namespace", APIVersion: "v1", }, ObjectMeta: metav1.ObjectMeta{ Name: m.namespace, }, } _, err := m.client.CoreV1().Namespaces().Create(context.TODO(), ns, metav1.CreateOptions{}) if err != nil { return err } return nil } return err } func (m *simplePodCollectorManager) createCollectorPod(nodeName string) (*v1.Pod, error) { klog.V(3).Infof("Creating pod on node %s with image %s", nodeName, m.image) hostPathType := v1.HostPathDirectory podName := fmt.Sprintf("collector-%s", nodeName) err := m.ensurePodClean(podName) if err != nil { return nil, err } pod := &v1.Pod{ TypeMeta: metav1.TypeMeta{ Kind: "Pod", APIVersion: "v1", }, ObjectMeta: metav1.ObjectMeta{ Namespace: m.namespace, Name: podName, }, Spec: v1.PodSpec{ InitContainers: []v1.Container{ { Name: "collector", Image: m.image, ImagePullPolicy: m.imagePullPolicy, SecurityContext: &v1.SecurityContext{ Privileged: pointer.Bool(true), }, Env: []v1.EnvVar{{ Name: "RUNTIME_SOCK", Value: m.runtimeAPIAddress, }}, Command: []string{"/bin/pod-collector"}, VolumeMounts: []v1.VolumeMount{ { Name: "cri-dir", MountPath: "/var/run", }, { Name: "data", MountPath: "/data", }, { Name: "lib-modules", MountPath: "/lib/modules", }, }, }, }, Containers: []v1.Container{ { Name: "alive", Image: m.image, ImagePullPolicy: m.imagePullPolicy, Command: []string{ "/bin/sh", "-c", "while true;do sleep 100;done;", }, VolumeMounts: []v1.VolumeMount{ { Name: "data", MountPath: "/data", }, }, }, }, NodeName: nodeName, HostNetwork: true, HostPID: true, HostIPC: true, RestartPolicy: "Never", Volumes: []v1.Volume{ { Name: "cri-dir", VolumeSource: v1.VolumeSource{ HostPath: &v1.HostPathVolumeSource{ Path: "/var/run", Type: &hostPathType, }, }, }, { Name: "lib-modules", VolumeSource: v1.VolumeSource{ HostPath: &v1.HostPathVolumeSource{ Path: "/lib/modules", Type: &hostPathType, }, }, }, { Name: "data", VolumeSource: v1.VolumeSource{ EmptyDir: &v1.EmptyDirVolumeSource{}, }, }, }, Tolerations: []v1.Toleration{ { Operator: v1.TolerationOpExists, }, }, }, Status: v1.PodStatus{}, } return m.client.CoreV1().Pods(m.namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) } func (m *simplePodCollectorManager) deleteCollectorPod(nodeName string) error { podName := fmt.Sprintf("collector-%s", nodeName) err := m.client.CoreV1().Pods(m.namespace).Delete(context.TODO(), podName, metav1.DeleteOptions{}) if errors.IsNotFound(err) { return nil } return err } func (m *simplePodCollectorManager) ensurePodClean(podName string) error { err := m.client.CoreV1().Pods(m.namespace).Delete(context.TODO(), podName, metav1.DeleteOptions{}) if errors.IsNotFound(err) { return nil } if err != nil { return err } err = wait.PollUntilContextTimeout(context.Background(), 1*time.Second, 20*time.Second, false, func(_ context.Context) (bool, error) { _, err := m.client.CoreV1().Pods(m.namespace).Get(context.TODO(), podName, metav1.GetOptions{}) if errors.IsNotFound(err) { return true, nil } if err != nil { return false, err } return false, nil }) return err } func (m *simplePodCollectorManager) waitPodRunning(pod *v1.Pod) error { err := wait.PollUntilContextTimeout(context.Background(), m.waitInterval, m.waitTimeout, true, func(_ context.Context) (bool, error) { current, err := m.client.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}) if err != nil { klog.V(2).Infof("Get pod %s/%s failed, will retry. error: %s", pod.Namespace, pod.Name, err.Error()) return false, nil } klog.V(2).Infof("Waiting pod %s/%s running, current status: %s", pod.Namespace, pod.Name, current.Status.Phase) switch current.Status.Phase { case v1.PodRunning: return true, nil case v1.PodSucceeded, v1.PodFailed, v1.PodUnknown: return false, fmt.Errorf("pod in unexpected status %s, log: %s", current.Status.Phase, m.getCollectorPodTailLog(pod)) } return false, nil }) if err != nil { return fmt.Errorf("wait pod running failed: %s", err) } return nil } func (m *simplePodCollectorManager) getCollectorPodTailLog(pod *v1.Pod) string { log, err := m.client.CoreV1().Pods(pod.Namespace).GetLogs(pod.Namespace, &v1.PodLogOptions{ Container: "collector", TailLines: pointer.Int64(10), }).Do(context.TODO()).Raw() if err != nil { return "" } return string(log) } func (m *simplePodCollectorManager) readCollectorData(pod *v1.Pod) (*k8s.NodeNetworkStackDump, error) { klog.V(3).Infof("Trying read collector data from pod %s/%s", pod.Namespace, pod.Name) req := m.client.CoreV1().RESTClient().Post().Resource("pods"). Namespace(pod.Namespace). Name(pod.Name). SubResource("exec"). Param("container", "alive"). VersionedParams(&v1.PodExecOptions{ Stdout: true, Stderr: true, Command: []string{ "sh", "-c", "cat /data/collector.json | base64", }, }, scheme.ParameterCodec) outBuffer := &bytes.Buffer{} errBuffer := &bytes.Buffer{} exec, err := remotecommand.NewSPDYExecutor(m.restConfig, "POST", req.URL()) if err != nil { return nil, err } err = exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{ Stdin: nil, Stdout: outBuffer, Stderr: errBuffer, }) if err != nil { return nil, err } var dump = &k8s.NodeNetworkStackDump{} output, err := base64.StdEncoding.DecodeString(outBuffer.String()) if err != nil { return nil, fmt.Errorf("%s, stderr: %s", err, errBuffer.String()) } err = json.Unmarshal(output, dump) if err != nil { return nil, fmt.Errorf("%s, stderr: %s", err, errBuffer.String()) } return dump, nil }