plugins/outputs/cloudwatch/aggregator.go (149 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: MIT package cloudwatch import ( "errors" "fmt" "log" "strings" "sync" "time" "github.com/aws/amazon-cloudwatch-agent/metric/distribution" "github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatch" ) const ( aggregationIntervalTagKey = "aws:AggregationInterval" durationAggregationChanBufferSize = 10000 ) // aggregationDatum just adds a few extra fields to the MetricDatum. // If aggregationInterval is 0, then no aggregation is done. // If receivers set the special attribute "aws:AggregationInterval", then // this exporter will remove it and do aggregation. type aggregationDatum struct { cloudwatch.MetricDatum aggregationInterval time.Duration distribution distribution.Distribution entity cloudwatch.Entity } type Aggregator interface { AddMetric(m *aggregationDatum) } var _ Aggregator = (*aggregator)(nil) type aggregator struct { durationMap map[time.Duration]*durationAggregator metricChan chan<- *aggregationDatum shutdownChan <-chan struct{} wg *sync.WaitGroup } func NewAggregator(metricChan chan<- *aggregationDatum, shutdownChan <-chan struct{}, wg *sync.WaitGroup) Aggregator { return &aggregator{ durationMap: make(map[time.Duration]*durationAggregator), metricChan: metricChan, shutdownChan: shutdownChan, wg: wg, } } func getAggregationKey(m *aggregationDatum, unixTime int64) string { tmp := make([]string, len(m.Dimensions)) for i, d := range m.Dimensions { if d.Name == nil || d.Value == nil { log.Printf("E! dimentions key and/or val is nil") continue } tmp[i] = fmt.Sprintf("%s=%s", *d.Name, *d.Value) } // Assume m.Dimensions was already sorted. return fmt.Sprintf("%s:%s:%v", *m.MetricName, strings.Join(tmp, ","), unixTime) } func (agg *aggregator) AddMetric(m *aggregationDatum) { if m.aggregationInterval == 0 { // no aggregation interval field key, pass through directly. agg.metricChan <- m return } aggDurationMapKey := m.aggregationInterval.Truncate(time.Second) durationAgg, ok := agg.durationMap[aggDurationMapKey] if !ok { durationAgg = newDurationAggregator(aggDurationMapKey, agg.metricChan, agg.shutdownChan, agg.wg) agg.durationMap[aggDurationMapKey] = durationAgg } // auto configure high resolution if aggDurationMapKey < time.Minute { m.SetStorageResolution(1) } durationAgg.addMetric(m) } type durationAggregator struct { aggregationDuration time.Duration metricChan chan<- *aggregationDatum shutdownChan <-chan struct{} wg *sync.WaitGroup ticker *time.Ticker // metric hash string + time sec int64 -> Metric object metricMap map[string]*aggregationDatum aggregationChan chan *aggregationDatum } func newDurationAggregator(durationInSeconds time.Duration, metricChan chan<- *aggregationDatum, shutdownChan <-chan struct{}, wg *sync.WaitGroup) *durationAggregator { durationAgg := &durationAggregator{ aggregationDuration: durationInSeconds, metricChan: metricChan, shutdownChan: shutdownChan, wg: wg, metricMap: make(map[string]*aggregationDatum), aggregationChan: make(chan *aggregationDatum, durationAggregationChanBufferSize), } go durationAgg.aggregating() return durationAgg } func (durationAgg *durationAggregator) aggregating() { durationAgg.wg.Add(1) // Sleep to align the interval to the wall clock. // This initial sleep is not interrupted if the aggregator gets shutdown. now := time.Now() time.Sleep(now.Truncate(durationAgg.aggregationDuration).Add(durationAgg.aggregationDuration).Sub(now)) durationAgg.ticker = time.NewTicker(durationAgg.aggregationDuration) defer durationAgg.ticker.Stop() for { // There is no priority to select{}. // If there is a new metric AND the shutdownChan is closed when this // loop begins, then the behavior is random. select { case m := <-durationAgg.aggregationChan: if m == nil || m.Timestamp == nil || m.MetricName == nil || m.Unit == nil { log.Printf("E! cannot aggregate nil or partial datum") continue } // https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html aggregatedTime := m.Timestamp.Truncate(durationAgg.aggregationDuration) metricMapKey := getAggregationKey(m, aggregatedTime.Unix()) aggregatedMetric, ok := durationAgg.metricMap[metricMapKey] if !ok { // First entry. Initialize it. durationAgg.metricMap[metricMapKey] = m if m.distribution == nil { // Assume function pointer is always valid. m.distribution = distribution.NewDistribution() err := m.distribution.AddEntryWithUnit(*m.Value, 1, *m.Unit) if err != nil { if errors.Is(err, distribution.ErrUnsupportedValue) { log.Printf("W! err %s, metric %s", err, *m.MetricName) } else { log.Printf("D! err %s, metric %s", err, *m.MetricName) } } } // Else the first entry has a distribution, so do nothing. } else { // Update an existing entry. if m.distribution == nil { err := aggregatedMetric.distribution.AddEntryWithUnit(*m.Value, 1, *m.Unit) if err != nil { log.Printf("W! err %s, metric %s", err, *m.MetricName) } } else { aggregatedMetric.distribution.AddDistribution(m.distribution) } } case <-durationAgg.ticker.C: durationAgg.flush() case <-durationAgg.shutdownChan: log.Printf("D! CloudWatch: aggregating routine receives the shutdown signal, do the final flush now for aggregation interval %v", durationAgg.aggregationDuration) durationAgg.flush() log.Printf("D! CloudWatch: aggregating routine receives the shutdown signal, exiting.") durationAgg.wg.Done() return } } } func (durationAgg *durationAggregator) addMetric(m *aggregationDatum) { durationAgg.aggregationChan <- m } func (durationAgg *durationAggregator) flush() { for _, v := range durationAgg.metricMap { durationAgg.metricChan <- v } durationAgg.metricMap = make(map[string]*aggregationDatum) }