in agent/agentlogstocloudwatch/cloudwatchlogspublisher/cloudwatchlogsservice.go [503:613]
func (service *CloudWatchLogsService) StreamData(
logGroupName string,
logStreamName string,
absoluteFilePath string,
isFileComplete bool,
isLogStreamCreated bool,
fileCompleteSignal chan bool,
cleanupControlCharacters bool,
structuredLogs bool) (success bool) {
log := service.context.Log()
log.Infof("Uploading logs at %s to CloudWatch", absoluteFilePath)
defer func() {
if r := recover(); r != nil {
log.Errorf("CloudWatch service stream data panic: %v", r)
log.Errorf("Stacktrace:\n%s", debug.Stack())
}
}()
service.isFileComplete = isFileComplete
go func() {
service.isFileComplete = <-fileCompleteSignal
log.Debugf("Received file complete signal %v", service.isFileComplete)
}()
// Keeps track of the last known line number that was successfully uploaded to CloudWatch.
var lastKnownLineUploadedToCWL int64 = 0
// Keeps track of the next line number upto which the logs will be uploaded to CloudWatch.
var currentLineNumber int64 = 0
var sequenceToken *string
var err error
IsLogStreamCreated := isLogStreamCreated
IsFirstTimeLogging := true
// Initialize timer and set upload frequency.
ticker := time.NewTicker(UploadFrequency)
defer ticker.Stop()
for range ticker.C {
// Get next message to be uploaded.
events, eof := service.getNextMessage(
absoluteFilePath,
&lastKnownLineUploadedToCWL,
¤tLineNumber,
cleanupControlCharacters,
structuredLogs)
// Exit case determining that the file is complete and has been scanned till EOF.
if eof {
log.Info("Finished uploading events to CloudWatch")
service.isUploadComplete = true
success = true
break
}
// If no new messages found then skip uploading.
if len(events) == 0 {
log.Trace("No events to upload to CloudWatch")
continue
}
if IsFirstTimeLogging {
log.Infof("Started CloudWatch upload")
IsFirstTimeLogging = false
}
log.Tracef("Uploading message line %d to CloudWatch", currentLineNumber)
if !IsLogStreamCreated {
log.Info("Log stream creation started")
// Terminate process if the log stream cannot be created
if err := service.CreateLogStream(logGroupName, logStreamName); err != nil {
log.Errorf("Error Creating Log Stream for CloudWatchLogs output: %v", err)
currentLineNumber = lastKnownLineUploadedToCWL
log.Debug("Failed to upload message to CloudWatch")
break
} else {
log.Info("Log stream already created")
IsLogStreamCreated = true
}
log.Info("Log stream creation ended")
}
// Use sequenceToken returned by PutLogEvents if present, else fetch new one
if sequenceToken == nil {
log.Info("Calling Get Sequence token")
sequenceToken = service.GetSequenceTokenForStream(logGroupName, logStreamName)
log.Info("Received Sequence token")
}
sequenceToken, err = service.PutLogEvents(events, logGroupName, logStreamName, sequenceToken)
if err == nil {
// Set the last known line to current since the upload was successful.
lastKnownLineUploadedToCWL = currentLineNumber
log.Tracef("Successfully uploaded message line %v to CloudWatch", currentLineNumber)
} else {
if errCode := sdkutil.GetAwsErrorCode(err); errCode == resourceNotFoundException {
// Log group or log stream not found due to resource change outside of client. Stop log streaming for session
log.Errorf(
"Log group \"%s\" or log stream \"%s\" not found. Log stream stopped. Error:%v",
logGroupName,
logStreamName,
err)
break
}
// Upload failed for unknown reason. Reset the current line to last known line and retry upload again in the next iteration
currentLineNumber = lastKnownLineUploadedToCWL
log.Warnf("Failed to upload message to CloudWatch, err: %v", err)
}
}
return success
}