in pkg/exporter/task-agent/agent.go [64:123]
// watchTask connects to the controller, opens a WatchTasks stream filtered to
// this agent's node name and the Capture/Ping task types, and starts a
// background goroutine that hands every received task to a.ProcessTasks.
//
// The initial connection is attempted through retry with a limit of 3; if that
// fails the error is returned and no goroutine is started. Once running, the
// goroutine re-establishes the connection (retry limit -1) whenever the stream
// context is done or Recv returns an error.
//
// NOTE(review): the goroutine has no stop condition, and a.grpcClient is
// written from it without synchronization — confirm that no other goroutine
// reads a.grpcClient concurrently.
func (a *Agent) watchTask() error {
	var conn *grpc.ClientConn
	var watchClient rpc.ControllerRegisterService_WatchTasksClient

	// reconn tears down the previous connection (if any) and establishes a new
	// connection plus task stream. maxAttempts is handed to retry unchanged;
	// -1 is presumably "retry without limit" — confirm against retry.
	reconn := func(maxAttempts int) error {
		return retry("watching task", maxAttempts, func() error {
			log.Infof("connecting to controller.")
			if conn != nil {
				// Best-effort close: the old connection is being replaced anyway.
				_ = conn.Close()
				conn = nil
			}

			newConn, err := a.rpcConnect()
			if err != nil {
				return err
			}

			client := rpc.NewControllerRegisterServiceClient(newConn)
			stream, err := client.WatchTasks(context.TODO(), &rpc.TaskFilter{
				NodeName: a.NodeName,
				Type:     []rpc.TaskType{rpc.TaskType_Capture, rpc.TaskType_Ping},
			})
			if err != nil {
				log.Errorf("failed to watch task: %v", err)
				// Do not leak the freshly opened connection when the stream
				// cannot be established (matters most on the final attempt).
				_ = newConn.Close()
				return err
			}

			// Publish the new state only once everything succeeded, so that
			// watchClient is never overwritten with a nil stream.
			conn = newConn
			a.grpcClient = client
			watchClient = stream
			log.Infof("controller connected.")
			return nil
		})
	}

	if err := reconn(3); err != nil {
		return err
	}

	go func() {
		for {
			// Non-blocking check: if the stream's context is already done,
			// reconnect right away instead of calling Recv on a dead stream.
			select {
			case <-watchClient.Context().Done():
				log.Errorf("watch client closed")
				if err := reconn(-1); err != nil {
					log.Errorf("failed to reconnect to controller: %v", err)
				}
				continue
			default:
			}

			// Recv blocks until a task arrives or the stream fails.
			task, err := watchClient.Recv()
			if err != nil {
				log.Errorf("failed to receive task: %v", err)
				if err := reconn(-1); err != nil {
					log.Errorf("failed to reconnect to controller: %v", err)
				}
				continue
			}

			// A task that fails to process is logged and dropped; the stream
			// stays open for subsequent tasks.
			if err := a.ProcessTasks(task); err != nil {
				log.Errorf("failed to process task: %v", err)
			}
		}
	}()
	return nil
}