func()

in internal/satellite/module/gatherer/fetcher_gatherer.go [101:125]


// consumeQueue starts a goroutine that repeatedly dequeues items for p from
// f.runningQueue and forwards each one to f.outputChannel[p], then blocks on
// wg.Wait().
//
// p is passed to Dequeue and is also the index of the output channel that
// receives the item. The goroutine calls wg.Done() when it exits, so the
// caller is expected to have called wg.Add for it beforehand (not visible
// here — confirm at the call site).
//
// The goroutine stops when ctx is cancelled, calling f.Shutdown() before it
// returns. Cancellation is honoured while waiting on an empty queue and while
// blocked sending to the output channel; an item that was dequeued but could
// not be delivered before cancellation is dropped.
func (f *FetcherGatherer) consumeQueue(ctx context.Context, p int, wg *sync.WaitGroup) {
	go func() {
		defer wg.Done()
		childCtx, cancel := context.WithCancel(ctx)
		// Release the derived context on every exit path.
		defer cancel()

		for {
			// Non-blocking cancellation check before each dequeue attempt.
			select {
			case <-childCtx.Done():
				f.Shutdown()
				return
			default:
			}

			e, err := f.runningQueue.Dequeue(p)
			switch {
			case err == nil:
				// Do not block forever on a channel nobody is reading once
				// shutdown has been requested; the next loop iteration
				// performs the actual shutdown.
				select {
				case f.outputChannel[p] <- e:
					f.queueOutputCounter.Inc(f.config.PipeName, "success")
				case <-childCtx.Done():
				}
			case err == queue.ErrEmpty:
				// Back off for a second while the queue is empty, but wake
				// up immediately on cancellation.
				timer := time.NewTimer(time.Second)
				select {
				case <-timer.C:
				case <-childCtx.Done():
					timer.Stop()
				}
			default:
				// NOTE(review): other dequeue errors are retried immediately
				// with no backoff; a persistent error will spin and flood the
				// log.
				f.queueOutputCounter.Inc(f.config.PipeName, "error")
				log.Logger.Errorf("error in popping from the queue: %v", err)
			}
		}
	}()
	wg.Wait()
}