in internal/satellite/module/gatherer/fetcher_gatherer.go [71:99]
// Boot starts the fetcher, forwards every fetched event into the running
// queue, invokes consumeQueue for each partition, and then blocks on the
// WaitGroup until every registered worker has reported completion.
//
// ctx bounds the lifetime of the forwarding goroutine: once it is cancelled
// the goroutine stops and releases the context it derived for the fetcher.
func (f *FetcherGatherer) Boot(ctx context.Context) {
	log.Logger.WithField("pipe", f.config.PipeName).Info("fetch_gatherer module is starting...")

	// Read the partition count once so the WaitGroup counter and the number of
	// consumeQueue calls below cannot disagree.
	partitions := f.PartitionCount()

	var wg sync.WaitGroup
	// One slot for the forwarding goroutine plus one per partition.
	// NOTE(review): assumes consumeQueue calls wg.Done exactly once per
	// partition — confirm against its definition.
	wg.Add(partitions + 1)

	go func() {
		defer wg.Done()

		childCtx, cancel := context.WithCancel(ctx)
		// Deferred so the derived context is released on every exit path.
		defer cancel()

		f.runningFetcher.Fetch(childCtx)
		for {
			select {
			case e, ok := <-f.runningFetcher.Channel():
				if !ok {
					// A closed channel yields zero values immediately and
					// forever; stop instead of enqueueing empty events in a
					// busy loop.
					return
				}
				err := f.runningQueue.Enqueue(e)
				// "all" counts every fetched event, including the ones the
				// queue rejects below.
				f.fetchCounter.Inc(f.config.PipeName, "all")
				if err != nil {
					f.fetchCounter.Inc(f.config.PipeName, "abandoned")
					log.Logger.Errorf("cannot put event into queue in %s namespace, %v", f.config.PipeName, err)
				}
			case <-childCtx.Done():
				return
			}
		}
	}()

	for p := 0; p < partitions; p++ {
		f.consumeQueue(ctx, p, &wg)
	}
	wg.Wait()
}