func()

in pkg/exporter/task-agent/agent.go [64:123]


func (a *Agent) watchTask() error {
	var conn *grpc.ClientConn
	var watchClient rpc.ControllerRegisterService_WatchTasksClient

	reconn := func(maxAttempts int) error {
		return retry("watching task", maxAttempts, func() error {
			log.Infof("connecting to controller.")
			if conn != nil {
				_ = conn.Close()
			}
			var err error
			conn, err = a.rpcConnect()
			if err != nil {
				return err
			}

			a.grpcClient = rpc.NewControllerRegisterServiceClient(conn)

			watchClient, err = a.grpcClient.WatchTasks(context.TODO(), &rpc.TaskFilter{
				NodeName: a.NodeName,
				Type:     []rpc.TaskType{rpc.TaskType_Capture, rpc.TaskType_Ping},
			})
			if err != nil {
				log.Errorf("failed to watch task: %v", err)
				return err
			}
			log.Infof("controller connected.")
			return nil
		})
	}

	err := reconn(3)
	if err != nil {
		return err
	}

	go func() {
		for {
			select {
			case <-watchClient.Context().Done():
				log.Errorf("watch client closed")
				_ = reconn(-1)
				continue
			default:
				task, err := watchClient.Recv()
				if err != nil {
					log.Errorf("failed to receive task: %v", err)
					_ = reconn(-1)
					continue
				}
				err = a.ProcessTasks(task)
				if err != nil {
					log.Errorf("failed to process task: %v", err)
					continue
				}
			}
		}
	}()
	return nil
}