func()

in cloudwatch/cloudwatch.go [725:798]


// putLogEvents sends the events buffered on stream to CloudWatch Logs in a
// single PutLogEvents request and, once the batch is accepted, clears the
// stream's batch state (events, byte count, batch start/end).
//
// Outcomes:
//   - no buffered events: returns nil without calling the API.
//   - success: stores the returned NextSequenceToken, resets the batch and the
//     plugin timer, and returns nil.
//   - DataAlreadyAcceptedException: the batch is treated as delivered; the
//     sequence token is taken from the error message, the batch is cleared,
//     and nil is returned.
//   - InvalidSequenceTokenException: the expected token is taken from the
//     error message and the request is re-sent, at most
//     maxSequenceTokenRetries times; after that the error is returned.
//   - ResourceNotFoundException: the missing log group or log stream is
//     re-created and an error is returned; the batch is left in place.
//   - any other error: returned as is (the plugin timer is started first when
//     it is an AWS error).
func (output *OutputPlugin) putLogEvents(stream *logStream) error {
	// Upper bound on re-sends caused by a rejected sequence token within one
	// call. Without a bound, a token that keeps being rejected would make this
	// method retry forever with no backoff.
	const maxSequenceTokenRetries = 5

	// return in case of empty logEvents
	if len(stream.logEvents) == 0 {
		return nil
	}

	// nextToken extracts the sequence token that CloudWatch places as the last
	// space-separated word of its error message. The service writes the
	// literal word "null" when the stream expects no token; that case must map
	// to a nil token, because sending the string "null" is rejected again.
	nextToken := func(message string) *string {
		token := message[strings.LastIndex(message, " ")+1:]
		if token == "null" {
			return nil
		}
		return &token
	}

	for attempt := 0; ; attempt++ {
		output.timer.Check()
		stream.updateExpiration()

		// Log events in a single PutLogEvents request must be in chronological order.
		sort.SliceStable(stream.logEvents, func(i, j int) bool {
			return aws.Int64Value(stream.logEvents[i].Timestamp) < aws.Int64Value(stream.logEvents[j].Timestamp)
		})
		response, err := output.client.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
			LogEvents:     stream.logEvents,
			LogGroupName:  aws.String(stream.logGroupName),
			LogStreamName: aws.String(stream.logStreamName),
			SequenceToken: stream.nextSequenceToken,
		})
		if err == nil {
			output.processRejectedEventsInfo(response)
			output.timer.Reset()
			// Log the batch size before the slice is truncated below.
			logrus.Debugf("[cloudwatch %d] Sent %d events to CloudWatch for stream '%s' in group '%s'",
				output.PluginInstanceID, len(stream.logEvents), stream.logStreamName, stream.logGroupName)

			stream.nextSequenceToken = response.NextSequenceToken
			stream.logEvents = stream.logEvents[:0]
			stream.currentByteLength = 0
			stream.currentBatchStart = nil
			stream.currentBatchEnd = nil

			return nil
		}

		awsErr, ok := err.(awserr.Error)
		if !ok {
			return err
		}

		switch awsErr.Code() {
		case cloudwatchlogs.ErrCodeDataAlreadyAcceptedException:
			// already submitted, just grab the correct sequence token
			stream.nextSequenceToken = nextToken(awsErr.Message())
			stream.logEvents = stream.logEvents[:0]
			stream.currentByteLength = 0
			stream.currentBatchStart = nil
			stream.currentBatchEnd = nil
			logrus.Infof("[cloudwatch %d] Encountered error %v; data already accepted, ignoring error\n", output.PluginInstanceID, awsErr)
			return nil

		case cloudwatchlogs.ErrCodeInvalidSequenceTokenException:
			// sequence code is bad, grab the correct one and retry
			stream.nextSequenceToken = nextToken(awsErr.Message())
			if attempt < maxSequenceTokenRetries {
				continue
			}
			// Retries exhausted: handle it like any other failed request. The
			// corrected token stays on the stream for the caller's next attempt.
			output.timer.Start()
			return err

		case cloudwatchlogs.ErrCodeResourceNotFoundException:
			// a log group or a log stream should be re-created after it is deleted and then retry
			logrus.Errorf("[cloudwatch %d] Encountered error %v; detailed information: %s\n", output.PluginInstanceID, awsErr, awsErr.Message())
			if strings.Contains(awsErr.Message(), "group") {
				if err := output.createLogGroup(&Event{group: stream.logGroupName}); err != nil {
					logrus.Errorf("[cloudwatch %d] Encountered error %v\n", output.PluginInstanceID, err)
					return err
				}
			} else if strings.Contains(awsErr.Message(), "stream") {
				if _, err := output.createStream(&Event{group: stream.logGroupName, stream: stream.logStreamName}); err != nil {
					logrus.Errorf("[cloudwatch %d] Encountered error %v\n", output.PluginInstanceID, err)
					return err
				}
			}

			return fmt.Errorf("A Log group/stream did not exist, re-created it. Will retry PutLogEvents on next flush")

		default:
			output.timer.Start()
			return err
		}
	}
}