func()

in agent/agentlogstocloudwatch/cloudwatchlogspublisher/cloudwatchlogsservice.go [616:714]


// getNextMessage reads the log file at absoluteFilePath, skips the lines that were
// already uploaded, and batches the lines that follow into CloudWatch Logs events.
//
// A "line" here is one result of bufio.Reader.ReadSlice: either text terminated by
// NewLineCharacter or, when a line is longer than the reader's buffer, one
// buffer-sized chunk of it. Both the skip phase and the read phase count lines the
// same way.
//
// Parameters:
//   - absoluteFilePath: path of the log file to read.
//   - lastKnownLineUploadedToCWL: number of leading lines to skip before reading;
//     no skipping happens when it is zero or negative.
//   - currentLineNumber: incremented once for every line appended to an event message.
//   - cleanupControlCharacters: forwarded to processMessage for every line.
//   - structuredLogs: selects StreamMessageLengthThresholdInBytes instead of
//     MessageLengthThresholdInBytes as the per-event size limit, and is forwarded
//     to buildEventInfo.
//
// Returns the events built during this call and eof. eof is true when the file is
// marked complete (service.isFileComplete) and either the file could not be opened
// or the reader hit io.EOF with nothing left to upload.
func (service *CloudWatchLogsService) getNextMessage(
	absoluteFilePath string,
	lastKnownLineUploadedToCWL *int64,
	currentLineNumber *int64,
	cleanupControlCharacters bool,
	structuredLogs bool) (allEvents []*cloudwatchlogs.InputLogEvent, eof bool) {
	log := service.context.Log()
	// Open file to read.
	file, err := os.Open(absoluteFilePath)
	if err != nil {
		log.Warnf("Error opening file: %v", err)
		if service.isFileComplete {
			// End log update process if file complete to avoid file issue causing infinite loop.
			eof = true
		}
		return
	}
	defer file.Close()

	messageThreshold := MessageLengthThresholdInBytes
	if structuredLogs {
		messageThreshold = StreamMessageLengthThresholdInBytes
	}

	// The buffer size is tied to the threshold so that a single ReadSlice result
	// is bounded by the reader's buffer.
	reader := bufio.NewReaderSize(file, messageThreshold)

	// Skip to the last uploaded line.
	if *lastKnownLineUploadedToCWL > 0 {
		var skipped int64
		_, skipErr := reader.ReadSlice(NewLineCharacter)
		// bufio.ErrBufferFull means a buffer-sized chunk was returned without a
		// delimiter; it is counted as a line, exactly as the read loop below does.
		for skipErr == nil || skipErr == bufio.ErrBufferFull {
			skipped++
			if skipped == *lastKnownLineUploadedToCWL {
				break
			}
			_, skipErr = reader.ReadSlice(NewLineCharacter)
		}
		if skipErr != nil && skipErr != io.EOF && skipErr != bufio.ErrBufferFull {
			log.Warnf("Error skipping to last uploaded Cloudwatch line: %v", skipErr)
			return
		}
	}

	var message, line []byte
	for {
		// Scan the next set of lines to upload.
		line, err = reader.ReadSlice(NewLineCharacter)
		if err != nil && err != bufio.ErrBufferFull {
			// Stop unless this is a trailing, unterminated line of a file that is
			// already complete; in that case the partial line is still uploaded.
			if err != io.EOF || len(line) == 0 || !service.isFileComplete {
				break
			}
		}
		// Process message if needed before uploading to CW
		line = processMessage(log, line, cleanupControlCharacters)
		// Check if message length threshold for the event has reached.
		// If true, then construct event with existing message so that new line will get added to the next event.
		// If false, then continue to append new line to existing message.
		if (len(message) + len(line)) > messageThreshold {
			log.Tracef("Appending line to current Cloudwatch event message"+
				" exceeds length limit %v bytes. [Line: %v] [Length: %v]",
				messageThreshold, *currentLineNumber, len(message)+len(line))

			event := service.buildEventInfo(message, structuredLogs)

			log.Trace("Created CloudWatch event from current event message buffer")
			allEvents = append(allEvents, event)
			if len(allEvents) >= maxNumberOfEventsPerCall {
				// The line that triggered the flush has not been appended or
				// counted in currentLineNumber at this point.
				// NOTE(review): presumably the caller re-reads it on the next call — confirm.
				return
			}

			log.Trace("Reset Cloudwatch event message buffer")
			message = nil
		}
		// append copies the bytes, so message does not alias the reader's
		// internal buffer, which ReadSlice overwrites on the next read.
		message = append(message, line...)
		*currentLineNumber++
	}

	// The loop above only exits with a non-nil, non-ErrBufferFull error, so
	// anything other than io.EOF here is a genuine read failure.
	if err != io.EOF {
		log.Warnf("Error reading from Cloudwatch logs file: %v", err)
	}

	// Build event with the message read so far to be uploaded to CW
	if len(message) > 0 {
		allEvents = append(allEvents, service.buildEventInfo(message, structuredLogs))
		return
	}

	// This determines the end of session: nothing is pending, the reader reached
	// the end of the file, and the file is marked complete.
	if err == io.EOF && service.isFileComplete {
		eof = true
	}

	return
}