func()

in agent/agentlogstocloudwatch/cloudwatchlogspublisher/cloudwatchlogsservice.go [503:613]


// StreamData reads the file at absoluteFilePath and uploads its contents to
// the given CloudWatch log group and log stream, one batch per UploadFrequency
// tick, until getNextMessage reports that the end of the file has been reached.
//
// Parameters:
//   - logGroupName, logStreamName: destination passed to CreateLogStream,
//     GetSequenceTokenForStream and PutLogEvents.
//   - absoluteFilePath: file handed to getNextMessage for reading.
//   - isFileComplete: initial value stored in service.isFileComplete.
//   - isLogStreamCreated: when false, the stream is created with
//     CreateLogStream before the first batch is uploaded.
//   - fileCompleteSignal: a single value received from this channel overwrites
//     service.isFileComplete.
//   - cleanupControlCharacters, structuredLogs: passed through unchanged to
//     getNextMessage.
//
// It returns true only when the loop ended because getNextMessage reported
// EOF. It returns false when the log stream could not be created, when
// PutLogEvents failed with resourceNotFoundException, or when a panic was
// recovered. Any other PutLogEvents failure is retried on the next tick.
func (service *CloudWatchLogsService) StreamData(
	logGroupName string,
	logStreamName string,
	absoluteFilePath string,
	isFileComplete bool,
	isLogStreamCreated bool,
	fileCompleteSignal chan bool,
	cleanupControlCharacters bool,
	structuredLogs bool) (success bool) {
	log := service.context.Log()
	log.Infof("Uploading logs at %s to CloudWatch", absoluteFilePath)

	// A panic anywhere below is logged and swallowed; the named result then
	// keeps whatever value it had (false unless EOF was already reached).
	defer func() {
		if r := recover(); r != nil {
			log.Errorf("CloudWatch service stream data panic: %v", r)
			log.Errorf("Stacktrace:\n%s", debug.Stack())
		}
	}()

	service.isFileComplete = isFileComplete
	// NOTE(review): this goroutine writes service.isFileComplete without any
	// synchronization visible in this function, and it blocks until a value is
	// received from fileCompleteSignal — confirm that callers always send (or
	// close) and that readers of the field tolerate the concurrent write.
	go func() {
		service.isFileComplete = <-fileCompleteSignal
		log.Debugf("Received file complete signal %v", service.isFileComplete)
	}()

	var (
		// Last line number known to have been uploaded successfully.
		lastUploadedLine int64
		// Line number up to which the current batch extends; rolled back to
		// lastUploadedLine whenever an upload attempt fails.
		currentLine int64
		// Token returned by the previous PutLogEvents call, if any.
		sequenceToken *string
		err           error
	)
	streamCreated := isLogStreamCreated
	firstUpload := true

	// Drive the upload loop at a fixed cadence.
	ticker := time.NewTicker(UploadFrequency)
	defer ticker.Stop()

	for range ticker.C {
		// Fetch the next batch; both line counters are updated through the pointers.
		events, eof := service.getNextMessage(
			absoluteFilePath,
			&lastUploadedLine,
			&currentLine,
			cleanupControlCharacters,
			structuredLogs)

		// The file is complete and has been scanned to EOF: this is the only successful exit.
		if eof {
			log.Info("Finished uploading events to CloudWatch")
			service.isUploadComplete = true
			success = true
			break
		}

		// Nothing new to send on this tick.
		if len(events) == 0 {
			log.Trace("No events to upload to CloudWatch")
			continue
		}

		if firstUpload {
			log.Infof("Started CloudWatch upload")
			firstUpload = false
		}
		log.Tracef("Uploading message line %d to CloudWatch", currentLine)

		if !streamCreated {
			log.Info("Log stream creation started")
			// Streaming cannot continue without a log stream, so give up on failure.
			if err = service.CreateLogStream(logGroupName, logStreamName); err != nil {
				log.Errorf("Error Creating Log Stream for CloudWatchLogs output: %v", err)
				currentLine = lastUploadedLine
				log.Debug("Failed to upload message to CloudWatch")
				break
			}
			// NOTE(review): this message is emitted right after a successful
			// CreateLogStream call; the wording suggests the stream pre-existed — confirm intent.
			log.Info("Log stream already created")
			streamCreated = true
			log.Info("Log stream creation ended")
		}

		// Reuse the token returned by the previous PutLogEvents call; fetch one only when none is held.
		if sequenceToken == nil {
			log.Info("Calling Get Sequence token")
			sequenceToken = service.GetSequenceTokenForStream(logGroupName, logStreamName)
			log.Info("Received Sequence token")
		}

		sequenceToken, err = service.PutLogEvents(events, logGroupName, logStreamName, sequenceToken)
		if err != nil {
			if sdkutil.GetAwsErrorCode(err) == resourceNotFoundException {
				// Log group or log stream was removed outside of this client; stop streaming.
				log.Errorf(
					"Log group \"%s\" or log stream \"%s\" not found. Log stream stopped. Error:%v",
					logGroupName,
					logStreamName,
					err)
				break
			}
			// Any other failure: rewind to the last uploaded line so the same batch is retried on the next tick.
			currentLine = lastUploadedLine
			log.Warnf("Failed to upload message to CloudWatch, err: %v", err)
			continue
		}

		// Upload succeeded: advance the high-water mark.
		lastUploadedLine = currentLine
		log.Tracef("Successfully uploaded message line %v to CloudWatch", currentLine)
	}
	return success
}