pkg/controller/service/capture.go (248 lines of code) (raw):

package service

import (
	"context"
	"fmt"
	"io"
	"os"
	"os/exec"
	"path"
	"strconv"
	"sync"

	"github.com/alibaba/kubeskoop/pkg/controller/k8s"
	"github.com/alibaba/kubeskoop/pkg/controller/rpc"
	"github.com/samber/lo"
	corev1 "k8s.io/api/core/v1"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	log "k8s.io/klog/v2"
)

const (
	// typePod and typeNode are the accepted values of a capture item's Type
	// and are also stored as TaskSpec.TaskType.
	typePod  = "Pod"
	typeNode = "Node"

	// statusSuccess and statusFailed are the terminal values of
	// CaptureTaskResult.Status; a freshly committed task is "running".
	statusSuccess = "success"
	statusFailed  = "failed"
)

// CaptureArgs is the JSON request body describing which pods and nodes to
// capture on, for how long, and with which filter expression.
type CaptureArgs struct {
	CaptureList []struct {
		Type      string `json:"type"`
		Name      string `json:"name"`
		Nodename  string `json:"nodename"`
		Namespace string `json:"namespace"`
	} `json:"capture_list"`
	CaptureDurationSeconds int    `json:"capture_duration_seconds"`
	Filter                 string `json:"filter"`
}

// Pod is the trimmed-down pod representation returned by PodList.
type Pod struct {
	Name      string            `json:"name"`
	Namespace string            `json:"namespace"`
	Nodename  string            `json:"nodename"`
	Labels    map[string]string `json:"labels"`
}

// Node is the trimmed-down node representation returned by NodeList.
type Node struct {
	Name   string            `json:"name"`
	Labels map[string]string `json:"labels"`
}

// podListWithInformer lists the pods of every namespace through the lister of
// k8s.PodInformer and converts them to the local Pod type.
func podListWithInformer() ([]*Pod, error) {
	pods, err := k8s.PodInformer.Lister().Pods("").List(labels.Everything())
	if err != nil {
		return nil, fmt.Errorf("list pods failed: %w", err)
	}
	return lo.Map(pods, func(pod *corev1.Pod, _ int) *Pod {
		return &Pod{
			Name:      pod.Name,
			Namespace: pod.Namespace,
			Nodename:  pod.Spec.NodeName,
			Labels:    pod.Labels,
		}
	}), nil
}

// PodList returns the pods of every namespace. It uses k8s.PodInformer when
// one is configured and falls back to a direct API list call otherwise.
func (c *controller) PodList(ctx context.Context) ([]*Pod, error) {
	if k8s.PodInformer != nil {
		return podListWithInformer()
	}

	pods, err := c.k8sClient.CoreV1().Pods("").List(ctx, v1.ListOptions{})
	if err != nil {
		return nil, fmt.Errorf("list pods failed: %w", err)
	}
	return lo.Map(pods.Items, func(pod corev1.Pod, _ int) *Pod {
		return &Pod{
			Name:      pod.Name,
			Namespace: pod.Namespace,
			Nodename:  pod.Spec.NodeName,
			Labels:    pod.Labels,
		}
	}), nil
}

// NodeList returns the name and labels of every node reported by the API
// server.
func (c *controller) NodeList(ctx context.Context) ([]*Node, error) {
	nodes, err := c.k8sClient.CoreV1().Nodes().List(ctx, v1.ListOptions{})
	if err != nil {
		return nil, fmt.Errorf("list nodes failed: %w", err)
	}
	return lo.Map(nodes.Items, func(node corev1.Node, _ int) *Node {
		return &Node{
			Name:   node.Name,
			Labels: node.Labels,
		}
	}), nil
}

// NamespaceList returns the names of every namespace reported by the API
// server.
func (c *controller) NamespaceList(ctx context.Context) ([]string, error) {
	namespaces, err := c.k8sClient.CoreV1().Namespaces().List(ctx, v1.ListOptions{})
	if err != nil {
		return nil, fmt.Errorf("list namespaces failed: %w", err)
	}
	return lo.Map(namespaces.Items, func(namespace corev1.Namespace, _ int) string {
		return namespace.Name
	}), nil
}

// TaskSpec identifies the target of a single capture: a pod (Name and
// Namespace set) or a node (only Name set), discriminated by TaskType.
type TaskSpec struct {
	TaskType  string `json:"task_type"`
	Name      string `json:"name"`
	Namespace string `json:"namespace"`
}

