in aggregators/aggregator.go [240:275]
// Run drives the aggregator's periodic commit-and-harvest loop and blocks
// until it is told to stop.
//
// It returns an error right away when runStopped is already set, i.e. a
// previous call to Run has claimed the aggregator. Otherwise it installs a
// fresh runStopped channel (closed again when Run returns) and loops until
// either ctx is done, in which case ctx.Err() is returned, or a.closed is
// signalled, in which case ErrAggregatorClosed is returned.
//
// Each time the timer fires, the current batch is detached, processingTime
// is moved to the end of the interval being harvested, and the detached
// state is passed to commitAndHarvest. A failure there is only logged as a
// warning; the loop carries on with the next interval.
func (a *Aggregator) Run(ctx context.Context) error {
	// Claim the "running" slot under the lock; only one Run may own it.
	a.mu.Lock()
	alreadyRunning := a.runStopped != nil
	if !alreadyRunning {
		a.runStopped = make(chan struct{})
	}
	a.mu.Unlock()
	if alreadyRunning {
		return errors.New("aggregator is already running")
	}
	defer close(a.runStopped)

	// harvestWait reports how long remains until the interval ending at end,
	// plus the configured HarvestDelay, has elapsed.
	harvestWait := func(end time.Time) time.Duration {
		return time.Until(end.Add(a.cfg.HarvestDelay))
	}

	// The first entry of AggregationIntervals sets the step between harvests.
	intervalEnd := a.processingTime.Add(a.cfg.AggregationIntervals[0])
	harvestTimer := time.NewTimer(harvestWait(intervalEnd))
	defer harvestTimer.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-a.closed:
			return ErrAggregatorClosed
		case <-harvestTimer.C:
			// Detach the state to harvest while holding the lock, so the
			// commit and harvest below run without it.
			a.mu.Lock()
			pending := a.batch
			a.batch = nil
			a.processingTime = intervalEnd
			stats := a.cachedEvents.loadAndDelete(intervalEnd)
			a.mu.Unlock()

			err := a.commitAndHarvest(ctx, pending, intervalEnd, stats)
			if err != nil {
				a.cfg.Logger.Warn("failed to commit and harvest metrics", zap.Error(err))
			}

			// The timer's channel was just received from, so the timer can
			// be re-armed directly for the next interval.
			intervalEnd = intervalEnd.Add(a.cfg.AggregationIntervals[0])
			harvestTimer.Reset(harvestWait(intervalEnd))
		}
	}
}