plugins/outputs/cloudwatch/cloudwatch.go
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT
package cloudwatch
import (
"context"
"log"
"reflect"
"sort"
"sync"
"time"
"github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/outputs"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
"golang.org/x/exp/maps"
configaws "github.com/aws/amazon-cloudwatch-agent/cfg/aws"
"github.com/aws/amazon-cloudwatch-agent/handlers"
"github.com/aws/amazon-cloudwatch-agent/internal/publisher"
"github.com/aws/amazon-cloudwatch-agent/internal/retryer"
"github.com/aws/amazon-cloudwatch-agent/internal/util/collections"
"github.com/aws/amazon-cloudwatch-agent/metric/distribution"
"github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatch"
"github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatch/cloudwatchiface"
)
const (
defaultMaxDatumsPerCall = 1000 // PutMetricData supports up to 1000 MetricDatum entries per call by default
defaultMaxValuesPerDatum = 150 // By default, at most this many values can be included in a single datum's value list
bottomLinePayloadSizeInBytesToPublish = 999000 // Just under the 1 MB payload limit; leave ~1 KB of headroom for the last datum before the compression ratio is applied
metricChanBufferSize = 10000
datumBatchChanBufferSize = 50 // the number of requests we buffer
maxConcurrentPublisher = 10 // the number of CloudWatch clients sending requests concurrently
defaultForceFlushInterval = time.Minute
highResolutionTagKey = "aws:StorageResolution"
defaultRetryCount = 5 // retry count; the total number of attempts is at most retryCount + 1
backoffRetryBase = 200 * time.Millisecond
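// MaxDimensions is the maximum number of dimensions CloudWatch accepts per metric.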
MaxDimensions = 30
)
const (
opPutLogEvents = "PutLogEvents"
opPutMetricData = "PutMetricData"
)
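// CloudWatch implements a metrics exporter that aggregates incoming metrics,
// batches them into PutMetricData requests, and publishes them to CloudWatch.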
type CloudWatch struct {
config *Config
logger *zap.Logger
svc cloudwatchiface.CloudWatchAPI
// todo: may want to increase the size of the chan since the type changed.
// 1 telegraf Metric could have many Fields.
// Each field corresponds to a MetricDatum.
metricChan chan *aggregationDatum
datumBatchChan chan map[string][]*cloudwatch.MetricDatum
metricDatumBatch *MetricDatumBatch
shutdownChan chan struct{}
retries int
publisher *publisher.Publisher
retryer *retryer.LogThrottleRetryer
droppingOriginMetrics collections.Set[string]
aggregator Aggregator
aggregatorShutdownChan chan struct{}
aggregatorWaitGroup sync.WaitGroup
lastRequestBytes int
}
// Compile time interface check.
var _ exporter.Metrics = (*CloudWatch)(nil)
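// Capabilities reports that this exporter does not mutate the metrics it consumes.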
func (c *CloudWatch) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}
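// Start creates the publisher, builds the CloudWatch client with credentials, the
// log-throttle retryer, and request compression, optionally wires the AWS middleware,
// and then starts the background batching and publishing routines.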
func (c *CloudWatch) Start(_ context.Context, host component.Host) error {
c.publisher, _ = publisher.NewPublisher(
publisher.NewNonBlockingFifoQueue(metricChanBufferSize),
maxConcurrentPublisher,
2*time.Second,
c.WriteToCloudWatch)
credentialConfig := &configaws.CredentialConfig{
Region: c.config.Region,
AccessKey: c.config.AccessKey,
SecretKey: c.config.SecretKey,
RoleARN: c.config.RoleARN,
Profile: c.config.Profile,
Filename: c.config.SharedCredentialFilename,
Token: c.config.Token,
}
configProvider := credentialConfig.Credentials()
logger := models.NewLogger("outputs", "cloudwatch", "")
logThrottleRetryer := retryer.NewLogThrottleRetryer(logger)
svc := cloudwatch.New(
configProvider,
&aws.Config{
Endpoint: aws.String(c.config.EndpointOverride),
Retryer: logThrottleRetryer,
LogLevel: configaws.SDKLogLevel(),
Logger: configaws.SDKLogger{},
})
svc.Handlers.Build.PushBackNamed(handlers.NewRequestCompressionHandler([]string{opPutLogEvents, opPutMetricData}))
if c.config.MiddlewareID != nil {
awsmiddleware.TryConfigure(c.logger, host, *c.config.MiddlewareID, awsmiddleware.SDKv1(&svc.Handlers))
}
// Format the unique rollup dimension list.
c.config.RollupDimensions = GetUniqueRollupList(c.config.RollupDimensions)
c.svc = svc
c.retryer = logThrottleRetryer
c.startRoutines()
return nil
}
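// startRoutines initializes the metric and batch channels, the aggregator, and the
// first datum batch, then starts the batching and publishing goroutines.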
func (c *CloudWatch) startRoutines() {
setNewDistributionFunc(c.config.MaxValuesPerDatum)
c.metricChan = make(chan *aggregationDatum, metricChanBufferSize)
c.datumBatchChan = make(chan map[string][]*cloudwatch.MetricDatum, datumBatchChanBufferSize)
c.shutdownChan = make(chan struct{})
c.aggregatorShutdownChan = make(chan struct{})
c.aggregator = NewAggregator(c.metricChan, c.aggregatorShutdownChan, &c.aggregatorWaitGroup)
perRequestConstSize := overallConstPerRequestSize + len(c.config.Namespace) + namespaceOverheads
c.metricDatumBatch = newMetricDatumBatch(c.config.MaxDatumsPerCall, perRequestConstSize)
go c.pushMetricDatum()
go c.publish()
}
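// Shutdown waits up to five seconds for buffered metrics to drain, then signals the
// background routines to stop and closes the publisher and retryer.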
func (c *CloudWatch) Shutdown(ctx context.Context) error {
log.Println("D! Stopping the CloudWatch output plugin")
for i := 0; i < 5; i++ {
if len(c.metricChan) == 0 && len(c.datumBatchChan) == 0 {
break
} else {
log.Printf("D! CloudWatch Close, %vth time to sleep since there is still some metric data remaining to publish.", i)
time.Sleep(time.Second)
}
}
if metricChanLen, datumBatchChanLen := len(c.metricChan), len(c.datumBatchChan); metricChanLen != 0 || datumBatchChanLen != 0 {
log.Printf("D! CloudWatch Close, metricChan length = %v, datumBatchChan length = %v.", metricChanLen, datumBatchChanLen)
}
close(c.shutdownChan)
c.publisher.Close()
c.retryer.Stop()
log.Println("D! Stopped the CloudWatch output plugin")
return nil
}
// ConsumeMetrics queues metrics to be published to CW.
// The actual publishing will occur in a long running goroutine.
// This method can block when publishing is backed up.
func (c *CloudWatch) ConsumeMetrics(ctx context.Context, metrics pmetric.Metrics) error {
datums := ConvertOtelMetrics(metrics)
for _, d := range datums {
c.aggregator.AddMetric(d)
}
return nil
}
// pushMetricDatum groups datums into batches for efficient API calls.
// When a batch is full it is queued up for sending.
// Even if the batch is not full it will still get sent after the flush interval.
func (c *CloudWatch) pushMetricDatum() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case metric := <-c.metricChan:
entity, datums := c.BuildMetricDatum(metric)
numberOfPartitions := len(datums)
/* We currently do not account for entity information as part of the payload size.
This is by design and should be revisited once the SDK protocol changes.
In the meantime, a payload limit increase has been applied in the background to accommodate this decision.
Otherwise, to include the entity size you would do something like this:
c.metricDatumBatch.Size += calculateEntitySize(entity)
In addition to calculating the size of the entity object, you might also need to account for any extra bytes that get
added at the individual metric level when entity data is present (depends on how the SDK protocol changes), e.g.:
c.metricDatumBatch.Size += payload(datums[i], entityPresent=true)
File diff that could be useful: https://github.com/aws/amazon-cloudwatch-agent/compare/af960d7...459ef7c
*/
for i := 0; i < numberOfPartitions; i++ {
entityStr := entityToString(entity)
c.metricDatumBatch.Partition[entityStr] = append(c.metricDatumBatch.Partition[entityStr], datums[i])
c.metricDatumBatch.Size += payload(datums[i])
c.metricDatumBatch.Count++
if c.metricDatumBatch.isFull() {
// if batch is full
c.datumBatchChan <- c.metricDatumBatch.Partition
c.metricDatumBatch.clear()
}
}
case <-ticker.C:
if c.timeToPublish(c.metricDatumBatch) {
// if the time to publish comes
c.lastRequestBytes = c.metricDatumBatch.Size
c.datumBatchChan <- c.metricDatumBatch.Partition
c.metricDatumBatch.clear()
}
case <-c.shutdownChan:
return
}
}
}
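// MetricDatumBatch accumulates MetricDatum values, partitioned by their serialized
// entity, until the batch is full or the flush interval elapses.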
type MetricDatumBatch struct {
MaxDatumsPerCall int
Partition map[string][]*cloudwatch.MetricDatum
BeginTime time.Time
Size int
Count int
perRequestConstSize int
}
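// newMetricDatumBatch returns an empty batch whose initial Size reflects the constant per-request overhead.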
func newMetricDatumBatch(maxDatumsPerCall, perRequestConstSize int) *MetricDatumBatch {
return &MetricDatumBatch{
MaxDatumsPerCall: maxDatumsPerCall,
Partition: map[string][]*cloudwatch.MetricDatum{},
BeginTime: time.Now(),
Size: perRequestConstSize,
Count: 0,
perRequestConstSize: perRequestConstSize,
}
}
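// clear empties the batch and resets its size, count, and begin time.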
func (b *MetricDatumBatch) clear() {
b.Partition = map[string][]*cloudwatch.MetricDatum{}
b.BeginTime = time.Now()
b.Size = b.perRequestConstSize
b.Count = 0
}
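// isFull reports whether the batch has reached the datum count limit or the payload size threshold.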
func (b *MetricDatumBatch) isFull() bool {
return b.Count >= b.MaxDatumsPerCall || b.Size >= bottomLinePayloadSizeInBytesToPublish
}
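// timeToPublish reports whether a non-empty batch has been waiting for at least the force flush interval.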
func (c *CloudWatch) timeToPublish(b *MetricDatumBatch) bool {
return len(b.Partition) > 0 && time.Since(b.BeginTime) >= c.config.ForceFlushInterval
}
// getFirstPushMs returns the time at which the first upload should occur.
// It uses random jitter as an offset from the start of the given interval.
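// For example (hypothetical values): with a 1 minute interval and a jitter of 20s,
// a call at 12:00:45 truncates to 12:00:00, adds the jitter to get 12:00:20, and,
// since that is already in the past, returns 12:01:20.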
func getFirstPushMs(interval time.Duration) int64 {
publishJitter := publishJitter(interval)
log.Printf("I! cloudwatch: publish with ForceFlushInterval: %v, Publish Jitter: %v",
interval, publishJitter)
nowMs := time.Now().UnixMilli()
// Truncate i.e. round down, then add jitter.
// If the rounded down time is in the past, move it forward.
nextMs := nowMs - (nowMs % interval.Milliseconds()) + publishJitter.Milliseconds()
if nextMs < nowMs {
nextMs += interval.Milliseconds()
}
return nextMs
}
// publish loops until a shutdown occurs.
// It periodically tries pushing batches of metrics (if there are any).
// If the batch buffer fills up the interval will be gradually reduced to avoid
// many agents bursting the backend.
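// For example (hypothetical): with a 1 minute flush interval, the first time the batch
// buffer fills the interval is halved to 30s (never below 1s) and the remaining wait is
// cut in half; the configured interval is restored on the next push that completes
// without the buffer having filled.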
func (c *CloudWatch) publish() {
currentInterval := c.config.ForceFlushInterval
nextMs := getFirstPushMs(currentInterval)
bufferFullOccurred := false
for {
shouldPublish := false
select {
case <-c.shutdownChan:
log.Printf("D! cloudwatch: publish routine receives the shutdown signal, exiting.")
return
default:
}
nowMs := time.Now().UnixMilli()
if c.metricDatumBatchFull() {
if !bufferFullOccurred {
// Set to true so this only happens once per push.
bufferFullOccurred = true
// Keep interval above 1 second.
if currentInterval.Seconds() > 1 {
currentInterval /= 2
if currentInterval.Seconds() < 1 {
currentInterval = 1 * time.Second
}
// Cut the remaining interval in half.
nextMs = nowMs + ((nextMs - nowMs) / 2)
}
}
}
if nowMs >= nextMs {
shouldPublish = true
// Restore interval if buffer did not fill up during this interval.
if !bufferFullOccurred {
currentInterval = c.config.ForceFlushInterval
}
nextMs += currentInterval.Milliseconds()
}
if shouldPublish {
c.pushMetricDatumBatch()
bufferFullOccurred = false
}
// Sleep 1 second, unless the nextMs is less than a second away.
if nextMs-nowMs > time.Second.Milliseconds() {
time.Sleep(time.Second)
} else {
time.Sleep(time.Duration(nextMs-nowMs) * time.Millisecond)
}
}
}
// metricDatumBatchFull returns true if the channel/buffer of batches is full.
func (c *CloudWatch) metricDatumBatchFull() bool {
return len(c.datumBatchChan) >= datumBatchChanBufferSize
}
// pushMetricDatumBatch will try receiving on the channel, and if successful,
// then it publishes the received batch.
func (c *CloudWatch) pushMetricDatumBatch() {
for {
select {
case datumBatch := <-c.datumBatchChan:
c.publisher.Publish(datumBatch)
continue
default:
}
break
}
}
// backoffSleep sleeps for an amount of time based on the number of retries performed so far.
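// With backoffRetryBase of 200ms and defaultRetryCount of 5, the base delay doubles from
// 200ms up to 6.4s over the first six retries and is capped at one minute afterwards;
// each sleep is half that base plus a random jitter of up to the other half.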
func (c *CloudWatch) backoffSleep() {
d := 1 * time.Minute
if c.retries <= defaultRetryCount {
d = backoffRetryBase * time.Duration(1<<c.retries)
}
d = (d / 2) + publishJitter(d/2)
log.Printf("W! cloudwatch: %v retries, going to sleep %v ms before retrying.",
c.retries, d.Milliseconds())
c.retries++
time.Sleep(d)
}
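// createEntityMetricData converts the entity-partitioned datums into EntityMetricData
// entries; datums with no entity (empty key) are skipped here and sent in the top-level
// MetricData field instead.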
func createEntityMetricData(entityToMetrics map[string][]*cloudwatch.MetricDatum) []*cloudwatch.EntityMetricData {
var entityMetricData []*cloudwatch.EntityMetricData
for entityStr, metrics := range entityToMetrics {
if entityStr == "" {
continue
}
entity := stringToEntity(entityStr)
entityMetricData = append(entityMetricData, &cloudwatch.EntityMetricData{
Entity: &entity,
MetricData: metrics,
})
}
return entityMetricData
}
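// WriteToCloudWatch publishes one entity-partitioned batch via PutMetricData, retrying
// throttling and internal-service errors with backoff.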
func (c *CloudWatch) WriteToCloudWatch(req interface{}) {
entityToMetricDatum := req.(map[string][]*cloudwatch.MetricDatum)
// PutMetricData requires the MetricData field to be present, even when every
// datum is attached to an entity, so fall back to an empty slice.
metricData, ok := entityToMetricDatum[""]
if !ok {
metricData = []*cloudwatch.MetricDatum{}
}
params := &cloudwatch.PutMetricDataInput{
MetricData: metricData,
Namespace: aws.String(c.config.Namespace),
EntityMetricData: createEntityMetricData(entityToMetricDatum),
StrictEntityValidation: aws.Bool(false),
}
var err error
for i := 0; i < defaultRetryCount; i++ {
_, err = c.svc.PutMetricData(params)
if err != nil {
awsErr, ok := err.(awserr.Error)
if !ok {
log.Printf("E! cloudwatch: Cannot cast PutMetricData error %v into awserr.Error.", err)
c.backoffSleep()
continue
}
switch awsErr.Code() {
case cloudwatch.ErrCodeLimitExceededFault, cloudwatch.ErrCodeInternalServiceFault:
log.Printf("W! cloudwatch: PutMetricData, error: %s, message: %s",
awsErr.Code(),
awsErr.Message())
c.backoffSleep()
continue
default:
log.Printf("E! cloudwatch: code: %s, message: %s, original error: %+v", awsErr.Code(), awsErr.Message(), awsErr.OrigErr())
c.backoffSleep()
}
} else {
c.retries = 0
}
break
}
if err != nil {
log.Println("E! cloudwatch: WriteToCloudWatch failure, err: ", err)
}
}
// BuildMetricDatum may just return the datum as-is.
// Or it might expand it into many datums due to dimension aggregation.
// There may also be more datums due to resize() on a distribution.
func (c *CloudWatch) BuildMetricDatum(metric *aggregationDatum) (cloudwatch.Entity, []*cloudwatch.MetricDatum) {
var datums []*cloudwatch.MetricDatum
var distList []distribution.Distribution
if metric.distribution != nil {
if metric.distribution.Size() == 0 {
log.Printf("E! metric has a distribution with no entries, %s", *metric.MetricName)
return metric.entity, datums
}
if metric.distribution.Unit() != "" {
metric.SetUnit(metric.distribution.Unit())
}
distList = resize(metric.distribution, c.config.MaxValuesPerDatum)
}
dimensionsList := c.ProcessRollup(metric.Dimensions)
for index, dimensions := range dimensionsList {
// index == 0 is the original (un-rolled-up) metric; skip creating the datum if its
// metric name is configured to drop the original metric.
if index == 0 && c.IsDropping(*metric.MetricDatum.MetricName) {
continue
}
if len(distList) == 0 {
if metric.Value == nil {
log.Printf("D! metric (%s) has nil value, dropping it", *metric.MetricName)
continue
}
if !distribution.IsSupportedValue(*metric.Value, distribution.MinValue, distribution.MaxValue) {
log.Printf("E! metric (%s) has an unsupported value: %v, dropping it", *metric.MetricName, *metric.Value)
continue
}
// Not a distribution.
datum := &cloudwatch.MetricDatum{
MetricName: metric.MetricName,
Dimensions: dimensions,
Timestamp: metric.Timestamp,
Unit: metric.Unit,
StorageResolution: metric.StorageResolution,
Value: metric.Value,
}
datums = append(datums, datum)
} else {
for _, dist := range distList {
values, counts := dist.ValuesAndCounts()
s := cloudwatch.StatisticSet{}
s.SetMaximum(dist.Maximum())
s.SetMinimum(dist.Minimum())
s.SetSampleCount(dist.SampleCount())
s.SetSum(dist.Sum())
// Beware there may be many datums sharing pointers to the same
// strings for metric names, dimensions, etc.
// It is fine since at this point the values will not change.
datum := &cloudwatch.MetricDatum{
MetricName: metric.MetricName,
Dimensions: dimensions,
Timestamp: metric.Timestamp,
Unit: metric.Unit,
StorageResolution: metric.StorageResolution,
Values: aws.Float64Slice(values),
Counts: aws.Float64Slice(counts),
StatisticValues: &s,
}
datums = append(datums, datum)
}
}
}
return metric.entity, datums
}
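// IsDropping reports whether the original (non-rolled-up) datum for the given metric name should be dropped per drop_original_metrics.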
func (c *CloudWatch) IsDropping(metricName string) bool {
// Check if any metrics are provided in drop_original_metrics
if len(c.config.DropOriginalConfigs) == 0 {
return false
}
if _, ok := c.config.DropOriginalConfigs[metricName]; ok {
return true
}
return false
}
// sortedTagKeys returns a sorted list of keys in the map.
// Necessary for comparing a metric-name and its dimensions to determine
// if 2 metrics are actually the same.
func sortedTagKeys(tagMap map[string]string) []string {
// Preallocate the slice with the proper capacity to avoid re-allocations during append.
keys := make([]string, 0, len(tagMap))
for k := range tagMap {
keys = append(keys, k)
}
sort.Strings(keys)
return keys
}
// BuildDimensions converts the given map of strings to a list of dimensions.
// CloudWatch supports up to 30 dimensions per metric.
// So keep up to the first 30 alphabetically.
// This always includes the "host" tag if it exists.
// See https://github.com/aws/amazon-cloudwatch-agent/issues/398
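// For example (hypothetical tags): {"host": "ip-10-0-0-1", "b": "2", "a": "1"} yields
// the dimensions host, a, b: "host" first, then the remaining non-empty tags in
// alphabetical order.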
func BuildDimensions(tagMap map[string]string) []*cloudwatch.Dimension {
if len(tagMap) > MaxDimensions {
log.Printf("D! cloudwatch: dropping dimensions, max %v, count %v",
MaxDimensions, len(tagMap))
}
dimensions := make([]*cloudwatch.Dimension, 0, MaxDimensions)
// This is pretty ugly but we always want to include the "host" tag if it exists.
if host, ok := tagMap["host"]; ok && host != "" {
dimensions = append(dimensions, &cloudwatch.Dimension{
Name: aws.String("host"),
Value: aws.String(host),
})
}
sortedKeys := sortedTagKeys(tagMap)
for _, k := range sortedKeys {
if len(dimensions) >= MaxDimensions {
break
}
if k == "host" {
continue
}
value := tagMap[k]
if value == "" {
continue
}
dimensions = append(dimensions, &cloudwatch.Dimension{
Name: aws.String(k),
Value: aws.String(tagMap[k]),
})
}
return dimensions
}
// ProcessRollup creates the dimension sets based on the dimensions available in the original metric.
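// For example (hypothetical values): with raw dimensions {InstanceId, ImageId, InstanceType}
// and rollup config [["InstanceId"], ["InstanceId", "AutoScalingGroupName"]], the result is
// the original dimension set plus [InstanceId]; the second target set is skipped because
// AutoScalingGroupName is not present in the original metric.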
func (c *CloudWatch) ProcessRollup(rawDimensions []*cloudwatch.Dimension) [][]*cloudwatch.Dimension {
rawDimensionMap := map[string]string{}
for _, v := range rawDimensions {
rawDimensionMap[*v.Name] = *v.Value
}
targetDimensionsList := c.config.RollupDimensions
fullDimensionsList := [][]*cloudwatch.Dimension{rawDimensions}
for _, targetDimensions := range targetDimensionsList {
// Skip if the target dimension set is the same size as or larger than the original metric's;
// a rollup cannot include dimensions that do not exist in the original metric.
if len(targetDimensions) >= len(rawDimensions) {
continue
}
count := 0
extraDimensions := make([]*cloudwatch.Dimension, len(targetDimensions))
for _, targetDimensionKey := range targetDimensions {
if val, ok := rawDimensionMap[targetDimensionKey]; !ok {
break
} else {
extraDimensions[count] = &cloudwatch.Dimension{
Name: aws.String(targetDimensionKey),
Value: aws.String(val),
}
}
count++
}
if count == len(targetDimensions) {
fullDimensionsList = append(fullDimensionsList, extraDimensions)
}
}
return fullDimensionsList
}
// GetUniqueRollupList filters out duplicate dimensions within the sets and filters
// duplicate sets.
func GetUniqueRollupList(inputLists [][]string) [][]string {
var uniqueSets []collections.Set[string]
for _, inputList := range inputLists {
inputSet := collections.NewSet(inputList...)
count := 0
for _, uniqueSet := range uniqueSets {
if reflect.DeepEqual(inputSet, uniqueSet) {
break
}
count++
}
if count == len(uniqueSets) {
uniqueSets = append(uniqueSets, inputSet)
}
}
uniqueLists := make([][]string, len(uniqueSets))
for i, uniqueSet := range uniqueSets {
uniqueLists[i] = maps.Keys(uniqueSet)
sort.Strings(uniqueLists[i])
}
log.Printf("I! cloudwatch: get unique roll up list %v", uniqueLists)
return uniqueLists
}
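// The methods below satisfy the telegraf.Output interface; publishing is handled by the
// exporter.Metrics implementation above, so Connect, Close, and Write are no-ops.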
func (c *CloudWatch) SampleConfig() string {
return ""
}
func (c *CloudWatch) Description() string {
return "Configuration for AWS CloudWatch output."
}
func (c *CloudWatch) Connect() error {
return nil
}
func (c *CloudWatch) Close() error {
return nil
}
func (c *CloudWatch) Write(metrics []telegraf.Metric) error {
return nil
}
func init() {
outputs.Add("cloudwatch", func() telegraf.Output {
return &CloudWatch{}
})
}