func()

in internal/satellite/module/gatherer/receiver_gatherer.go [85:112]


func (r *ReceiverGatherer) Boot(ctx context.Context) {
	r.runningReceiver.RegisterSyncInvoker(r)
	var wg sync.WaitGroup
	wg.Add(r.PartitionCount() + 1)
	log.Logger.WithField("pipe", r.config.PipeName).Info("receive_gatherer module is starting...")
	go func() {
		childCtx, cancel := context.WithCancel(ctx)
		defer wg.Done()
		for {
			select {
			case e := <-r.runningReceiver.Channel():
				r.receiveCounter.Inc(r.config.PipeName, "all")
				err := r.runningQueue.Enqueue(e)
				if err != nil {
					r.recordEnqueueError(err)
				}
			case <-childCtx.Done():
				cancel()
				return
			}
		}
	}()

	for p := 0; p < r.PartitionCount(); p++ {
		r.consumeQueue(ctx, p, &wg)
	}
	wg.Wait()
}