in internal/satellite/module/gatherer/receiver_gatherer.go [85:112]
func (r *ReceiverGatherer) Boot(ctx context.Context) {
r.runningReceiver.RegisterSyncInvoker(r)
var wg sync.WaitGroup
wg.Add(r.PartitionCount() + 1)
log.Logger.WithField("pipe", r.config.PipeName).Info("receive_gatherer module is starting...")
go func() {
childCtx, cancel := context.WithCancel(ctx)
defer wg.Done()
for {
select {
case e := <-r.runningReceiver.Channel():
r.receiveCounter.Inc(r.config.PipeName, "all")
err := r.runningQueue.Enqueue(e)
if err != nil {
r.recordEnqueueError(err)
}
case <-childCtx.Done():
cancel()
return
}
}
}()
for p := 0; p < r.PartitionCount(); p++ {
r.consumeQueue(ctx, p, &wg)
}
wg.Wait()
}