func()

in pipeline/inputs/aggregator.go [131:159]


// run is the Aggregator's event loop. Each iteration arms a timer for the
// moment the current bucket becomes due (bufferTime after its CreateTime) and
// then waits for whichever happens first:
//
//   - a message arrives on h.add: its report is added to the current bucket,
//     state is persisted if the add succeeded, and the add's error (or nil) is
//     sent back on the message's result channel;
//   - the timer fires: the current bucket is pushed with the fire time.
//
// The loop ends once h.add is closed. The current bucket is then pushed one
// last time and h.wait is marked done.
func (h *Aggregator) run() {
loop:
	for {
		// Work out how long the current bucket has existed and arm a timer
		// for the remainder of its buffer window.
		start := h.clock.Now()
		age := start.Sub(h.currentBucket.CreateTime)
		deadline := h.clock.NewTimerAt(start.Add(h.bufferTime - age))

		select {
		case fired := <-deadline.GetC():
			// The buffer window has elapsed; push the current bucket.
			h.pushBucket(fired)
		case req, open := <-h.add:
			if !open {
				// Input channel closed: release the timer and shut down.
				deadline.Stop()
				break loop
			}
			addErr := h.currentBucket.addReport(req.report)
			if addErr == nil {
				// TODO(volkman): possibly rate-limit persistence, or flush to disk at a defined interval.
				// Perhaps a benchmark to determine whether eager persistence is a bottleneck.
				h.persistState()
			}
			// Always report the outcome back to the sender, success or not.
			req.result <- addErr
		}

		// The timer is re-created every iteration, so release this one.
		deadline.Stop()
	}

	// Final push of whatever is in the current bucket before signalling completion.
	h.pushBucket(h.clock.Now())
	h.wait.Done()
}