in pipeline/inputs/aggregator.go [131:159]
func (h *Aggregator) run() {
running := true
for running {
// Set a timer to fire when the current bucket should be pushed.
now := h.clock.Now()
nextFire := now.Add(h.bufferTime - now.Sub(h.currentBucket.CreateTime))
timer := h.clock.NewTimerAt(nextFire)
select {
case msg, ok := <-h.add:
if ok {
err := h.currentBucket.addReport(msg.report)
if err == nil {
// TODO(volkman): possibly rate-limit persistence, or flush to disk at a defined interval.
// Perhaps a benchmark to determine whether eager persistence is a bottleneck.
h.persistState()
}
msg.result <- err
} else {
running = false
}
case now := <-timer.GetC():
// Time to push the current bucket.
h.pushBucket(now)
}
timer.Stop()
}
h.pushBucket(h.clock.Now())
h.wait.Done()
}