in plugins/outputs/cloudwatchlogs/pusher.go [210:306]
func (p *pusher) send() {
defer p.resetFlushTimer() // Reset the flush timer after sending the request
if p.needSort {
sort.Stable(ByTimestamp(p.events))
}
input := &cloudwatchlogs.PutLogEventsInput{
LogEvents: p.events,
LogGroupName: &p.Group,
LogStreamName: &p.Stream,
SequenceToken: p.sequenceToken,
}
startTime := time.Now()
retryCount := 0
for {
input.SequenceToken = p.sequenceToken
output, err := p.Service.PutLogEvents(input)
if err == nil {
if output.NextSequenceToken != nil {
p.sequenceToken = output.NextSequenceToken
}
if output.RejectedLogEventsInfo != nil {
info := output.RejectedLogEventsInfo
if info.TooOldLogEventEndIndex != nil {
p.Log.Warnf("%d log events for log '%s/%s' are too old", *info.TooOldLogEventEndIndex, p.Group, p.Stream)
}
if info.TooNewLogEventStartIndex != nil {
p.Log.Warnf("%d log events for log '%s/%s' are too new", *info.TooNewLogEventStartIndex, p.Group, p.Stream)
}
if info.ExpiredLogEventEndIndex != nil {
p.Log.Warnf("%d log events for log '%s/%s' are expired", *info.ExpiredLogEventEndIndex, p.Group, p.Stream)
}
}
for i := len(p.doneCallbacks) - 1; i >= 0; i-- {
done := p.doneCallbacks[i]
done()
}
p.Log.Debugf("Pusher published %v log events to group: %v stream: %v with size %v KB in %v.", len(p.events), p.Group, p.Stream, p.bufferredSize/1024, time.Since(startTime))
p.addStats("rawSize", float64(p.bufferredSize))
p.reset()
p.lastSentTime = time.Now()
return
}
awsErr, ok := err.(awserr.Error)
if !ok {
p.Log.Errorf("Non aws error received when sending logs to %v/%v: %v. CloudWatch agent will not retry and logs will be missing!", p.Group, p.Stream, err)
// Messages will be discarded but done callbacks not called
p.reset()
return
}
switch e := awsErr.(type) {
case *cloudwatchlogs.ResourceNotFoundException:
err := p.createLogGroupAndStream()
if err != nil {
p.Log.Errorf("Unable to create log stream %v/%v: %v", p.Group, p.Stream, e.Message())
break
}
p.putRetentionPolicy()
case *cloudwatchlogs.InvalidSequenceTokenException:
if p.sequenceToken == nil {
p.Log.Infof("First time sending logs to %v/%v since startup so sequenceToken is nil, learned new token:(%v): %v", p.Group, p.Stream, e.ExpectedSequenceToken, e.Message())
} else {
p.Log.Warnf("Invalid SequenceToken used (%v) while sending logs to %v/%v, will use new token and retry: %v", p.sequenceToken, p.Group, p.Stream, e.Message())
}
if e.ExpectedSequenceToken == nil {
p.Log.Errorf("Failed to find sequence token from aws response while sending logs to %v/%v: %v", p.Group, p.Stream, e.Message())
}
p.sequenceToken = e.ExpectedSequenceToken
case *cloudwatchlogs.InvalidParameterException,
*cloudwatchlogs.DataAlreadyAcceptedException:
p.Log.Errorf("%v, will not retry the request", e)
p.reset()
return
default:
p.Log.Errorf("Aws error received when sending logs to %v/%v: %v", p.Group, p.Stream, awsErr)
}
wait := retryWait(retryCount)
if time.Since(startTime)+wait > p.RetryDuration {
p.Log.Errorf("All %v retries to %v/%v failed for PutLogEvents, request dropped.", retryCount, p.Group, p.Stream)
p.reset()
return
}
p.Log.Warnf("Retried %v time, going to sleep %v before retrying.", retryCount, wait)
time.Sleep(wait)
retryCount++
}
}