func()

in cloudwatch/cloudwatch.go [337:413]


func (output *OutputPlugin) AddEvent(e *Event) int {
	// Step 1: convert the Event data to strings, and check for a log key.
	data, err := output.processRecord(e)
	if err != nil {
		logrus.Errorf("[cloudwatch %d] %v\n", output.PluginInstanceID, err)
		// discard this single bad record and let the batch continue
		return fluentbit.FLB_OK
	}

	// Step 2. Make sure the Event data isn't empty.
	eventString := logString(data)
	if len(eventString) == 0 {
		logrus.Debugf("[cloudwatch %d] Discarding an event from publishing as it is empty\n", output.PluginInstanceID)
		// discard this single empty record and let the batch continue
		return fluentbit.FLB_OK
	}

	// Step 3. Extract the Task Metadata if applicable.
	if output.runningInECS && output.ecsMetadata.TaskID == "" {
		err := output.getECSMetadata()
		if err != nil {
			logrus.Errorf("[cloudwatch %d] Failed to get ECS Task Metadata with error: %v\n", output.PluginInstanceID, err)
			return fluentbit.FLB_RETRY
		}
	}

	// Step 4. Assign a log group and log stream name to the Event.
	output.setGroupStreamNames(e)

	// Step 5. Create a missing log group for this Event.
	if _, ok := output.groups[e.group]; !ok {
		logrus.Debugf("[cloudwatch %d] Finding log group: %s", output.PluginInstanceID, e.group)

		if err := output.createLogGroup(e); err != nil {
			logrus.Error(err)
			return fluentbit.FLB_ERROR
		}

		output.groups[e.group] = struct{}{}
	}

	// Step 6. Create or retrieve an existing log stream for this Event.
	stream, err := output.getLogStream(e)
	if err != nil {
		logrus.Errorf("[cloudwatch %d] %v\n", output.PluginInstanceID, err)
		// an error means that the log stream was not created; this is retryable
		return fluentbit.FLB_RETRY
	}

	// Step 7. Check batch limits and flush buffer if any of these limits will be exeeded by this log Entry.
	countLimit := len(stream.logEvents) == maximumLogEventsPerPut
	sizeLimit := (stream.currentByteLength + cloudwatchLen(eventString)) >= maximumBytesPerPut
	spanLimit := stream.logBatchSpan(e.TS) >= maximumTimeSpanPerPut
	if countLimit || sizeLimit || spanLimit {
		err = output.putLogEvents(stream)
		if err != nil {
			logrus.Errorf("[cloudwatch %d] %v\n", output.PluginInstanceID, err)
			// send failures are retryable
			return fluentbit.FLB_RETRY
		}
	}

	// Step 8. Add this event to the running tally.
	stream.logEvents = append(stream.logEvents, &cloudwatchlogs.InputLogEvent{
		Message:   aws.String(eventString),
		Timestamp: aws.Int64(e.TS.UnixNano() / 1e6), // CloudWatch uses milliseconds since epoch
	})
	stream.currentByteLength += cloudwatchLen(eventString)
	if stream.currentBatchStart == nil || stream.currentBatchStart.After(e.TS) {
		stream.currentBatchStart = &e.TS
	}
	if stream.currentBatchEnd == nil || stream.currentBatchEnd.Before(e.TS) {
		stream.currentBatchEnd = &e.TS
	}

	return fluentbit.FLB_OK
}