func()

in plugins/outputs/cloudwatch/aggregator.go [118:177]


func (durationAgg *durationAggregator) aggregating() {
	durationAgg.wg.Add(1)
	// Sleep to align the interval to the wall clock.
	// This initial sleep is not interrupted if the aggregator gets shutdown.
	now := time.Now()
	time.Sleep(now.Truncate(durationAgg.aggregationDuration).Add(durationAgg.aggregationDuration).Sub(now))
	durationAgg.ticker = time.NewTicker(durationAgg.aggregationDuration)
	defer durationAgg.ticker.Stop()
	for {
		// There is no priority to select{}.
		// If there is a new metric AND the shutdownChan is closed when this
		// loop begins, then the behavior is random.
		select {
		case m := <-durationAgg.aggregationChan:
			if m == nil || m.Timestamp == nil || m.MetricName == nil || m.Unit == nil {
				log.Printf("E! cannot aggregate nil or partial datum")
				continue
			}
			// https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html
			aggregatedTime := m.Timestamp.Truncate(durationAgg.aggregationDuration)
			metricMapKey := getAggregationKey(m, aggregatedTime.Unix())
			aggregatedMetric, ok := durationAgg.metricMap[metricMapKey]
			if !ok {
				// First entry. Initialize it.
				durationAgg.metricMap[metricMapKey] = m
				if m.distribution == nil {
					// Assume function pointer is always valid.
					m.distribution = distribution.NewDistribution()
					err := m.distribution.AddEntryWithUnit(*m.Value, 1, *m.Unit)
					if err != nil {
						if errors.Is(err, distribution.ErrUnsupportedValue) {
							log.Printf("W! err %s, metric %s", err, *m.MetricName)
						} else {
							log.Printf("D! err %s, metric %s", err, *m.MetricName)
						}
					}
				}
				// Else the first entry has a distribution, so do nothing.
			} else {
				// Update an existing entry.
				if m.distribution == nil {
					err := aggregatedMetric.distribution.AddEntryWithUnit(*m.Value, 1, *m.Unit)
					if err != nil {
						log.Printf("W! err %s, metric %s", err, *m.MetricName)
					}
				} else {
					aggregatedMetric.distribution.AddDistribution(m.distribution)
				}
			}
		case <-durationAgg.ticker.C:
			durationAgg.flush()
		case <-durationAgg.shutdownChan:
			log.Printf("D! CloudWatch: aggregating routine receives the shutdown signal, do the final flush now for aggregation interval %v", durationAgg.aggregationDuration)
			durationAgg.flush()
			log.Printf("D! CloudWatch: aggregating routine receives the shutdown signal, exiting.")
			durationAgg.wg.Done()
			return
		}
	}
}