plugins/outputs/cloudwatchlogs/internal/pusher/batch.go (110 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: MIT package pusher import ( "sort" "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/amazon-cloudwatch-agent/logs" "github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs" ) // CloudWatch Logs PutLogEvents API limits // Taken from https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html const ( // The maximum batch size in bytes. This size is calculated as the sum of all event messages in UTF-8, // plus 26 bytes for each log event. reqSizeLimit = 1024 * 1024 // The maximum number of log events in a batch. reqEventsLimit = 10000 // The bytes required for metadata for each log event. perEventHeaderBytes = 200 // A batch of log events in a single request cannot span more than 24 hours. Otherwise, the operation fails. batchTimeRangeLimit = 24 * time.Hour ) // logEvent represents a single cloudwatchlogs.InputLogEvent with some metadata for processing type logEvent struct { timestamp time.Time message string eventBytes int doneCallback func() } func newLogEvent(timestamp time.Time, message string, doneCallback func()) *logEvent { return &logEvent{ message: message, timestamp: timestamp, eventBytes: len(message) + perEventHeaderBytes, doneCallback: doneCallback, } } // batch builds a cloudwatchlogs.InputLogEvent from the timestamp and message stored. Converts the timestamp to // milliseconds to match the PutLogEvents specifications. func (e *logEvent) build() *cloudwatchlogs.InputLogEvent { return &cloudwatchlogs.InputLogEvent{ Timestamp: aws.Int64(e.timestamp.UnixMilli()), Message: aws.String(e.message), } } type logEventBatch struct { Target events []*cloudwatchlogs.InputLogEvent entityProvider logs.LogEntityProvider // Total size of all events in the batch. bufferedSize int // Whether the events need to be sorted before being sent. needSort bool // Minimum and maximum timestamps in the batch. minT, maxT time.Time // Callbacks to execute when batch is successfully sent. doneCallbacks []func() } func newLogEventBatch(target Target, entityProvider logs.LogEntityProvider) *logEventBatch { return &logEventBatch{ Target: target, events: make([]*cloudwatchlogs.InputLogEvent, 0), entityProvider: entityProvider, } } // inTimeRange checks if adding an event with the timestamp would keep the batch within the 24-hour limit. func (b *logEventBatch) inTimeRange(timestamp time.Time) bool { if b.minT.IsZero() || b.maxT.IsZero() { return true } return timestamp.Sub(b.minT) <= batchTimeRangeLimit && b.maxT.Sub(timestamp) <= batchTimeRangeLimit } // hasSpace checks if adding an event of the given size will exceed the space limits. func (b *logEventBatch) hasSpace(size int) bool { return len(b.events) < reqEventsLimit && b.bufferedSize+size <= reqSizeLimit } // append adds a log event to the batch. func (b *logEventBatch) append(e *logEvent) { event := e.build() if len(b.events) > 0 && *event.Timestamp < *b.events[len(b.events)-1].Timestamp { b.needSort = true } b.events = append(b.events, event) b.addDoneCallback(e.doneCallback) b.bufferedSize += e.eventBytes if b.minT.IsZero() || b.minT.After(e.timestamp) { b.minT = e.timestamp } if b.maxT.IsZero() || b.maxT.Before(e.timestamp) { b.maxT = e.timestamp } } // addDoneCallback adds the callback to the end of the registered callbacks. func (b *logEventBatch) addDoneCallback(callback func()) { if callback != nil { b.doneCallbacks = append(b.doneCallbacks, callback) } } // done runs all registered callbacks. func (b *logEventBatch) done() { for i := len(b.doneCallbacks) - 1; i >= 0; i-- { done := b.doneCallbacks[i] done() } } // build creates a cloudwatchlogs.PutLogEventsInput from the batch. The log events in the batch must be in // chronological order by their timestamp. func (b *logEventBatch) build() *cloudwatchlogs.PutLogEventsInput { if b.needSort { sort.Stable(byTimestamp(b.events)) } input := &cloudwatchlogs.PutLogEventsInput{ LogGroupName: aws.String(b.Group), LogStreamName: aws.String(b.Stream), LogEvents: b.events, } if b.entityProvider != nil { input.Entity = b.entityProvider.Entity() } return input } type byTimestamp []*cloudwatchlogs.InputLogEvent func (t byTimestamp) Len() int { return len(t) } func (t byTimestamp) Swap(i, j int) { t[i], t[j] = t[j], t[i] } func (t byTimestamp) Less(i, j int) bool { return *t[i].Timestamp < *t[j].Timestamp }