func()

in aggregators/aggregator.go [240:275]


func (a *Aggregator) Run(ctx context.Context) error {
	a.mu.Lock()
	if a.runStopped != nil {
		a.mu.Unlock()
		return errors.New("aggregator is already running")
	}
	a.runStopped = make(chan struct{})
	a.mu.Unlock()
	defer close(a.runStopped)

	to := a.processingTime.Add(a.cfg.AggregationIntervals[0])
	timer := time.NewTimer(time.Until(to.Add(a.cfg.HarvestDelay)))
	defer timer.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-a.closed:
			return ErrAggregatorClosed
		case <-timer.C:
		}

		a.mu.Lock()
		batch := a.batch
		a.batch = nil
		a.processingTime = to
		cachedEventsStats := a.cachedEvents.loadAndDelete(to)
		a.mu.Unlock()

		if err := a.commitAndHarvest(ctx, batch, to, cachedEventsStats); err != nil {
			a.cfg.Logger.Warn("failed to commit and harvest metrics", zap.Error(err))
		}
		to = to.Add(a.cfg.AggregationIntervals[0])
		timer.Reset(time.Until(to.Add(a.cfg.HarvestDelay)))
	}
}