agent/agentlogstocloudwatch/cloudwatchlogspublisher/cloudwatchlogsservice.go (535 lines of code) (raw):
// Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may not
// use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing
// permissions and limitations under the License.
// cloudwatchlogspublisher is responsible for pulling logs from the log queue and publishing them to cloudwatch
package cloudwatchlogspublisher
import (
"bufio"
"encoding/json"
"errors"
"io"
"os"
"runtime/debug"
"strings"
"time"
"github.com/aws/amazon-ssm-agent/agent/agentlogstocloudwatch/cloudwatchlogspublisher/cloudwatchlogsinterface"
"github.com/aws/amazon-ssm-agent/agent/context"
"github.com/aws/amazon-ssm-agent/agent/log"
"github.com/aws/amazon-ssm-agent/agent/sdkutil"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/client"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/pborman/ansi"
)
const (
stopPolicyErrorThreshold = 10
stopPolicyName = "CloudWatchLogsService"
maxRetries = 5
UploadFrequency = 1 * time.Second
NewLineCharacter = '\n'
maxNumberOfEventsPerCall = 4
// Event size - https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/cloudwatch_limits_cwl.html
MessageLengthThresholdInBytes = 200 * 1000
// json.Marshal can inflate streamed event message by up to ~6 times the size of the message, depending on message contents.
// Threshold is reduced here to 1/6 size to account for this.
// https://go.dev/play/p/G3RalE_BUEL
StreamMessageLengthThresholdInBytes = 33000
)
// CloudWatchLogsService encapsulates the client and stop policy as a wrapper to call the cloudwatchlogs API
type CloudWatchLogsService struct {
context context.T
cloudWatchLogsClient cloudwatchlogsinterface.CloudWatchLogsClient
stopPolicy *sdkutil.StopPolicy
isFileComplete bool
isUploadComplete bool
CloudWatchMessage CloudWatchMessage
}
// CloudWatchMessage captures all the information that are published in an event for streaming logs
type CloudWatchMessage struct {
_ struct{} `type:"structure"`
EventVersion *string `json:"eventVersion"`
EventTime *string `json:"eventTime"`
AwsRegion *string `json:"awsRegion"`
Target *Target `json:"target"`
UserIdentity *UserIdentity `json:"userIdentity"`
RunAsUser *string `json:"runAsUser"`
SessionId *string `json:"sessionId"`
SessionData []*string `json:"sessionData"`
}
// UserIdentity represents iam arn of the requester
type UserIdentity struct {
_ struct{} `type:"structure"`
Arn *string `json:"arn"`
}
// Target represents id of the target
type Target struct {
_ struct{} `type:"structure"`
Id *string `json:"id"`
}
// createCloudWatchStopPolicy creates a new policy for cloudwatchlogs
func createCloudWatchStopPolicy() *sdkutil.StopPolicy {
return sdkutil.NewStopPolicy(stopPolicyName, stopPolicyErrorThreshold)
}
// createCloudWatchClient creates a client to call CloudWatchLogs APIs
func createCloudWatchClient(context context.T) cloudwatchlogsinterface.CloudWatchLogsClient {
config := sdkutil.AwsConfig(context, "logs")
return createCloudWatchClientWithConfig(context, config)
}
// createCloudWatchClientWithCredentials creates a client to call CloudWatchLogs APIs using credentials from the id and secret passed
func createCloudWatchClientWithCredentials(context context.T, id, secret string) cloudwatchlogsinterface.CloudWatchLogsClient {
config := sdkutil.AwsConfig(context, "logs").WithCredentials(credentials.NewStaticCredentials(id, secret, ""))
return createCloudWatchClientWithConfig(context, config)
}
// createCloudWatchClientWithConfig creates a client to call CloudWatchLogs APIs using the passed aws config
func createCloudWatchClientWithConfig(context context.T, config *aws.Config) cloudwatchlogsinterface.CloudWatchLogsClient {
//Adding the AWS SDK Retrier with Exponential Backoff
config = request.WithRetryer(config, client.DefaultRetryer{
NumMaxRetries: maxRetries,
})
appConfig := context.AppConfig()
sess := session.New(config)
sess.Handlers.Build.PushBack(request.MakeAddToUserAgentHandler(appConfig.Agent.Name, appConfig.Agent.Version))
return cloudwatchlogs.New(sess)
}
// NewCloudWatchLogsService Creates a new instance of the CloudWatchLogsService
func NewCloudWatchLogsService(context context.T) *CloudWatchLogsService {
cloudWatchLogsService := CloudWatchLogsService{
context: context,
cloudWatchLogsClient: createCloudWatchClient(context),
stopPolicy: createCloudWatchStopPolicy(),
isFileComplete: false,
isUploadComplete: false,
CloudWatchMessage: CloudWatchMessage{},
}
return &cloudWatchLogsService
}
// NewCloudWatchLogsServiceWithCredentials Creates a new instance of the CloudWatchLogsService using credentials from the Id and Secret passed
func NewCloudWatchLogsServiceWithCredentials(context context.T, id, secret string) *CloudWatchLogsService {
cloudWatchLogsService := CloudWatchLogsService{
context: context,
cloudWatchLogsClient: createCloudWatchClientWithCredentials(context, id, secret),
stopPolicy: createCloudWatchStopPolicy(),
isFileComplete: false,
isUploadComplete: false,
}
return &cloudWatchLogsService
}
// SetCloudWatchMessage initializes CloudWatchMessage
func (service *CloudWatchLogsService) SetCloudWatchMessage(
eventVersion string,
awsRegion string,
targetId string,
runAsUser string,
sessionId string,
sessionOwner string) {
service.CloudWatchMessage = CloudWatchMessage{
EventVersion: aws.String(eventVersion),
AwsRegion: aws.String(awsRegion),
Target: &Target{Id: aws.String(targetId)},
UserIdentity: &UserIdentity{Arn: aws.String(sessionOwner)},
RunAsUser: aws.String(runAsUser),
SessionId: aws.String(sessionId),
}
}
// CreateNewServiceIfUnHealthy checks service healthy and create new service if original is unhealthy
func (service *CloudWatchLogsService) CreateNewServiceIfUnHealthy() {
if service.stopPolicy == nil {
service.stopPolicy = createCloudWatchStopPolicy()
}
if !service.stopPolicy.IsHealthy() {
service.stopPolicy.ResetErrorCount()
service.cloudWatchLogsClient = createCloudWatchClient(service.context)
return
}
}
// CreateLogGroup calls the CreateLogGroup API to create a log group
func (service *CloudWatchLogsService) CreateLogGroup(logGroup string) (err error) {
log := service.context.Log()
service.CreateNewServiceIfUnHealthy()
//Creating the parameters for the API Call
params := &cloudwatchlogs.CreateLogGroupInput{
LogGroupName: aws.String(logGroup),
}
//Calling the API
if _, err = service.cloudWatchLogsClient.CreateLogGroup(params); err != nil {
// Cast err to awserr.Error to get the Code
errorCode := sdkutil.GetAwsErrorCode(err)
switch errorCode {
// Check for error code. Note that the AWS Retrier has already made retries for the 5xx Response Codes
case resourceAlreadyExistsException:
log.Debugf("log group /%v/ already exists", logGroup)
// 400 Error, occurs when the LogGroup already exists
// Ignoring the error
err = nil
default:
if logGroupPresent, _ := service.IsLogGroupPresent(logGroup); logGroupPresent {
log.Debugf("skipping error check as the log group /%v/ already exists", logGroup)
err = nil
} else {
// Other 400 Errors, 500 Errors even after retries. Log the error
log.Errorf("Error Calling CreateLogGroup:%v", err.Error())
// Handle the common AWS errors and update the stop policy accordingly
sdkutil.HandleAwsError(log, err, service.stopPolicy)
}
}
}
return
}
// CreateLogStream calls the CreateLogStream API to create log stream within the specified log group
func (service *CloudWatchLogsService) CreateLogStream(logGroup, logStream string) (err error) {
log := service.context.Log()
service.CreateNewServiceIfUnHealthy()
//Creating the parameters for the API Call
params := &cloudwatchlogs.CreateLogStreamInput{
LogGroupName: aws.String(logGroup),
LogStreamName: aws.String(logStream),
}
//Calling the API
if _, err = service.cloudWatchLogsClient.CreateLogStream(params); err != nil {
// Cast err to awserr.Error to get the Code
errorCode := sdkutil.GetAwsErrorCode(err)
switch errorCode {
// Check for error code. Note that the AWS Retrier has already made retries for the 5xx Response Codes
case resourceAlreadyExistsException:
log.Debugf("log stream /%v/ already exists in the log group /%v/", logStream, logGroup)
// 400 Error, occurs when the LogStream already exists
// Ignoring the error
err = nil
default:
if service.IsLogStreamPresent(logGroup, logStream) {
log.Debugf("skipping error check as the log stream /%v/ already exists in the log group /%v/", logStream, logGroup)
err = nil
} else {
// Other 400 Errors, 500 Errors even after retries. Log the error
log.Errorf("Error Calling CreateLogStream:%v", err.Error())
// Handle the common AWS errors and update the stop policy accordingly
sdkutil.HandleAwsError(log, err, service.stopPolicy)
}
}
}
return
}
// DescribeLogGroups calls the DescribeLogGroups API to get the details of log groups of account
func (service *CloudWatchLogsService) DescribeLogGroups(logGroupPrefix, nextToken string) (response *cloudwatchlogs.DescribeLogGroupsOutput, err error) {
log := service.context.Log()
service.CreateNewServiceIfUnHealthy()
// Creating the parameters for the API Call
params := &cloudwatchlogs.DescribeLogGroupsInput{}
if logGroupPrefix != "" {
params.LogGroupNamePrefix = aws.String(logGroupPrefix)
}
if nextToken != "" {
params.NextToken = aws.String(nextToken)
}
// Calling the API
if response, err = service.cloudWatchLogsClient.DescribeLogGroups(params); err != nil {
// Handle the common AWS errors and update the stop policy accordingly
sdkutil.HandleAwsError(log, err, service.stopPolicy)
// AWS Retrier has already made retries for the 5xx Response Codes. Logging and Returning the error
log.Errorf("Error Calling DescribeLogGroups:%v", err.Error())
return
}
// Pretty-print the response data.
log.Debugf("DescribeLogGroups Response:%v", response)
return
}
// DescribeLogStreams calls the DescribeLogStreams API to get the details of the log streams present
func (service *CloudWatchLogsService) DescribeLogStreams(logGroup, logStreamPrefix, nextToken string) (response *cloudwatchlogs.DescribeLogStreamsOutput, err error) {
log := service.context.Log()
service.CreateNewServiceIfUnHealthy()
// Creating the parameters for the API Call
params := &cloudwatchlogs.DescribeLogStreamsInput{
LogGroupName: aws.String(logGroup),
}
if logStreamPrefix != "" {
params.LogStreamNamePrefix = aws.String(logStreamPrefix)
}
if nextToken != "" {
params.NextToken = aws.String(nextToken)
}
// Calling the API
if response, err = service.cloudWatchLogsClient.DescribeLogStreams(params); err != nil {
// Handle the common AWS errors and update the stop policy accordingly
sdkutil.HandleAwsError(log, err, service.stopPolicy)
// AWS Retrier has already made retries for the 5xx Response Codes. Logging and Returning the error
log.Errorf("Error Calling DescribeLogStreams:%v", err.Error())
return
}
// Pretty-print the response data.
log.Debugf("DescribeLogStreams Response:%v", response)
return
}
// getLogGroupDetails Calls the DescribeLogGroups API to get the details of the loggroup specified. Returns nil if not found
func (service *CloudWatchLogsService) getLogGroupDetails(logGroup string) (logGroupDetails *cloudwatchlogs.LogGroup, err error) {
log := service.context.Log()
// Keeping the nextToken as empty in the beginning. Might get filled from response for subsequent calls
nextToken := ""
// The API implements paginations. The bool if true means more results are present and thus need to call the API again.
nextBatchPresent := true
// Continue calling the API until we find the group or next batch of groups is not present
for nextBatchPresent {
describeLogGroupsOutput, err := service.DescribeLogGroups(logGroup, nextToken)
if err != nil {
log.Errorf("Error in calling DescribeLogGroups:%v", err)
return nil, err
}
// Iterate through the log streams and check for the input log stream
for _, stream := range describeLogGroupsOutput.LogGroups {
if logGroup == *stream.LogGroupName {
// Log Group Matched
logGroupDetails = stream
break
}
}
// Group not found. Check if nextToken is returned. If yes, need to call the API again to get the next set of log groups
if describeLogGroupsOutput.NextToken == nil {
// Stream not found and nextToken not present
nextBatchPresent = false
} else {
// There is a NextToken present. Use it to call and continue calling the API
nextToken = *describeLogGroupsOutput.NextToken
}
}
return logGroupDetails, nil
}
// IsLogGroupPresent checks and returns true when the log group is present
func (service *CloudWatchLogsService) IsLogGroupPresent(logGroup string) (bool, *cloudwatchlogs.LogGroup) {
logGroupDetails, _ := service.getLogGroupDetails(logGroup)
return logGroupDetails != nil, logGroupDetails
}
// IsLogStreamPresent checks and returns true when the log stream is present
func (service *CloudWatchLogsService) IsLogStreamPresent(logGroupName, logStreamName string) bool {
return service.getLogStreamDetails(logGroupName, logStreamName) != nil
}
// GetSequenceTokenForStream returns the current sequence token for the stream specified
func (service *CloudWatchLogsService) GetSequenceTokenForStream(logGroupName, logStreamName string) (sequenceToken *string) {
logStream := service.getLogStreamDetails(logGroupName, logStreamName)
if logStream != nil {
sequenceToken = logStream.UploadSequenceToken
}
return
}
// getLogStreamDetails Calls the DescribeLogStreams API to get the details of the Log Stream specified. Returns nil if the stream is not found
func (service *CloudWatchLogsService) getLogStreamDetails(logGroupName, logStreamName string) (logStream *cloudwatchlogs.LogStream) {
log := service.context.Log()
// Keeping the nextToken as empty in the beginning. Might get filled from response for subsequent calls
nextToken := ""
// Takes note of whether need to call the API again
nextBatchPresent := true
// Continue calling the API until we find the stream or next batch of streams is not present
for nextBatchPresent {
describeLogStreamsOutput, err := service.DescribeLogStreams(logGroupName, logStreamName, nextToken)
if err != nil {
log.Errorf("Error in calling DescribeLogStreams:%v", err)
return
}
// Iterate through the log streams and check for the input log stream
for _, stream := range describeLogStreamsOutput.LogStreams {
if logStreamName == *stream.LogStreamName {
// Log Stream Matched
logStream = stream
return
}
}
// Stream not found. Check if nextToken is returned. If yes, need to call the API again to get the next set of log streams
if describeLogStreamsOutput.NextToken == nil {
// Stream not found and nextToken not present
nextBatchPresent = false
} else {
// There is a NextToken present. Use it to call and continue calling the API
nextToken = *describeLogStreamsOutput.NextToken
}
}
return
}
// PutLogEvents calls the PutLogEvents API to push messages to CloudWatchLogs
func (service *CloudWatchLogsService) PutLogEvents(messages []*cloudwatchlogs.InputLogEvent, logGroup, logStream string, sequenceToken *string) (nextSequenceToken *string, err error) {
log := service.context.Log()
service.CreateNewServiceIfUnHealthy()
// Creating the parameters for the API Call
params := &cloudwatchlogs.PutLogEventsInput{
LogEvents: messages,
LogGroupName: aws.String(logGroup),
LogStreamName: aws.String(logStream),
SequenceToken: sequenceToken,
}
// Calling the API
response, err := service.cloudWatchLogsClient.PutLogEvents(params)
if err != nil {
// Handle the common AWS errors and update the stop policy accordingly
sdkutil.HandleAwsError(log, err, service.stopPolicy)
// Cast err to awserr.Error to get the Code
errorCode := sdkutil.GetAwsErrorCode(err)
switch errorCode {
// Check for error code. Note that the AWS Retrier has already made retries for the 5xx Response Codes
case invalidSequenceTokenException:
// 400 Error, occurs when the SequenceToken is invalid. Create new SequenceToken and use it again
fallthrough
case dataAlreadyAcceptedException:
// 400 Error, occurs when the SequenceToken has been used. Create new SequenceToken and use it again
// Adding Error Count to StopPolicy before retrying to ensure the retries stop after Stop Policy error counts exceed
service.stopPolicy.AddErrorCount(1)
return service.retryPutWithNewSequenceToken(messages, logGroup, logStream)
default:
// Other 400 Errors, 500 Errors even after retries. Log the error
log.Errorf("Error in PutLogEvents:%v", err.Error())
}
return
}
nextSequenceToken = response.NextSequenceToken
return
}
// retryPutWithNewSequenceToken gets a new sequence token and retries pushing messages to cloudwatchlogs
func (service *CloudWatchLogsService) retryPutWithNewSequenceToken(messages []*cloudwatchlogs.InputLogEvent, logGroupName, logStreamName string) (*string, error) {
// Get the sequence token by calling the DescribeLogStreams API
logStream := service.getLogStreamDetails(logGroupName, logStreamName)
if logStream == nil {
// Failed to get log stream and hence the sequence token. Log the error
err := errors.New("Failed to get sequence token")
return nil, err
}
sequenceToken := logStream.UploadSequenceToken
// Successfully got the new sequence token. Retry the PutLogEvents API
return service.PutLogEvents(messages, logGroupName, logStreamName, sequenceToken)
}
// IsLogGroupEncryptedWithKMS return true if the log group is encrypted with KMS key.
func (service *CloudWatchLogsService) IsLogGroupEncryptedWithKMS(logGroup *cloudwatchlogs.LogGroup) (bool, error) {
if logGroup == nil {
return false, nil
}
if logGroup.KmsKeyId != nil {
return true, nil
}
service.context.Log().Debugf("CloudWatch log group %s is not encrypted with KMS", logGroup.LogGroupName)
return false, nil
}
// StreamData streams data from the absoluteFilePath file to cloudwatch logs.
func (service *CloudWatchLogsService) StreamData(
logGroupName string,
logStreamName string,
absoluteFilePath string,
isFileComplete bool,
isLogStreamCreated bool,
fileCompleteSignal chan bool,
cleanupControlCharacters bool,
structuredLogs bool) (success bool) {
log := service.context.Log()
log.Infof("Uploading logs at %s to CloudWatch", absoluteFilePath)
defer func() {
if r := recover(); r != nil {
log.Errorf("CloudWatch service stream data panic: %v", r)
log.Errorf("Stacktrace:\n%s", debug.Stack())
}
}()
service.isFileComplete = isFileComplete
go func() {
service.isFileComplete = <-fileCompleteSignal
log.Debugf("Received file complete signal %v", service.isFileComplete)
}()
// Keeps track of the last known line number that was successfully uploaded to CloudWatch.
var lastKnownLineUploadedToCWL int64 = 0
// Keeps track of the next line number upto which the logs will be uploaded to CloudWatch.
var currentLineNumber int64 = 0
var sequenceToken *string
var err error
IsLogStreamCreated := isLogStreamCreated
IsFirstTimeLogging := true
// Initialize timer and set upload frequency.
ticker := time.NewTicker(UploadFrequency)
defer ticker.Stop()
for range ticker.C {
// Get next message to be uploaded.
events, eof := service.getNextMessage(
absoluteFilePath,
&lastKnownLineUploadedToCWL,
¤tLineNumber,
cleanupControlCharacters,
structuredLogs)
// Exit case determining that the file is complete and has been scanned till EOF.
if eof {
log.Info("Finished uploading events to CloudWatch")
service.isUploadComplete = true
success = true
break
}
// If no new messages found then skip uploading.
if len(events) == 0 {
log.Trace("No events to upload to CloudWatch")
continue
}
if IsFirstTimeLogging {
log.Infof("Started CloudWatch upload")
IsFirstTimeLogging = false
}
log.Tracef("Uploading message line %d to CloudWatch", currentLineNumber)
if !IsLogStreamCreated {
log.Info("Log stream creation started")
// Terminate process if the log stream cannot be created
if err := service.CreateLogStream(logGroupName, logStreamName); err != nil {
log.Errorf("Error Creating Log Stream for CloudWatchLogs output: %v", err)
currentLineNumber = lastKnownLineUploadedToCWL
log.Debug("Failed to upload message to CloudWatch")
break
} else {
log.Info("Log stream already created")
IsLogStreamCreated = true
}
log.Info("Log stream creation ended")
}
// Use sequenceToken returned by PutLogEvents if present, else fetch new one
if sequenceToken == nil {
log.Info("Calling Get Sequence token")
sequenceToken = service.GetSequenceTokenForStream(logGroupName, logStreamName)
log.Info("Received Sequence token")
}
sequenceToken, err = service.PutLogEvents(events, logGroupName, logStreamName, sequenceToken)
if err == nil {
// Set the last known line to current since the upload was successful.
lastKnownLineUploadedToCWL = currentLineNumber
log.Tracef("Successfully uploaded message line %v to CloudWatch", currentLineNumber)
} else {
if errCode := sdkutil.GetAwsErrorCode(err); errCode == resourceNotFoundException {
// Log group or log stream not found due to resource change outside of client. Stop log streaming for session
log.Errorf(
"Log group \"%s\" or log stream \"%s\" not found. Log stream stopped. Error:%v",
logGroupName,
logStreamName,
err)
break
}
// Upload failed for unknown reason. Reset the current line to last known line and retry upload again in the next iteration
currentLineNumber = lastKnownLineUploadedToCWL
log.Warnf("Failed to upload message to CloudWatch, err: %v", err)
}
}
return success
}
// getNextMessage gets the next message to be uploaded to cloudwatch.
func (service *CloudWatchLogsService) getNextMessage(
absoluteFilePath string,
lastKnownLineUploadedToCWL *int64,
currentLineNumber *int64,
cleanupControlCharacters bool,
structuredLogs bool) (allEvents []*cloudwatchlogs.InputLogEvent, eof bool) {
log := service.context.Log()
// Open file to read.
file, err := os.Open(absoluteFilePath)
if err != nil {
log.Warnf("Error opening file: %v", err)
if service.isFileComplete {
// End log update process if file complete to avoid file issue causing infinite loop.
eof = true
}
return
}
defer file.Close()
var messageThreshold int
if structuredLogs {
messageThreshold = StreamMessageLengthThresholdInBytes
} else {
messageThreshold = MessageLengthThresholdInBytes
}
reader := bufio.NewReaderSize(file, messageThreshold)
// Skip to the last uploaded line.
if *lastKnownLineUploadedToCWL > 0 {
var lastLine int64 = 0
_, err := reader.ReadSlice(NewLineCharacter)
for err == nil || err == bufio.ErrBufferFull {
lastLine++
if lastLine == *lastKnownLineUploadedToCWL {
break
}
_, err = reader.ReadSlice(NewLineCharacter)
}
if err != nil && err != io.EOF && err != bufio.ErrBufferFull {
log.Warnf("Error skipping to last uploaded Cloudwatch line: %v", err)
return
}
}
var message, line []byte
for {
// Scan the next set of lines to upload.
line, err = reader.ReadSlice(NewLineCharacter)
if err != nil && err != bufio.ErrBufferFull {
// Breaking out of loop since nothing to upload
if err != io.EOF || len(line) == 0 || !service.isFileComplete {
break
}
}
// Process message if needed before uploading to CW
line = processMessage(log, line, cleanupControlCharacters)
// Check if message length threshold for the event has reached.
// If true, then construct event with existing message so that new line will get added to the next event.
// If false, then continue to append new line to existing message.
if (len(message) + len(line)) > messageThreshold {
log.Tracef("Appending line to current Cloudwatch event message"+
" exceeds length limit %v bytes. [Line: %v] [Length: %v]",
messageThreshold, *currentLineNumber, len(message)+len(line))
event := service.buildEventInfo(message, structuredLogs)
log.Trace("Created CloudWatch event from current event message buffer")
allEvents = append(allEvents, event)
if len(allEvents) >= maxNumberOfEventsPerCall {
return
}
log.Trace("Reset Cloudwatch event message buffer")
message = nil
}
message = append(message, line...)
*currentLineNumber++
}
if err != io.EOF && err != nil {
log.Warnf("Error reading from Cloudwatch logs file:", err)
}
// Build event with the message read so far to be uploaded to CW
if len(message) > 0 {
event := service.buildEventInfo(message, structuredLogs)
allEvents = append(allEvents, event)
return
}
// This determines the end of session.
if len(message) == 0 && (err == nil || err == io.EOF) && service.isFileComplete {
eof = true
}
return
}
// processMessage is used to process message before uploading to CW like cleaning up ANSI control characters
func processMessage(log log.T, line []byte, cleanupANSICharacters bool) (processedLine []byte) {
defer func() {
if err := recover(); err != nil {
log.Tracef("processMessage encountered error: %v", err)
}
}()
// Do nothing if cleanup of ANSI characters not required
if !cleanupANSICharacters {
return line
}
// Strip ANSI control sequences like color codes
processedLine = line
processedLine, err := ansi.Strip(line)
if err != nil {
processedLine = line
}
return processedLine
}
// buildEventInfo constructs event to be uploaded to CW
func (service *CloudWatchLogsService) buildEventInfo(message []byte, structuredLogs bool) *cloudwatchlogs.InputLogEvent {
var formattedMessage string
// Construct CloudWatch event in JSON format if structured logs required
if structuredLogs {
currentTime := time.Now().UTC()
messageString := string(message)
messageString = strings.ReplaceAll(messageString, "\t", " ")
messageString = strings.ReplaceAll(messageString, "\r", "")
messageList := strings.Split(messageString, "\n")
if messageList[len(messageList)-1] == "" {
messageList = messageList[:len(messageList)-1]
}
service.CloudWatchMessage.EventTime = aws.String(currentTime.Format(time.RFC3339))
service.CloudWatchMessage.SessionData = aws.StringSlice(messageList)
formattedMessageBytes, _ := json.Marshal(service.CloudWatchMessage)
formattedMessage = string(formattedMessageBytes)
} else {
formattedMessage = strings.ReplaceAll(string(message), "\r\n", "\n")
if service.isFileComplete && message[len(message)-1] == byte(NewLineCharacter) {
formattedMessage = formattedMessage[:len(formattedMessage)-1]
}
}
event := &cloudwatchlogs.InputLogEvent{
Message: aws.String(formattedMessage),
Timestamp: aws.Int64(time.Now().UnixNano() / int64(time.Millisecond)),
}
return event
}
func (service *CloudWatchLogsService) SetIsFileComplete(val bool) {
service.isFileComplete = val
}
func (service *CloudWatchLogsService) GetIsUploadComplete() bool {
return service.isUploadComplete
}