pkg/exporter/task-agent/agent.go (236 lines of code) (raw):

package taskagent import ( "context" "fmt" "time" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/grpc/credentials/insecure" "github.com/alibaba/kubeskoop/pkg/controller/rpc" "github.com/alibaba/kubeskoop/pkg/exporter/nettop" log "github.com/sirupsen/logrus" "google.golang.org/grpc" ) func NewTaskAgent(controllerAddr string) *Agent { return &Agent{ NodeName: nettop.GetNodeName(), controllerAddr: controllerAddr, } } type Agent struct { NodeName string grpcClient rpc.ControllerRegisterServiceClient ipCacheClient rpc.IPCacheServiceClient controllerAddr string } func (a *Agent) rpcConnect() (*grpc.ClientConn, error) { var opts []grpc.CallOption opts = append(opts, grpc.MaxCallSendMsgSize(102*1024*1024)) return grpc.Dial(a.controllerAddr, grpc.WithDefaultCallOptions(opts...), grpc.WithTransportCredentials(insecure.NewCredentials())) } func retry(msg string, maxAttempts int, work func() error) error { attempts := 0 backoff := 1 // unit: second for { err := work() if err == nil { return nil } log.Errorf("retry %s error: %s", msg, err) attempts++ if maxAttempts > 0 && attempts >= maxAttempts { return fmt.Errorf("failed %s after %d attempts", msg, attempts) } backoff = backoff * 2 if backoff > 10 { backoff = 10 } log.Warningf("retry %s after %d seconds", msg, backoff) time.Sleep(time.Duration(backoff) * time.Second) } } func (a *Agent) watchTask() error { var conn *grpc.ClientConn var watchClient rpc.ControllerRegisterService_WatchTasksClient reconn := func(maxAttempts int) error { return retry("watching task", maxAttempts, func() error { log.Infof("connecting to controller.") if conn != nil { _ = conn.Close() } var err error conn, err = a.rpcConnect() if err != nil { return err } a.grpcClient = rpc.NewControllerRegisterServiceClient(conn) watchClient, err = a.grpcClient.WatchTasks(context.TODO(), &rpc.TaskFilter{ NodeName: a.NodeName, Type: []rpc.TaskType{rpc.TaskType_Capture, rpc.TaskType_Ping}, }) if err != nil { log.Errorf("failed to watch task: %v", err) return err } log.Infof("controller connected.") return nil }) } err := reconn(3) if err != nil { return err } go func() { for { select { case <-watchClient.Context().Done(): log.Errorf("watch client closed") _ = reconn(-1) continue default: task, err := watchClient.Recv() if err != nil { log.Errorf("failed to receive task: %v", err) _ = reconn(-1) continue } err = a.ProcessTasks(task) if err != nil { log.Errorf("failed to process task: %v", err) continue } } } }() return nil } func (a *Agent) syncIPCache() error { var conn *grpc.ClientConn var watchClient rpc.IPCacheService_WatchCacheClient entry2IPInfo := func(e *rpc.CacheEntry) *nettop.IPInfo { info := &nettop.IPInfo{ IP: e.IP, } switch v := e.Meta.(type) { case *rpc.CacheEntry_Node: info.Type = nettop.IPTypeNode info.NodeName = v.Node.Name case *rpc.CacheEntry_Pod: info.Type = nettop.IPTypePod info.PodNamespace = v.Pod.Namespace info.PodName = v.Pod.Name default: return nil } return info } reconn := func(maxAttempts int, relist bool) error { return retry("watching ipcache", maxAttempts, func() error { log.Infof("connecting to controller.") if conn != nil { _ = conn.Close() } var err error conn, err = a.rpcConnect() if err != nil { return err } a.ipCacheClient = rpc.NewIPCacheServiceClient(conn) period, revision := nettop.IPCacheRevision() if period == "" { relist = true } if relist { log.Warnf("list ipcache") listResp, err := a.ipCacheClient.ListCache(context.TODO(), &rpc.ListCacheRequest{}) if err != nil { return err } period = listResp.Period revision = listResp.Revision log.Infof("current period:%s, revision: %d", period, revision) var ipInfoSlice []*nettop.IPInfo for _, e := range listResp.Entries { info := entry2IPInfo(e) if info == nil { continue } ipInfoSlice = append(ipInfoSlice, info) } nettop.UpdateIPCache(period, revision, ipInfoSlice) } rq := &rpc.WatchCacheRequest{ Period: period, Revision: revision, } watchClient, err = a.ipCacheClient.WatchCache(context.TODO(), rq) if err != nil { log.Errorf("failed to watch ipcache: %v", err) return err } return nil }) } err := reconn(3, false) if err != nil { return err } go func() { for { select { case <-watchClient.Context().Done(): log.Errorf("ipcache watch client closed") _ = reconn(-1, false) continue default: resp, err := watchClient.Recv() if err != nil { log.Errorf("ipcache failed to receive task: %v", err) s, ok := status.FromError(err) if ok && (s.Code() == codes.DataLoss || s.Code() == codes.InvalidArgument) { _ = reconn(-1, true) } else { _ = reconn(-1, false) } continue } info := entry2IPInfo(resp.Entry) log.Debugf("ip cache changed op=%d,info=[%s]", resp.Opcode, info) nettop.ApplyIPCacheChange(resp.Revision, resp.Opcode, info) } } }() return nil } func (a *Agent) Run() error { if err := a.watchTask(); err != nil { return err } if err := a.syncIPCache(); err != nil { return err } return nil } func (a *Agent) ProcessTasks(task *rpc.ServerTask) error { log.Infof("process task: %v", task) switch task.GetTask().GetType() { case rpc.TaskType_Capture: go func() { err := a.ProcessCapture(task) if err != nil { log.Errorf("failed to process capture: %v", err) } }() return nil case rpc.TaskType_Ping: go func() { err := a.ProcessPing(task) if err != nil { log.Errorf("failed to process ping: %v", err) } }() return nil } return nil }