in internal/satellite/module/gatherer/receiver_gatherer.go [131:157]
// consumeQueue starts a goroutine that drains partition p of r.runningQueue
// into r.outputChannel[p] until ctx is cancelled.
//
// The goroutine calls r.Shutdown() and then wg.Done() when it exits, so the
// caller is expected to have already accounted for it in wg.
//
// Every blocking point (the idle wait on an empty queue and the send to the
// output channel) also watches ctx, so cancellation is observed promptly even
// if the queue is idle or the downstream reader has stopped receiving. An
// event that was dequeued but not yet handed over when ctx is cancelled is
// dropped.
func (r *ReceiverGatherer) consumeQueue(ctx context.Context, p int, wg *sync.WaitGroup) {
	go func() {
		// Deferred calls run in LIFO order: Shutdown first, then wg.Done.
		defer wg.Done()
		defer r.Shutdown()

		for {
			// Non-blocking cancellation check before touching the queue.
			select {
			case <-ctx.Done():
				return
			default:
			}

			e, err := r.runningQueue.Dequeue(p)
			switch {
			case err == nil:
				// Do not block forever on the send if the context is
				// cancelled while nobody is reading the channel.
				select {
				case r.outputChannel[p] <- e:
					r.queueOutputCounter.Inc(r.config.PipeName, "success")
				case <-ctx.Done():
					return
				}
			case err == queue.ErrEmpty:
				// Nothing to consume: back off for a second, but wake up
				// immediately on cancellation.
				idle := time.NewTimer(time.Second)
				select {
				case <-idle.C:
				case <-ctx.Done():
					idle.Stop()
					return
				}
			default:
				r.queueOutputCounter.Inc(r.config.PipeName, "error")
				log.Logger.WithFields(logrus.Fields{
					"pipe":  r.config.PipeName,
					"queue": r.runningQueue.Name(),
				}).Errorf("error in dequeue: %v", err)
			}
		}
	}()
}