func()

in plugins/outputs/cloudwatch/aggregator.go [120:194]


// aggregating is the aggregation loop for one aggregation interval.
//
// It first waits until the next wall-clock multiple of aggregationDuration so
// that subsequent ticks line up with the truncated timestamps used as
// aggregation buckets, then loops over three event sources:
//
//   - aggregationChan: an incoming metric is folded into the entry of
//     metricMap that shares its hash (computeHash) and truncated timestamp.
//     Every supported field value is recorded in a distribution.Distribution
//     stored under the same field name.
//   - ticker.C: the aggregated metrics are flushed.
//   - shutdownChan: a final flush is performed and the method returns.
//
// The shutdown signal is also honoured during the initial alignment wait, so
// stopping never has to wait for a full aggregation interval to elapse.
func (durationAgg *durationAggregator) aggregating() {
	// NOTE(review): if this method is started with `go`, calling Add here can
	// race with a concurrent wg.Wait(); the Add should ideally be done by the
	// caller before the goroutine is launched. It is kept here so existing
	// callers keep working unchanged.
	durationAgg.wg.Add(1)
	// Deferred so the WaitGroup is released on every exit path, and only
	// after the ticker (deferred later, hence run earlier) has been stopped.
	defer durationAgg.wg.Done()

	// finalFlush is the common shutdown path: flush whatever has been
	// aggregated so far and log around it.
	finalFlush := func() {
		log.Printf("D! CloudWatch: aggregating routine receives the shutdown signal, do the final flush now for aggregation interval %v", durationAgg.aggregationDuration)
		durationAgg.flush()
		log.Printf("D! CloudWatch: aggregating routine receives the shutdown signal, exiting.")
	}

	// Wait until the next round duration from now, but stay responsive to the
	// shutdown signal instead of blocking in an uninterruptible sleep.
	now := time.Now()
	alignTimer := time.NewTimer(now.Truncate(durationAgg.aggregationDuration).Add(durationAgg.aggregationDuration).Sub(now))
	select {
	case <-alignTimer.C:
	case <-durationAgg.shutdownChan:
		alignTimer.Stop()
		finalFlush()
		return
	}

	durationAgg.ticker = time.NewTicker(durationAgg.aggregationDuration)
	defer durationAgg.ticker.Stop()
	for {
		select {
		case m := <-durationAgg.aggregationChan:
			// https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html
			aggregatedTime := m.Time().Truncate(durationAgg.aggregationDuration)
			// Metrics with the same hash that fall into the same time bucket
			// share one aggregated metric.
			metricMapKey := fmt.Sprint(computeHash(m), aggregatedTime.Unix())
			aggregatedMetric, ok := durationAgg.metricMap[metricMapKey]
			if !ok {
				var err error
				aggregatedMetric, err = metric.New(m.Name(), m.Tags(), map[string]interface{}{}, aggregatedTime)
				if err != nil {
					log.Printf("E! CloudWatch metrics aggregation failed: %v. The metric %v will be dropped.", err, m.Name())
					continue
				}
				durationAgg.metricMap[metricMapKey] = aggregatedMetric
			}
			// At this point aggregatedMetric has the same metric name, tags and
			// aggregated time as m. Only the fields remain to be merged: new
			// field names are added, existing ones accumulate the new values.
			for k, v := range m.Fields() {
				var value float64
				var dist distribution.Distribution
				switch t := v.(type) {
				case int:
					value = float64(t)
				case int32:
					value = float64(t)
				case int64:
					value = float64(t)
				case float64:
					value = t
				case bool:
					if t {
						value = 1
					} else {
						value = 0
					}
				case time.Time:
					value = float64(t.Unix())
				case distribution.Distribution:
					dist = t
				default:
					// Skip unsupported type.
					continue
				}
				// Look up only the field we need rather than materialising the
				// whole field map of the aggregated metric for every field.
				existingValue, found := aggregatedMetric.GetField(k)
				if !found {
					existingValue = distribution.NewDistribution()
					aggregatedMetric.AddField(k, existingValue)
				}
				existingDist, isDist := existingValue.(distribution.Distribution)
				if !isDist {
					// Should not happen because this loop only ever stores
					// distributions, but a panic here would take the whole
					// routine down, so drop the field instead.
					log.Printf("E! CloudWatch metrics aggregation failed: field %v of metric %v holds unexpected type %T. The field will be dropped.", k, m.Name(), existingValue)
					continue
				}
				if dist != nil {
					existingDist.AddDistribution(dist)
				} else {
					existingDist.AddEntry(value, 1)
				}
			}
		case <-durationAgg.ticker.C:
			durationAgg.flush()
		case <-durationAgg.shutdownChan:
			finalFlush()
			return
		}
	}
}