func validate_s3()

in load_tests/validation/validate.go [108:174]


// validate_s3 walks every object stored under prefix in bucket, decodes each
// object body as a stream of JSON-encoded Message values, and records the ID
// of every well-formed record in the package-level inputMap.
//
// A record ID is the first 8 bytes of Message.Log parsed as a base-10
// unsigned 32-bit integer. Records whose ID is missing or unparsable are
// reported on stdout and skipped, but still count towards the returned total.
//
// Parameters:
//   - s3Client: client used for the ListObjectsV2 and GetObject calls.
//   - bucket:   name of the bucket to scan.
//   - prefix:   key prefix restricting which objects are listed.
//
// It returns the number of records successfully decoded from S3. Listing and
// download failures are reported through exitErrorf.
func validate_s3(s3Client *s3.S3, bucket string, prefix string) int {
	// Number of leading bytes of Message.Log that hold the record ID.
	const recordIDLen = 8

	var continuationToken *string
	s3RecordCounter := 0
	s3ObjectCounter := 0

	// ListObjectsV2 is paginated; keep requesting pages until the response
	// is no longer truncated.
	for {
		response, err := s3Client.ListObjectsV2(&s3.ListObjectsV2Input{
			Bucket:            aws.String(bucket),
			ContinuationToken: continuationToken,
			Prefix:            aws.String(prefix),
		})
		if err != nil {
			// NOTE(review): presumably exitErrorf terminates the process;
			// otherwise response would be nil below - confirm.
			exitErrorf("[TEST FAILURE] Error occurred to get the objects from bucket: %q., %v", bucket, err)
		}

		for _, content := range response.Contents {
			obj, err := s3Client.GetObject(&s3.GetObjectInput{
				Bucket: aws.String(bucket),
				Key:    content.Key,
			})
			if err != nil {
				exitErrorf("[TEST FAILURE] Error to get S3 object. %v", err)
			}
			s3ObjectCounter++

			// Stream the JSON values straight from the object body instead
			// of buffering the whole object in memory.
			decoder := json.NewDecoder(obj.Body)
			for {
				var message Message
				err := decoder.Decode(&message)
				if err == io.EOF {
					break
				}
				if err != nil {
					fmt.Println("[TEST ERROR] Malform log entry. Unmarshal Error:", err)
					// A type mismatch is reported only after a complete JSON
					// value has been consumed, so the stream is still usable.
					if _, ok := err.(*json.UnmarshalTypeError); ok {
						continue
					}
					// Syntax and read errors are remembered by the decoder
					// and returned again on every later call; retrying would
					// loop forever, so give up on the rest of this object.
					break
				}

				s3RecordCounter++

				// Guard the slice below: a log shorter than the ID width
				// would otherwise panic.
				if len(message.Log) < recordIDLen {
					fmt.Println("[TEST ERROR] Malform log entry. Log too short for record ID:", message.Log)
					continue
				}

				recordID := message.Log[:recordIDLen]
				value, err := strconv.ParseUint(recordID, 10, 32)
				if err != nil {
					fmt.Println("[TEST ERROR] Malform log entry. ParseUint Error:", err)
					continue
				}
				inputMap[uint32(value)] = struct{}{}
			}

			// Close the S3 object body so the underlying connection is released.
			obj.Body.Close()
		}

		if !aws.BoolValue(response.IsTruncated) {
			break
		}
		continuationToken = response.NextContinuationToken
	}

	fmt.Println("total_s3_obj, ", s3ObjectCounter)

	return s3RecordCounter
}