in plugins/outputs/cloudwatch/aggregator.go [120:194]
// aggregating is the aggregation loop of a durationAggregator.
//
// It first waits until the next wall-clock multiple of aggregationDuration so
// that flushes line up with interval boundaries, then starts a ticker with
// that period and reacts to three events:
//   - a metric received on aggregationChan is merged into metricMap;
//   - a ticker tick calls flush;
//   - a signal on shutdownChan performs a final flush and ends the loop.
//
// The initial alignment wait also listens on shutdownChan, so a shutdown
// requested during startup is honored immediately instead of being delayed by
// up to one full aggregation interval.
//
// The method registers itself on wg on entry and marks itself done when it
// returns.
// NOTE(review): wg.Add is called from inside this method; if the method is
// started as a goroutine, a concurrent wg.Wait could run before the Add.
// Confirm the ordering at the call site.
func (durationAgg *durationAggregator) aggregating() {
	durationAgg.wg.Add(1)
	// Deferred so that every exit path releases the WaitGroup.
	defer durationAgg.wg.Done()

	// finalFlush is the shared shutdown sequence: announce, flush what has
	// been aggregated so far, announce exit.
	finalFlush := func() {
		log.Printf("D! CloudWatch: aggregating routine receives the shutdown signal, do the final flush now for aggregation interval %v", durationAgg.aggregationDuration)
		durationAgg.flush()
		log.Printf("D! CloudWatch: aggregating routine receives the shutdown signal, exiting.")
	}

	// Wait until the next round duration from now, but stay responsive to a
	// shutdown signal while waiting.
	now := time.Now()
	nextBoundary := now.Truncate(durationAgg.aggregationDuration).Add(durationAgg.aggregationDuration)
	alignTimer := time.NewTimer(nextBoundary.Sub(now))
	select {
	case <-alignTimer.C:
	case <-durationAgg.shutdownChan:
		alignTimer.Stop()
		finalFlush()
		return
	}

	durationAgg.ticker = time.NewTicker(durationAgg.aggregationDuration)
	defer durationAgg.ticker.Stop()
	for {
		select {
		case m := <-durationAgg.aggregationChan:
			// https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html
			// The metric timestamp is truncated to the aggregation interval, and
			// metrics with the same hash and truncated timestamp share one entry.
			aggregatedTime := m.Time().Truncate(durationAgg.aggregationDuration)
			metricMapKey := fmt.Sprint(computeHash(m), aggregatedTime.Unix())
			var aggregatedMetric telegraf.Metric
			var ok bool
			var err error
			if aggregatedMetric, ok = durationAgg.metricMap[metricMapKey]; !ok {
				// First metric for this key: start from an empty field set.
				aggregatedMetric, err = metric.New(m.Name(), m.Tags(), map[string]interface{}{}, aggregatedTime)
				if err != nil {
					log.Printf("E! CloudWatch metrics aggregation failed: %v. The metric %v will be dropped.", err, m.Name())
					continue
				}
				durationAgg.metricMap[metricMapKey] = aggregatedMetric
			}
			// At this point aggregatedMetric has the same metric name, tags and
			// aggregated time as m. Only the fields and their values remain to
			// be merged.
			for k, v := range m.Fields() {
				var value float64
				var dist distribution.Distribution
				switch t := v.(type) {
				case int:
					value = float64(t)
				case int32:
					value = float64(t)
				case int64:
					value = float64(t)
				case float64:
					value = t
				case bool:
					if t {
						value = 1
					} else {
						value = 0
					}
				case time.Time:
					value = float64(t.Unix())
				case distribution.Distribution:
					dist = t
				default:
					// Skip unsupported type.
					continue
				}
				var existingValue interface{}
				if existingValue, ok = aggregatedMetric.Fields()[k]; !ok {
					// First value for this field: back it with a new distribution.
					existingValue = distribution.NewDistribution()
					aggregatedMetric.AddField(k, existingValue)
				}
				// Checked assertion: an unexpected stored type skips the field
				// instead of panicking the whole aggregation goroutine.
				existingDist, isDist := existingValue.(distribution.Distribution)
				if !isDist {
					log.Printf("E! CloudWatch metrics aggregation failed: field %v of metric %v holds unexpected type %T. The field will be skipped.", k, m.Name(), existingValue)
					continue
				}
				if dist != nil {
					existingDist.AddDistribution(dist)
				} else {
					existingDist.AddEntry(value, 1)
				}
			}
		case <-durationAgg.ticker.C:
			durationAgg.flush()
		case <-durationAgg.shutdownChan:
			finalFlush()
			return
		}
	}
}