in load_tests/validation/validate.go [108:174]
func validate_s3(s3Client *s3.S3, bucket string, prefix string) int {
var continuationToken *string
var input *s3.ListObjectsV2Input
s3RecordCounter := 0
s3ObjectCounter := 0
for {
input = &s3.ListObjectsV2Input{
Bucket: aws.String(bucket),
ContinuationToken: continuationToken,
Prefix: aws.String(prefix),
}
response, err := s3Client.ListObjectsV2(input)
if err != nil {
exitErrorf("[TEST FAILURE] Error occurred to get the objects from bucket: %q., %v", bucket, err)
}
for _, content := range response.Contents {
input := &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: content.Key,
}
obj, err := s3Client.GetObject(input)
if err != nil {
exitErrorf("[TEST FAILURE] Error to get S3 object. %v", err)
}
s3ObjectCounter++
// Directly unmarshal the JSON objects from the S3 object body
decoder := json.NewDecoder(obj.Body)
for {
var message Message
err := decoder.Decode(&message)
if err == io.EOF {
break
}
if err != nil {
fmt.Println("[TEST ERROR] Malform log entry. Unmarshal Error:", err)
continue
}
recordId := message.Log[:8]
s3RecordCounter++
value, err := strconv.ParseUint(recordId, 10, 32)
if err != nil {
fmt.Println("[TEST ERROR] Malform log entry. ParseUint Error:", err)
continue
}
recordIdUint := uint32(value)
inputMap[recordIdUint] = struct{}{}
}
// Close the S3 object body
obj.Body.Close()
}
if !aws.BoolValue(response.IsTruncated) {
break
}
continuationToken = response.NextContinuationToken
}
fmt.Println("total_s3_obj, ", s3ObjectCounter)
return s3RecordCounter
}