in pipeline/inputs/aggregator.go [65:82]
func newAggregator(metric metrics.Definition, bufferTime time.Duration, input pipeline.Input, persistence persistence.Persistence, clock clock.Clock) *Aggregator {
agg := &Aggregator{
metric: metric,
bufferTime: bufferTime,
input: input,
persistence: persistence,
clock: clock,
push: make(chan chan bool),
add: make(chan addMsg),
}
if !agg.loadState() {
agg.currentBucket = newBucket(clock.Now())
}
input.Use()
agg.wait.Add(1)
go agg.run()
return agg
}