// CaptureTaskResult tracks the state of one capture target inside a task.
// todo reflect to generic task definition
type CaptureTaskResult struct {
	TaskID  int       `json:"task_id"`
	Spec    *TaskSpec `json:"spec"`
	Status  string    `json:"status"`
	Result  string    `json:"result"`
	Message string    `json:"message"`
}

var (
	// captureTasks maps a task id (int) to its []*CaptureTaskResult.
	captureTasks = sync.Map{}
)

// Capture resolves every entry of capture.CaptureList into an rpc.CaptureInfo,
// commits one capture task per entry to the node it targets, and records the
// entries under a newly allocated task id with status "running".
//
// It returns the task id, or 0 and an error when an entry has an unknown type,
// its pod cannot be resolved, or committing a task fails. Validation of all
// entries happens before the first task is committed.
func (c *controller) Capture(ctx context.Context, capture *CaptureArgs) (int, error) {
	taskID := int(getTaskIdx())

	tasksToCommit := make([]*rpc.CaptureInfo, 0, len(capture.CaptureList))
	for _, captureItem := range capture.CaptureList {
		task := &rpc.CaptureInfo{
			CaptureDurationSeconds: int32(capture.CaptureDurationSeconds),
			Filter:                 capture.Filter,
		}
		switch captureItem.Type {
		case typePod:
			var err error
			task.Pod, task.Node, _, err = c.getPodInfo(ctx, captureItem.Namespace, captureItem.Name)
			if err != nil {
				return 0, err
			}
			// Both values are dereferenced below; fail cleanly instead of
			// panicking should getPodInfo ever return nil without an error.
			if task.Pod == nil || task.Node == nil {
				return 0, fmt.Errorf("incomplete pod info for %s/%s", captureItem.Namespace, captureItem.Name)
			}
			task.CaptureType = typePod
		case typeNode:
			task.Node = &rpc.NodeInfo{
				Name: captureItem.Name,
			}
			task.CaptureType = typeNode
		default:
			return 0, fmt.Errorf("invalid capture type: %v", captureItem.Type)
		}
		tasksToCommit = append(tasksToCommit, task)
	}

	// Left nil (not pre-sized) on purpose so that an empty request keeps
	// storing a nil slice, exactly as before.
	var resultList []*CaptureTaskResult
	for _, captureInfo := range tasksToCommit {
		_, err := c.commitTask(captureInfo.Node.Name, &rpc.Task{
			Type: rpc.TaskType_Capture,
			Id:   strconv.Itoa(taskID),
			TaskInfo: &rpc.Task_Capture{
				Capture: captureInfo,
			},
		})
		if err != nil {
			return 0, err
		}

		spec := &TaskSpec{TaskType: captureInfo.CaptureType}
		if captureInfo.GetCaptureType() == typePod {
			spec.Name = captureInfo.GetPod().Name
			spec.Namespace = captureInfo.GetPod().Namespace
		} else {
			spec.Name = captureInfo.GetNode().Name
		}

		resultList = append(resultList, &CaptureTaskResult{
			TaskID:  taskID,
			Spec:    spec,
			Status:  "running",
			Result:  "",
			Message: "",
		})
	}

	captureTasks.Store(taskID, resultList)
	return taskID, nil
}

// CaptureList returns a snapshot map of every recorded capture task, keyed by
// task id. The slices are the stored ones, not copies.
func (c *controller) CaptureList(_ context.Context) (map[int][]*CaptureTaskResult, error) {
	results := map[int][]*CaptureTaskResult{}
	captureTasks.Range(func(key, value any) bool {
		id, ok := key.(int)
		if !ok {
			return true
		}
		capture, ok := value.([]*CaptureTaskResult)
		if !ok {
			return true
		}
		results[id] = capture
		return true
	})
	return results, nil
}

// storeCaptureFile writes the payload of result into /tmp/task_<id>/ and
// returns the base name of the file it created. The name is
// capture_task_<id>_<namespace>_<name>.<type> for pods and
// capture_task_<id>_node_<name>.<type> for nodes.
func (c *controller) storeCaptureFile(_ context.Context, spec *TaskSpec, id int, result *rpc.CaptureResult) (string, error) {
	if spec == nil || result == nil {
		return "", fmt.Errorf("missing capture spec or capture result for task %d", id)
	}

	taskPath := fmt.Sprintf("/tmp/task_%d/", id)
	if err := os.MkdirAll(taskPath, 0755); err != nil {
		return "", err
	}

	scope := "node"
	if spec.TaskType == typePod {
		scope = spec.Namespace
	}
	captureFileName := fmt.Sprintf("capture_task_%d_%s_%s", id, scope, spec.Name) + "." + result.GetFileType()

	if err := os.WriteFile(path.Join(taskPath, captureFileName), result.Message, 0644); err != nil {
		return "", err
	}
	return captureFileName, nil
}

