in load_tests/validation/validate.go [227:289]
// validate_cloudwatch pages through every event of the given CloudWatch log
// stream and records the record ID carried at the start of each message.
//
// Each message is expected to begin with an 8-character decimal record ID.
// Every ID that parses is added as a key to inputMap (declared elsewhere in
// this file). Messages that are too short to hold an ID, or whose prefix is
// not a valid unsigned 32-bit decimal number, are reported on stdout and
// skipped.
//
// Throttled GetLogEvents calls are retried every 5 seconds without an upper
// bound; any other API error is passed to exitErrorf.
//
// The return value is the number of events whose record ID parsed
// successfully. Duplicated IDs are counted every time they appear.
func validate_cloudwatch(cwClient *cloudwatchlogs.CloudWatchLogs, logGroup string, logStream string) int {
	// Number of leading characters of a log message that form the record ID.
	const recordIDLen = 8

	var forwardToken *string
	recordCount := 0

	// Pull all log events from the stream by following NextForwardToken.
	for {
		if forwardToken != nil {
			// Sleep between GetLogEvents calls to avoid throttling.
			time.Sleep(100 * time.Millisecond)
		}

		// NextToken is nil on the first request, which is the same as not
		// setting it; later requests continue from the previous page.
		input := &cloudwatchlogs.GetLogEventsInput{
			LogGroupName:  aws.String(logGroup),
			LogStreamName: aws.String(logStream),
			NextToken:     forwardToken,
			StartFromHead: aws.Bool(true),
			Limit:         aws.Int64(10000),
		}

		response, err := cwClient.GetLogEvents(input)
		for err != nil {
			// Retry for throttling exception.
			if strings.Contains(err.Error(), "ThrottlingException: Rate exceeded") {
				time.Sleep(5 * time.Second)
				response, err = cwClient.GetLogEvents(input)
			} else {
				// NOTE(review): exitErrorf is presumably terminating the
				// process; it is defined outside this block - confirm.
				exitErrorf("[TEST FAILURE] Error occured to get the log events from log group: %q., %v", logGroup, err)
			}
		}

		for _, event := range response.Events {
			message := aws.StringValue(event.Message)

			// Guard the prefix slice: an empty or truncated message would
			// otherwise panic with "slice bounds out of range".
			if len(message) < recordIDLen {
				fmt.Printf("Error: log message %q is shorter than the %d-character record ID\n", message, recordIDLen)
				continue
			}

			value, err := strconv.ParseUint(message[:recordIDLen], 10, 32)
			if err != nil {
				fmt.Println("Error:", err)
				continue
			}

			recordCount++
			inputMap[uint32(value)] = struct{}{}
		}

		// The same NextForwardToken is returned once the end of the log
		// stream has been reached.
		if aws.StringValue(response.NextForwardToken) == aws.StringValue(forwardToken) {
			break
		}
		forwardToken = response.NextForwardToken
	}

	return recordCount
}