in agent/agentlogstocloudwatch/cloudwatchlogspublisher/cloudwatchlogsservice.go [616:714]
// getNextMessage reads the next batch of log lines from absoluteFilePath and
// groups them into CloudWatch input log events.
//
// Parameters:
//   - absoluteFilePath: path of the log file to read.
//   - lastKnownLineUploadedToCWL: number of line reads to skip from the start
//     of the file before collecting new lines (no skipping when <= 0).
//   - currentLineNumber: incremented once for every line appended to an event
//     message during this call.
//   - cleanupControlCharacters: forwarded to processMessage for every line.
//   - structuredLogs: selects StreamMessageLengthThresholdInBytes instead of
//     MessageLengthThresholdInBytes as the per-event size limit, and is
//     forwarded to buildEventInfo.
//
// Returns the events built in this call and eof. eof is true only when the
// service has marked the file as complete and either the file could not be
// opened or no further content was read.
func (service *CloudWatchLogsService) getNextMessage(
	absoluteFilePath string,
	lastKnownLineUploadedToCWL *int64,
	currentLineNumber *int64,
	cleanupControlCharacters bool,
	structuredLogs bool) (allEvents []*cloudwatchlogs.InputLogEvent, eof bool) {
	log := service.context.Log()

	// Open file to read.
	file, err := os.Open(absoluteFilePath)
	if err != nil {
		log.Warnf("Error opening file: %v", err)
		if service.isFileComplete {
			// End log update process if file complete to avoid file issue causing infinite loop.
			eof = true
		}
		return allEvents, eof
	}
	defer file.Close()

	messageThreshold := MessageLengthThresholdInBytes
	if structuredLogs {
		messageThreshold = StreamMessageLengthThresholdInBytes
	}

	// The read buffer is sized to the event threshold, so a line longer than
	// the buffer comes back in pieces with bufio.ErrBufferFull; each piece is
	// treated as a line both while skipping and while collecting.
	reader := bufio.NewReaderSize(file, messageThreshold)

	// Skip to the last uploaded line.
	if *lastKnownLineUploadedToCWL > 0 {
		var lastLine int64
		_, skipErr := reader.ReadSlice(NewLineCharacter)
		for skipErr == nil || skipErr == bufio.ErrBufferFull {
			lastLine++
			if lastLine == *lastKnownLineUploadedToCWL {
				break
			}
			_, skipErr = reader.ReadSlice(NewLineCharacter)
		}
		// Reaching EOF while skipping is not an error; the read loop below
		// will simply find nothing to upload.
		if skipErr != nil && skipErr != io.EOF && skipErr != bufio.ErrBufferFull {
			log.Warnf("Error skipping to last uploaded Cloudwatch line: %v", skipErr)
			return allEvents, eof
		}
	}

	var message, line []byte
	for {
		// Scan the next set of lines to upload. The slice returned by
		// ReadSlice is only valid until the next read; it is copied into
		// message by the append below.
		line, err = reader.ReadSlice(NewLineCharacter)
		if err != nil && err != bufio.ErrBufferFull {
			// Breaking out of loop since nothing to upload. A trailing line
			// without a newline is only consumed once the file is complete.
			if err != io.EOF || len(line) == 0 || !service.isFileComplete {
				break
			}
		}

		// Process message if needed before uploading to CW
		line = processMessage(log, line, cleanupControlCharacters)

		// Check if message length threshold for the event has reached.
		// If true, then construct event with existing message so that new line will get added to the next event.
		// If false, then continue to append new line to existing message.
		if (len(message) + len(line)) > messageThreshold {
			log.Tracef("Appending line to current Cloudwatch event message"+
				" exceeds length limit %v bytes. [Line: %v] [Length: %v]",
				messageThreshold, *currentLineNumber, len(message)+len(line))
			event := service.buildEventInfo(message, structuredLogs)
			log.Trace("Created CloudWatch event from current event message buffer")
			allEvents = append(allEvents, event)
			if len(allEvents) >= maxNumberOfEventsPerCall {
				// The line just read is neither appended nor counted in
				// *currentLineNumber on this path.
				return allEvents, eof
			}
			log.Trace("Reset Cloudwatch event message buffer")
			message = nil
		}

		message = append(message, line...)
		*currentLineNumber++
	}

	if err != io.EOF && err != nil {
		log.Warnf("Error reading from Cloudwatch logs file: %v", err)
	}

	// Build event with the message read so far to be uploaded to CW
	if len(message) > 0 {
		event := service.buildEventInfo(message, structuredLogs)
		allEvents = append(allEvents, event)
		return allEvents, eof
	}

	// This determines the end of session: nothing was left to upload and the
	// file has been marked complete.
	if (err == nil || err == io.EOF) && service.isFileComplete {
		eof = true
	}
	return allEvents, eof
}