func()

in internal/satellite/module/gatherer/fetcher_gatherer.go [71:99]


func (f *FetcherGatherer) Boot(ctx context.Context) {
	log.Logger.WithField("pipe", f.config.PipeName).Info("fetch_gatherer module is starting...")
	var wg sync.WaitGroup
	wg.Add(f.PartitionCount() + 1)
	go func() {
		defer wg.Done()
		childCtx, cancel := context.WithCancel(ctx)
		f.runningFetcher.Fetch(childCtx)
		for {
			select {
			case e := <-f.runningFetcher.Channel():
				err := f.runningQueue.Enqueue(e)
				f.fetchCounter.Inc(f.config.PipeName, "all")
				if err != nil {
					f.fetchCounter.Inc(f.config.PipeName, "abandoned")
					log.Logger.Errorf("cannot put event into queue in %s namespace, %v", f.config.PipeName, err)
				}
			case <-childCtx.Done():
				cancel()
				return
			}
		}
	}()

	for p := 0; p < f.PartitionCount(); p++ {
		f.consumeQueue(ctx, p, &wg)
	}
	wg.Wait()
}