func newAggregator()

in pipeline/inputs/aggregator.go [65:82]


func newAggregator(metric metrics.Definition, bufferTime time.Duration, input pipeline.Input, persistence persistence.Persistence, clock clock.Clock) *Aggregator {
	agg := &Aggregator{
		metric:      metric,
		bufferTime:  bufferTime,
		input:       input,
		persistence: persistence,
		clock:       clock,
		push:        make(chan chan bool),
		add:         make(chan addMsg),
	}
	if !agg.loadState() {
		agg.currentBucket = newBucket(clock.Now())
	}
	input.Use()
	agg.wait.Add(1)
	go agg.run()
	return agg
}