in pipeline/inputs/aggregator.go [184:211]
// pushBucket flushes the reports held in the current bucket and replaces the
// bucket with a fresh one created from now.
//
// If there is no current bucket yet, a new one is created and the method
// returns immediately: nothing is sent and persistState is not called.
// Otherwise every report in the current bucket is converted with
// metricReport and passed to h.input.AddReport. A failing AddReport is
// logged and does not stop the remaining reports from being attempted.
// Afterwards the bucket is replaced and persistState is invoked.
func (h *Aggregator) pushBucket(now time.Time) {
	if h.currentBucket == nil {
		h.currentBucket = newBucket(now)
		return
	}

	// Flatten every report group of the current bucket into one slice.
	var pending []metrics.MetricReport
	for _, group := range h.currentBucket.Reports {
		for _, entry := range group {
			pending = append(pending, *entry.metricReport())
		}
	}

	// Log the batch size with correct singular/plural wording; stay silent
	// when there is nothing to send.
	switch count := len(pending); count {
	case 0:
	case 1:
		glog.V(2).Infoln("aggregator: sending 1 report")
	default:
		glog.V(2).Infof("aggregator: sending %v reports", count)
	}

	// Best effort: a report that cannot be added is logged and skipped.
	for _, rep := range pending {
		if err := h.input.AddReport(rep); err != nil {
			glog.Errorf("aggregator: error sending report: %+v", err)
		}
	}

	h.currentBucket = newBucket(now)
	h.persistState()
}