plugins/outputs/cloudwatchlogs/internal/pusher/sender.go (116 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT
package pusher
import (
"errors"
"sync/atomic"
"time"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/influxdata/telegraf"
"github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs"
)
type cloudWatchLogsService interface {
PutLogEvents(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error)
CreateLogStream(input *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error)
CreateLogGroup(input *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error)
PutRetentionPolicy(input *cloudwatchlogs.PutRetentionPolicyInput) (*cloudwatchlogs.PutRetentionPolicyOutput, error)
DescribeLogGroups(input *cloudwatchlogs.DescribeLogGroupsInput) (*cloudwatchlogs.DescribeLogGroupsOutput, error)
}
type Sender interface {
Send(*logEventBatch)
SetRetryDuration(time.Duration)
RetryDuration() time.Duration
}
type sender struct {
service cloudWatchLogsService
retryDuration atomic.Value
targetManager TargetManager
logger telegraf.Logger
stop <-chan struct{}
}
func newSender(
logger telegraf.Logger,
service cloudWatchLogsService,
targetManager TargetManager,
retryDuration time.Duration,
stop <-chan struct{},
) Sender {
s := &sender{
logger: logger,
service: service,
targetManager: targetManager,
stop: stop,
}
s.retryDuration.Store(retryDuration)
return s
}
// Send attempts to send a batch of log events to CloudWatch Logs. Will retry failed attempts until it reaches the
// RetryDuration or an unretryable error.
func (s *sender) Send(batch *logEventBatch) {
if len(batch.events) == 0 {
return
}
input := batch.build()
startTime := time.Now()
retryCountShort := 0
retryCountLong := 0
for {
output, err := s.service.PutLogEvents(input)
if err == nil {
if output.RejectedLogEventsInfo != nil {
info := output.RejectedLogEventsInfo
if info.TooOldLogEventEndIndex != nil {
s.logger.Warnf("%d log events for log '%s/%s' are too old", *info.TooOldLogEventEndIndex, batch.Group, batch.Stream)
}
if info.TooNewLogEventStartIndex != nil {
s.logger.Warnf("%d log events for log '%s/%s' are too new", *info.TooNewLogEventStartIndex, batch.Group, batch.Stream)
}
if info.ExpiredLogEventEndIndex != nil {
s.logger.Warnf("%d log events for log '%s/%s' are expired", *info.ExpiredLogEventEndIndex, batch.Group, batch.Stream)
}
}
batch.done()
s.logger.Debugf("Pusher published %v log events to group: %v stream: %v with size %v KB in %v.", len(batch.events), batch.Group, batch.Stream, batch.bufferedSize/1024, time.Since(startTime))
return
}
var awsErr awserr.Error
if !errors.As(err, &awsErr) {
s.logger.Errorf("Non aws error received when sending logs to %v/%v: %v. CloudWatch agent will not retry and logs will be missing!", batch.Group, batch.Stream, err)
return
}
switch e := awsErr.(type) {
case *cloudwatchlogs.ResourceNotFoundException:
if targetErr := s.targetManager.InitTarget(batch.Target); targetErr != nil {
s.logger.Errorf("Unable to create log stream %v/%v: %v", batch.Group, batch.Stream, targetErr)
break
}
case *cloudwatchlogs.InvalidParameterException,
*cloudwatchlogs.DataAlreadyAcceptedException:
s.logger.Errorf("%v, will not retry the request", e)
return
default:
s.logger.Errorf("Aws error received when sending logs to %v/%v: %v", batch.Group, batch.Stream, awsErr)
}
// retry wait strategy depends on the type of error returned
var wait time.Duration
if chooseRetryWaitStrategy(err) == retryLong {
wait = retryWaitLong(retryCountLong)
retryCountLong++
} else {
wait = retryWaitShort(retryCountShort)
retryCountShort++
}
if time.Since(startTime)+wait > s.RetryDuration() {
s.logger.Errorf("All %v retries to %v/%v failed for PutLogEvents, request dropped.", retryCountShort+retryCountLong-1, batch.Group, batch.Stream)
return
}
s.logger.Warnf("Retried %v time, going to sleep %v before retrying.", retryCountShort+retryCountLong-1, wait)
select {
case <-s.stop:
s.logger.Errorf("Stop requested after %v retries to %v/%v failed for PutLogEvents, request dropped.", retryCountShort+retryCountLong-1, batch.Group, batch.Stream)
return
case <-time.After(wait):
}
}
}
// SetRetryDuration sets the maximum duration for retrying failed log sends.
func (s *sender) SetRetryDuration(retryDuration time.Duration) {
s.retryDuration.Store(retryDuration)
}
// RetryDuration returns the current maximum retry duration.
func (s *sender) RetryDuration() time.Duration {
return s.retryDuration.Load().(time.Duration)
}