in plugins/outputs/cloudwatch_logs/cloudwatch_logs.go [250:422]
func (c *CloudWatchLogs) Write(metrics []telegraf.Metric) error {
minTime := time.Now()
if c.lg.RetentionInDays != nil {
minTime = minTime.Add(-time.Hour * 24 * time.Duration(*c.lg.RetentionInDays))
} else {
minTime = minTime.Add(-maxPastLogEventTimeOffset)
}
maxTime := time.Now().Add(maxFutureLogEventTimeOffset)
for _, m := range metrics {
//Filtering metrics
if m.Name() != c.LDMetricName {
continue
}
if m.Time().After(maxTime) || m.Time().Before(minTime) {
c.Log.Debugf("Processing metric '%v': Metric is filtered based on TS!", m)
continue
}
tags := m.Tags()
fields := m.Fields()
logStream := ""
logData := ""
lsContainer := &logStreamContainer{
currentBatchSizeBytes: 0,
currentBatchIndex: 0,
messageBatches: []messageBatch{{}}}
switch c.lsKey {
case "tag":
logStream = tags[c.lsSource]
case "field":
if fields[c.lsSource] != nil {
logStream = fields[c.lsSource].(string)
}
default:
logStream = c.lsSource
}
if logStream == "" {
c.Log.Errorf("Processing metric '%v': log stream: key %q, source %q, not found!", m, c.lsKey, c.lsSource)
continue
}
switch c.logDatKey {
case "tag":
logData = tags[c.logDataSource]
case "field":
if fields[c.logDataSource] != nil {
logData = fields[c.logDataSource].(string)
}
}
if logData == "" {
c.Log.Errorf("Processing metric '%v': log data: key %q, source %q, not found!", m, c.logDatKey, c.logDataSource)
continue
}
//Check if message size is not fit to batch
if len(logData) > maxLogMessageLength {
metricStr := fmt.Sprintf("%v", m)
c.Log.Errorf("Processing metric '%s...', message is too large to fit to aws max log message size: %d (bytes) !", metricStr[0:maxLogMessageLength/1000], maxLogMessageLength)
continue
}
//Batching log messages
//awsOverheadPerLogMessageBytes - is mandatory aws overhead per each log message
messageSizeInBytesForAWS := len(logData) + awsOverheadPerLogMessageBytes
//Pick up existing or prepare new log stream container.
//Log stream container stores logs per log stream in
//the AWS Cloudwatch logs API friendly structure
if val, ok := c.ls[logStream]; ok {
lsContainer = val
} else {
lsContainer.messageBatches[0].messageCount = 0
lsContainer.messageBatches[0].logEvents = []types.InputLogEvent{}
c.ls[logStream] = lsContainer
}
if lsContainer.currentBatchSizeBytes+messageSizeInBytesForAWS > maxBatchSizeBytes ||
lsContainer.messageBatches[lsContainer.currentBatchIndex].messageCount >= maxItemsInBatch {
//Need to start new batch, and reset counters
lsContainer.currentBatchIndex++
lsContainer.messageBatches = append(lsContainer.messageBatches,
messageBatch{
logEvents: []types.InputLogEvent{},
messageCount: 0})
lsContainer.currentBatchSizeBytes = messageSizeInBytesForAWS
} else {
lsContainer.currentBatchSizeBytes += messageSizeInBytesForAWS
lsContainer.messageBatches[lsContainer.currentBatchIndex].messageCount++
}
//AWS need time in milliseconds. time.UnixNano() returns time in nanoseconds since epoch
//we store here TS with nanosec precision iun order to have proper ordering, later ts will be reduced to milliseconds
metricTime := m.Time().UnixNano()
//Adding metring to batch
lsContainer.messageBatches[lsContainer.currentBatchIndex].logEvents =
append(lsContainer.messageBatches[lsContainer.currentBatchIndex].logEvents,
types.InputLogEvent{
Message: &logData,
Timestamp: &metricTime})
}
// Sorting out log events by TS and sending them to cloud watch logs
for logStream, elem := range c.ls {
for index, batch := range elem.messageBatches {
if len(batch.logEvents) == 0 { //can't push empty batch
//c.Log.Warnf("Empty batch detected, skipping...")
continue
}
//Sorting
sort.Slice(batch.logEvents[:], func(i, j int) bool {
return *batch.logEvents[i].Timestamp < *batch.logEvents[j].Timestamp
})
putLogEvents := cloudwatchlogs.PutLogEventsInput{LogGroupName: &c.LogGroup, LogStreamName: &logStream}
if elem.sequenceToken == "" {
//This is the first attempt to write to log stream,
//need to check log stream existence and create it if necessary
describeLogStreamOutput, err := c.svc.DescribeLogStreams(context.Background(), &cloudwatchlogs.DescribeLogStreamsInput{
LogGroupName: &c.LogGroup,
LogStreamNamePrefix: &logStream})
if err == nil && len(describeLogStreamOutput.LogStreams) == 0 {
_, err := c.svc.CreateLogStream(context.Background(), &cloudwatchlogs.CreateLogStreamInput{
LogGroupName: &c.LogGroup,
LogStreamName: &logStream})
if err != nil {
c.Log.Errorf("Can't create log stream %q in log group. Reason: %v %q.", logStream, c.LogGroup, err)
continue
}
putLogEvents.SequenceToken = nil
} else if err == nil && len(describeLogStreamOutput.LogStreams) == 1 {
putLogEvents.SequenceToken = describeLogStreamOutput.LogStreams[0].UploadSequenceToken
} else if err == nil && len(describeLogStreamOutput.LogStreams) > 1 { //Ambiguity
c.Log.Errorf("More than 1 log stream found with prefix %q in log group %q.", logStream, c.LogGroup)
continue
} else {
c.Log.Errorf("Error describing log streams in log group %q. Reason: %v", c.LogGroup, err)
continue
}
} else {
putLogEvents.SequenceToken = &elem.sequenceToken
}
//Upload log events
//Adjusting TS to be in align with cloudwatch logs requirements
for _, event := range batch.logEvents {
*event.Timestamp = *event.Timestamp / 1000000
}
putLogEvents.LogEvents = batch.logEvents
//There is a quota of 5 requests per second per log stream. Additional
//requests are throttled. This quota can't be changed.
putLogEventsOutput, err := c.svc.PutLogEvents(context.Background(), &putLogEvents)
if err != nil {
c.Log.Errorf("Can't push logs batch to AWS. Reason: %v", err)
continue
}
//Cleanup batch
elem.messageBatches[index] = messageBatch{
logEvents: []types.InputLogEvent{},
messageCount: 0}
elem.sequenceToken = *putLogEventsOutput.NextSequenceToken
}
}
return nil
}