in cloudwatch/cloudwatch.go [337:413]
// AddEvent buffers a single Event on its log stream, first flushing the
// stream's pending batch if adding the Event would cross a batch limit.
//
// It returns a Fluent Bit status code:
//   - FLB_OK when the Event was buffered, or when it was discarded because it
//     could not be processed or was empty (so the rest of the batch continues);
//   - FLB_RETRY when fetching ECS task metadata, obtaining the log stream, or
//     flushing the pending batch failed;
//   - FLB_ERROR when the log group for the Event could not be created.
func (output *OutputPlugin) AddEvent(e *Event) int {
	// Step 1. Convert the Event data to strings, and check for a log key.
	data, err := output.processRecord(e)
	if err != nil {
		logrus.Errorf("[cloudwatch %d] %v\n", output.PluginInstanceID, err)
		// discard this single bad record and let the batch continue
		return fluentbit.FLB_OK
	}

	// Step 2. Make sure the Event data isn't empty.
	eventString := logString(data)
	if len(eventString) == 0 {
		logrus.Debugf("[cloudwatch %d] Discarding an event from publishing as it is empty\n", output.PluginInstanceID)
		// discard this single empty record and let the batch continue
		return fluentbit.FLB_OK
	}

	// Step 3. Extract the Task Metadata if applicable. The lookup is only
	// attempted while no TaskID has been recorded yet.
	if output.runningInECS && output.ecsMetadata.TaskID == "" {
		if err := output.getECSMetadata(); err != nil {
			logrus.Errorf("[cloudwatch %d] Failed to get ECS Task Metadata with error: %v\n", output.PluginInstanceID, err)
			return fluentbit.FLB_RETRY
		}
	}

	// Step 4. Assign a log group and log stream name to the Event.
	output.setGroupStreamNames(e)

	// Step 5. Create a missing log group for this Event. Groups that were
	// created successfully are remembered in output.groups so creation is
	// attempted only once per group name.
	if _, ok := output.groups[e.group]; !ok {
		logrus.Debugf("[cloudwatch %d] Finding log group: %s", output.PluginInstanceID, e.group)

		if err := output.createLogGroup(e); err != nil {
			logrus.Error(err)
			return fluentbit.FLB_ERROR
		}

		output.groups[e.group] = struct{}{}
	}

	// Step 6. Create or retrieve an existing log stream for this Event.
	stream, err := output.getLogStream(e)
	if err != nil {
		logrus.Errorf("[cloudwatch %d] %v\n", output.PluginInstanceID, err)
		// an error means that the log stream was not created; this is retryable
		return fluentbit.FLB_RETRY
	}

	// Work on a private copy of the timestamp: the batch bounds below keep a
	// pointer to it, and pointing into the caller's Event would alias memory
	// the caller owns and keep the whole Event reachable until the next flush.
	ts := e.TS
	eventLen := cloudwatchLen(eventString)

	// Step 7. Check batch limits and flush the buffer if any of these limits
	// will be exceeded by this log entry.
	// ">=" rather than "==" so a flush is still triggered if the buffer ever
	// holds more events than the limit.
	countLimit := len(stream.logEvents) >= maximumLogEventsPerPut
	sizeLimit := (stream.currentByteLength + eventLen) >= maximumBytesPerPut
	spanLimit := stream.logBatchSpan(ts) >= maximumTimeSpanPerPut
	if countLimit || sizeLimit || spanLimit {
		if err := output.putLogEvents(stream); err != nil {
			logrus.Errorf("[cloudwatch %d] %v\n", output.PluginInstanceID, err)
			// send failures are retryable
			return fluentbit.FLB_RETRY
		}
	}

	// Step 8. Add this event to the running tally.
	stream.logEvents = append(stream.logEvents, &cloudwatchlogs.InputLogEvent{
		Message:   aws.String(eventString),
		Timestamp: aws.Int64(ts.UnixNano() / 1e6), // CloudWatch uses milliseconds since epoch
	})
	stream.currentByteLength += eventLen

	// Widen the batch's time window to include this event.
	if stream.currentBatchStart == nil || stream.currentBatchStart.After(ts) {
		stream.currentBatchStart = &ts
	}
	if stream.currentBatchEnd == nil || stream.currentBatchEnd.Before(ts) {
		stream.currentBatchEnd = &ts
	}

	return fluentbit.FLB_OK
}