func()

in pipeline/inputs/aggregator.go [184:211]


func (h *Aggregator) pushBucket(now time.Time) {
	if h.currentBucket == nil {
		h.currentBucket = newBucket(now)
		return
	}
	var finishedReports []metrics.MetricReport
	for _, namedReports := range h.currentBucket.Reports {
		for _, report := range namedReports {
			finishedReports = append(finishedReports, *report.metricReport())
		}
	}
	if len(finishedReports) > 0 {
		if len(finishedReports) == 1 {
			glog.V(2).Infoln("aggregator: sending 1 report")
		} else {
			glog.V(2).Infof("aggregator: sending %v reports", len(finishedReports))
		}
		for _, r := range finishedReports {
			err := h.input.AddReport(r)
			if err != nil {
				glog.Errorf("aggregator: error sending report: %+v", err)
				continue
			}
		}
	}
	h.currentBucket = newBucket(now)
	h.persistState()
}