func()

in internal/satellite/module/gatherer/receiver_gatherer.go [131:157]


// consumeQueue starts a background goroutine that moves elements from
// partition p of r.runningQueue into r.outputChannel[p] until ctx is
// cancelled.
//
// The goroutine calls wg.Done when it exits, so the caller is expected to have
// registered it on wg beforehand. When cancellation is observed it calls
// r.Shutdown and returns.
//
// Per iteration:
//   - a successful dequeue is forwarded to the output channel and counted as
//     "success" on r.queueOutputCounter;
//   - queue.ErrEmpty makes the goroutine wait up to one second before polling
//     again;
//   - any other error is counted as "error", logged, and the dequeue is
//     retried immediately.
//
// Both the channel send and the idle wait are interruptible by cancellation,
// so a stalled consumer or an empty queue cannot delay or block shutdown.
func (r *ReceiverGatherer) consumeQueue(ctx context.Context, p int, wg *sync.WaitGroup) {
	go func() {
		defer wg.Done()

		childCtx, cancel := context.WithCancel(ctx)
		// Deferred so the child context is released on every exit path.
		defer cancel()

		for {
			// Check for cancellation before touching the queue so that no new
			// element is dequeued once shutdown has been requested.
			select {
			case <-childCtx.Done():
				r.Shutdown()
				return
			default:
			}

			e, err := r.runningQueue.Dequeue(p)
			switch {
			case err == nil:
				// Do not block forever on a full/unread channel: give up on the
				// send when the context is cancelled. The loop head then
				// performs the actual shutdown.
				select {
				case r.outputChannel[p] <- e:
					r.queueOutputCounter.Inc(r.config.PipeName, "success")
				case <-childCtx.Done():
					log.Logger.WithFields(logrus.Fields{
						"pipe":  r.config.PipeName,
						"queue": r.runningQueue.Name(),
					}).Errorf("dropping dequeued element on shutdown: %v", childCtx.Err())
				}
			case err == queue.ErrEmpty:
				// Back off while the queue is empty, but wake up early on
				// cancellation instead of sleeping through it.
				idle := time.NewTimer(time.Second)
				select {
				case <-idle.C:
				case <-childCtx.Done():
					idle.Stop()
				}
			default:
				r.queueOutputCounter.Inc(r.config.PipeName, "error")
				log.Logger.WithFields(logrus.Fields{
					"pipe":  r.config.PipeName,
					"queue": r.runningQueue.Name(),
				}).Errorf("error in dequeue: %v", err)
			}
		}
	}()
}