in internal/satellite/module/gatherer/fetcher_gatherer.go [101:125]
// consumeQueue starts a goroutine that repeatedly dequeues entries for index p
// from f.runningQueue and forwards them to f.outputChannel[p], then blocks the
// calling goroutine on wg.Wait.
//
// The worker goroutine runs until ctx is cancelled; it then calls f.Shutdown
// and releases wg via wg.Done. Presumably the caller has already called wg.Add
// for this worker — verify against the call site.
//
// Every blocking step inside the worker (the channel send and the empty-queue
// back-off) also watches ctx, so cancellation is observed promptly even when
// nothing is receiving from the output channel.
func (f *FetcherGatherer) consumeQueue(ctx context.Context, p int, wg *sync.WaitGroup) {
	go func() {
		defer wg.Done()
		for {
			// Single shutdown path: every cancellation observed further down
			// falls through to this check on the next iteration.
			select {
			case <-ctx.Done():
				f.Shutdown()
				return
			default:
			}

			e, err := f.runningQueue.Dequeue(p)
			switch {
			case err == nil:
				select {
				case f.outputChannel[p] <- e:
					f.queueOutputCounter.Inc(f.config.PipeName, "success")
				case <-ctx.Done():
					// Do not block forever on a receiver that has gone away.
					// NOTE(review): the entry just dequeued is dropped here —
					// confirm that is acceptable during shutdown.
				}
			case err == queue.ErrEmpty:
				// Back off while the queue is empty, but wake up immediately
				// on cancellation instead of sleeping through it.
				timer := time.NewTimer(time.Second)
				select {
				case <-timer.C:
				case <-ctx.Done():
					timer.Stop()
				}
			default:
				f.queueOutputCounter.Inc(f.config.PipeName, "error")
				log.Logger.Errorf("error in popping from the queue: %v", err)
			}
		}
	}()
	wg.Wait()
}