in plugins/outputs/cloudwatch/aggregator.go [118:177]
func (durationAgg *durationAggregator) aggregating() {
durationAgg.wg.Add(1)
// Sleep to align the interval to the wall clock.
// This initial sleep is not interrupted if the aggregator gets shutdown.
now := time.Now()
time.Sleep(now.Truncate(durationAgg.aggregationDuration).Add(durationAgg.aggregationDuration).Sub(now))
durationAgg.ticker = time.NewTicker(durationAgg.aggregationDuration)
defer durationAgg.ticker.Stop()
for {
// There is no priority to select{}.
// If there is a new metric AND the shutdownChan is closed when this
// loop begins, then the behavior is random.
select {
case m := <-durationAgg.aggregationChan:
if m == nil || m.Timestamp == nil || m.MetricName == nil || m.Unit == nil {
log.Printf("E! cannot aggregate nil or partial datum")
continue
}
// https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html
aggregatedTime := m.Timestamp.Truncate(durationAgg.aggregationDuration)
metricMapKey := getAggregationKey(m, aggregatedTime.Unix())
aggregatedMetric, ok := durationAgg.metricMap[metricMapKey]
if !ok {
// First entry. Initialize it.
durationAgg.metricMap[metricMapKey] = m
if m.distribution == nil {
// Assume function pointer is always valid.
m.distribution = distribution.NewDistribution()
err := m.distribution.AddEntryWithUnit(*m.Value, 1, *m.Unit)
if err != nil {
if errors.Is(err, distribution.ErrUnsupportedValue) {
log.Printf("W! err %s, metric %s", err, *m.MetricName)
} else {
log.Printf("D! err %s, metric %s", err, *m.MetricName)
}
}
}
// Else the first entry has a distribution, so do nothing.
} else {
// Update an existing entry.
if m.distribution == nil {
err := aggregatedMetric.distribution.AddEntryWithUnit(*m.Value, 1, *m.Unit)
if err != nil {
log.Printf("W! err %s, metric %s", err, *m.MetricName)
}
} else {
aggregatedMetric.distribution.AddDistribution(m.distribution)
}
}
case <-durationAgg.ticker.C:
durationAgg.flush()
case <-durationAgg.shutdownChan:
log.Printf("D! CloudWatch: aggregating routine receives the shutdown signal, do the final flush now for aggregation interval %v", durationAgg.aggregationDuration)
durationAgg.flush()
log.Printf("D! CloudWatch: aggregating routine receives the shutdown signal, exiting.")
durationAgg.wg.Done()
return
}
}
}