in cloudwatch/cloudwatch.go [725:798]
// putLogEvents sends the events buffered on stream to CloudWatch Logs in a
// single PutLogEvents request and, on success, clears the stream's batch state
// and stores the sequence token returned by the service.
//
// It returns nil when there is nothing to send, when the request succeeds, or
// when the service reports that the batch was already accepted. An
// InvalidSequenceTokenException is handled by adopting the token named in the
// error message and retrying. A ResourceNotFoundException triggers re-creation
// of the missing log group or log stream and returns an error; the buffered
// events are left in place so a later call can send them. Any other error is
// returned unchanged.
func (output *OutputPlugin) putLogEvents(stream *logStream) error {
	// return in case of empty logEvents
	if len(stream.logEvents) == 0 {
		return nil
	}

	output.timer.Check()
	stream.updateExpiration()

	// Log events in a single PutLogEvents request must be in chronological order.
	sort.SliceStable(stream.logEvents, func(i, j int) bool {
		return aws.Int64Value(stream.logEvents[i].Timestamp) < aws.Int64Value(stream.logEvents[j].Timestamp)
	})

	response, err := output.client.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
		LogEvents:     stream.logEvents,
		LogGroupName:  aws.String(stream.logGroupName),
		LogStreamName: aws.String(stream.logStreamName),
		SequenceToken: stream.nextSequenceToken,
	})
	if err != nil {
		awsErr, ok := err.(awserr.Error)
		if !ok {
			return err
		}

		switch awsErr.Code() {
		case cloudwatchlogs.ErrCodeDataAlreadyAcceptedException:
			// already submitted, just grab the correct sequence token
			// (the last space-separated word of the error message)
			parts := strings.Split(awsErr.Message(), " ")
			stream.nextSequenceToken = &parts[len(parts)-1]
			stream.logEvents = stream.logEvents[:0]
			stream.currentByteLength = 0
			stream.currentBatchStart = nil
			stream.currentBatchEnd = nil
			logrus.Infof("[cloudwatch %d] Encountered error %v; data already accepted, ignoring error\n", output.PluginInstanceID, awsErr)
			return nil

		case cloudwatchlogs.ErrCodeInvalidSequenceTokenException:
			// sequence code is bad, grab the correct one and retry
			parts := strings.Split(awsErr.Message(), " ")
			expectedToken := &parts[len(parts)-1]
			// For a stream that has no events yet the message ends in
			// "... is: null"; the request must then carry no token at all
			// rather than the literal string "null".
			if *expectedToken == "null" {
				expectedToken = nil
			}
			// Retrying with the token that was just rejected cannot succeed
			// and would recurse without bound, so handle it like the other
			// service errors that are not retried here.
			if aws.StringValue(expectedToken) == aws.StringValue(stream.nextSequenceToken) {
				output.timer.Start()
				return err
			}
			stream.nextSequenceToken = expectedToken
			return output.putLogEvents(stream)

		case cloudwatchlogs.ErrCodeResourceNotFoundException:
			// a log group or a log stream should be re-created after it is deleted and then retry
			logrus.Errorf("[cloudwatch %d] Encountered error %v; detailed information: %s\n", output.PluginInstanceID, awsErr, awsErr.Message())
			// The error message is inspected to decide which resource is missing.
			if strings.Contains(awsErr.Message(), "group") {
				if err := output.createLogGroup(&Event{group: stream.logGroupName}); err != nil {
					logrus.Errorf("[cloudwatch %d] Encountered error %v\n", output.PluginInstanceID, err)
					return err
				}
			} else if strings.Contains(awsErr.Message(), "stream") {
				if _, err := output.createStream(&Event{group: stream.logGroupName, stream: stream.logStreamName}); err != nil {
					logrus.Errorf("[cloudwatch %d] Encountered error %v\n", output.PluginInstanceID, err)
					return err
				}
			}
			// The batch is intentionally not cleared here.
			return fmt.Errorf("A Log group/stream did not exist, re-created it. Will retry PutLogEvents on next flush")

		default:
			output.timer.Start()
			return err
		}
	}

	output.processRejectedEventsInfo(response)
	output.timer.Reset()
	logrus.Debugf("[cloudwatch %d] Sent %d events to CloudWatch for stream '%s' in group '%s'",
		output.PluginInstanceID, len(stream.logEvents), stream.logStreamName, stream.logGroupName)

	// Remember the token for the next request and start a fresh batch,
	// keeping the slice's capacity for reuse.
	stream.nextSequenceToken = response.NextSequenceToken
	stream.logEvents = stream.logEvents[:0]
	stream.currentByteLength = 0
	stream.currentBatchStart = nil
	stream.currentBatchEnd = nil
	return nil
}