// DownloadCaptureFile packs /tmp/task_<id>/ into /tmp/capture_task_<id>.tar.gz
// with the external tar command and returns the archive's base name, its size
// in bytes, and an open reader over it. The caller must close the reader.
// On error the other return values are zero.
func (c *controller) DownloadCaptureFile(ctx context.Context, id int) (string, int64, io.ReadCloser, error) {
	filename := fmt.Sprintf("/tmp/capture_task_%d.tar.gz", id)
	compressResults := exec.CommandContext(ctx, "tar", "-czf", filename, fmt.Sprintf("/tmp/task_%d/", id))
	if output, err := compressResults.CombinedOutput(); err != nil {
		return "", 0, nil, fmt.Errorf("error compress capture file: %w, output: %s", err, string(output))
	}

	captureFD, err := os.Open(filename)
	if err != nil {
		return "", 0, nil, fmt.Errorf("open capture file %s failed: %w", filename, err)
	}

	fs, err := captureFD.Stat()
	if err != nil {
		// The descriptor is not handed to the caller on this path, so it has
		// to be released here.
		_ = captureFD.Close()
		return "", 0, nil, fmt.Errorf("stat capture file %s failed: %w", filename, err)
	}

	return path.Base(filename), fs.Size(), captureFD, nil
}

// storeCaptureResult applies a capture result reported for a task to the
// matching CaptureTaskResult entries of that task: it copies the message,
// sets the status, and on success persists the capture payload through
// storeCaptureFile.
//
// Results of other task types and results for unknown task ids are
// acknowledged without any change. A result whose id is not an integer is
// answered with a failure reply. An error is returned only when the capture
// file cannot be stored.
//
// NOTE(review): entries are updated in place without synchronization while
// CaptureList hands out the same pointers — confirm whether callers need this
// to be race-free.
func (c *controller) storeCaptureResult(ctx context.Context, result *rpc.TaskResult) (*rpc.TaskResultReply, error) {
	reply := &rpc.TaskResultReply{
		Success: true,
		Message: "",
	}
	if result.GetType() != rpc.TaskType_Capture {
		return reply, nil
	}

	id, err := strconv.Atoi(result.Id)
	if err != nil {
		// Previously the error was dropped and the result was silently
		// attributed to task 0.
		log.Errorf("invalid capture task id %q: %v", result.Id, err)
		return &rpc.TaskResultReply{
			Success: false,
			Message: fmt.Sprintf("invalid capture task id %q", result.Id),
		}, nil
	}

	value, ok := captureTasks.Load(id)
	if !ok {
		return reply, nil
	}
	captureResults, ok := value.([]*CaptureTaskResult)
	if !ok {
		return reply, nil
	}
	log.Infof("store capture result for %v, %v", id, result.GetMessage())

	pod := result.GetTask().GetPod()
	node := result.GetTask().GetNode()
	for _, captureResult := range captureResults {
		spec := captureResult.Spec
		if spec == nil {
			continue
		}

		// The task type is part of the match so that a node result cannot be
		// attributed to a pod that happens to share the node's name.
		var matched bool
		if pod != nil {
			matched = spec.TaskType == typePod &&
				pod.GetNamespace() == spec.Namespace &&
				pod.GetName() == spec.Name
		} else {
			matched = spec.TaskType == typeNode && node.GetName() == spec.Name
		}
		if !matched {
			continue
		}

		captureResult.Message = result.GetMessage()
		if !result.GetSuccess() {
			captureResult.Status = statusFailed
			continue
		}

		captureFile, err := c.storeCaptureFile(ctx, spec, id, result.GetCapture())
		if err != nil {
			// Without a stored file there is nothing to download, so the
			// entry must not be reported as successful.
			captureResult.Status = statusFailed
			return nil, fmt.Errorf("store capture file failed: %w", err)
		}
		captureResult.Status = statusSuccess
		captureResult.Result = captureFile
	}

	return reply, nil
}