validator/validators/performance/performance_validator.go (281 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT
package performance
import (
"fmt"
"log"
"strings"
"time"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/aws/aws-sdk-go/aws"
"github.com/cenkalti/backoff/v4"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
"github.com/aws/amazon-cloudwatch-agent-test/util/awsservice"
"github.com/aws/amazon-cloudwatch-agent-test/validator/models"
"github.com/aws/amazon-cloudwatch-agent-test/validator/validators/basic"
)
// ServiceName is the value stored in the "Service" field of every
// performance record.
const ServiceName = "AmazonCloudWatchAgent"

// DynamoDBDataBase is the table name handed to the awsservice database helpers
// when performance records are looked up and replaced.
const DynamoDBDataBase = "CWAPerformanceMetrics"
// metricsConvertToMB names the metrics whose values arrive in bytes. Their
// values are divided by 1024*1024 before statistics are calculated so that the
// stored results read in MB.
var metricsConvertToMB = []string{
	"mem_total",
	"procstat_memory_rss",
	"procstat_memory_swap",
	"procstat_memory_data",
	"procstat_memory_vms",
	"procstat_write_bytes",
	"procstat_bytes_sent",
	"memory_rss",
	"memory_vms",
	"write_bytes",
	"Bytes_Sent_Per_Sec",
	"Available_Bytes",
}
// PerformanceValidator collects the agent's performance metrics from
// CloudWatch, reduces them to statistics and stores the result in the
// performance database. Methods it does not define itself are promoted from
// the embedded models.ValidatorFactory.
type PerformanceValidator struct {
// vConfig is the validation configuration every method reads its settings from.
vConfig models.ValidateConfig
// Embedded factory; NewPerformanceValidator fills it with a basic validator.
models.ValidatorFactory
}
// Compile-time assertion that *PerformanceValidator implements models.ValidatorFactory.
var _ models.ValidatorFactory = (*PerformanceValidator)(nil)
// NewPerformanceValidator returns a PerformanceValidator bound to vConfig. Its
// embedded ValidatorFactory is a basic validator built from the same
// configuration.
func NewPerformanceValidator(vConfig models.ValidateConfig) models.ValidatorFactory {
	validator := new(PerformanceValidator)
	validator.vConfig = vConfig
	validator.ValidatorFactory = basic.NewBasicValidator(vConfig)
	return validator
}
// CheckData fetches the performance metrics recorded between startTime and
// endTime, reduces them to per-metric statistics and sends the resulting
// record to the database.
//
// When the configured OS family is "windows" the GetMetricStatistics based
// path is used; every other OS family goes through the GetMetricData based
// path. The first error raised by any step is returned unchanged.
func (s *PerformanceValidator) CheckData(startTime, endTime time.Time) error {
	var (
		perfInfo PerformanceInformation
		err      error
	)

	switch s.vConfig.GetOSFamily() {
	case "windows":
		var stats []*cloudwatch.GetMetricStatisticsOutput
		if stats, err = s.GetWindowsPerformanceMetrics(startTime, endTime); err != nil {
			return err
		}
		perfInfo, err = s.CalculateWindowsMetricStatsAndPackMetrics(stats)
	default:
		var results []types.MetricDataResult
		if results, err = s.GetPerformanceMetrics(startTime, endTime); err != nil {
			return err
		}
		perfInfo, err = s.CalculateMetricStatsAndPackMetrics(results)
	}
	if err != nil {
		return err
	}

	return s.SendPacketToDatabase(perfInfo)
}
// SendPacketToDatabase merges perfInfo into the record stored for the current
// commit and use case and writes the merged record back to the database.
//
// The existing record is looked up through the "UseCaseHash" global secondary
// index, keyed by the CommitHash and UseCase attributes:
// https://github.com/aws/amazon-cloudwatch-agent-test/blob/e07fe7adb1b1d75244d8984507d3f83a7237c3d3/terraform/setup/main.tf#L46-L53
// The entries of perfInfo["Results"] are copied into the stored "Results" map
// (same keys are overwritten), and the record is rebuilt around the stored
// UniqueID before it replaces the item in the table. The whole
// read-merge-replace sequence is retried with
// awsservice.StandardExponentialBackoff.
//
// An error is returned, instead of panicking, when no plugin is configured or
// when a "Results" / "UniqueID" field does not have the expected type. Those
// shape errors are marked permanent because retrying cannot repair them.
func (s *PerformanceValidator) SendPacketToDatabase(perfInfo PerformanceInformation) error {
	plugins := s.vConfig.GetPluginsConfig()
	if len(plugins) == 0 {
		return fmt.Errorf("sending performance information: no plugin configured")
	}

	newResults, ok := perfInfo["Results"].(map[string]interface{})
	if !ok {
		return fmt.Errorf("sending performance information: unexpected type %T for new Results", perfInfo["Results"])
	}

	var (
		dataType               = s.vConfig.GetDataType()
		receiver               = plugins[0] // Assuming one plugin at a time
		commitHash, commitDate = s.vConfig.GetCommitInformation()
		agentCollectionPeriod  = fmt.Sprint(s.vConfig.GetAgentCollectionPeriod().Seconds())
		// Attribute names and values used to find the record of this commit
		// and use case through the secondary index.
		kCheckingAttribute = []string{"CommitHash", "UseCase"}
		vCheckingAttribute = []string{commitHash, receiver}
	)

	return backoff.Retry(func() error {
		existingPerfInfo, err := awsservice.GetItemInDatabase(DynamoDBDataBase, "UseCaseHash", kCheckingAttribute, vCheckingAttribute, perfInfo)
		if err != nil {
			return err
		}

		existingResults, ok := existingPerfInfo["Results"].(map[string]interface{})
		if !ok {
			return backoff.Permanent(fmt.Errorf("sending performance information: unexpected type %T for stored Results", existingPerfInfo["Results"]))
		}
		uniqueID, ok := existingPerfInfo["UniqueID"].(string)
		if !ok {
			return backoff.Permanent(fmt.Errorf("sending performance information: unexpected type %T for stored UniqueID", existingPerfInfo["UniqueID"]))
		}

		// Merge the new results into the latest stored ones, then replace the
		// stored packet with the merged record.
		maps.Copy(existingResults, newResults)
		finalPerfInfo := packIntoPerformanceInformation(uniqueID, receiver, dataType, agentCollectionPeriod, commitHash, commitDate, existingResults)
		return awsservice.ReplaceItemInDatabase(DynamoDBDataBase, finalPerfInfo)
	}, awsservice.StandardExponentialBackoff)
}
// CalculateMetricStatsAndPackMetrics turns GetMetricData results into a
// performance record.
//
// For every result the metric name is taken as the last space-separated token
// of its label. Values of metrics listed in metricsConvertToMB are converted
// from bytes to MB, and the values are then reduced with
// CalculateMetricStatisticsBasedOnDataAndPeriod using the agent collection
// period. The per-metric statistics are stored under the configured data rate
// inside the record's results.
//
// The conversion works on a copy, so the caller's metrics are left untouched.
// An error is returned when no plugin is configured, when a result has no
// label, or when a metric has no values or any negative value.
func (s *PerformanceValidator) CalculateMetricStatsAndPackMetrics(metrics []types.MetricDataResult) (PerformanceInformation, error) {
	plugins := s.vConfig.GetPluginsConfig()
	if len(plugins) == 0 {
		return nil, fmt.Errorf("calculating metric statistics: no plugin configured")
	}

	var (
		receiver               = plugins[0] // Assuming one plugin at a time
		commitHash, commitDate = s.vConfig.GetCommitInformation()
		dataType               = s.vConfig.GetDataType()
		dataRate               = fmt.Sprint(s.vConfig.GetDataRate())
		uniqueID               = s.vConfig.GetUniqueID()
		agentCollectionPeriod  = s.vConfig.GetAgentCollectionPeriod().Seconds()
	)

	performanceMetricResults := make(map[string]Stats, len(metrics))
	for _, metric := range metrics {
		if metric.Label == nil {
			return nil, fmt.Errorf("calculating metric statistics: metric data result has no label")
		}
		label := *metric.Label
		// Last space-separated token; the whole label when it has no space.
		metricName := label[strings.LastIndex(label, " ")+1:]

		// Copy first so the byte-to-MB conversion never rewrites the caller's data.
		metricValues := make([]float64, len(metric.Values))
		copy(metricValues, metric.Values)

		// Convert every bytes to MB
		if slices.Contains(metricsConvertToMB, metricName) {
			for i := range metricValues {
				metricValues[i] /= 1024 * 1024
			}
		}

		log.Printf("Start calculate metric statictics for metric %s %v \n", metricName, metricValues)
		if !isAllValuesGreaterThanOrEqualToZero(metricValues) {
			return nil, fmt.Errorf("\n values are not all greater than or equal to zero for metric %s with values: %v", metricName, metricValues)
		}
		metricStats := CalculateMetricStatisticsBasedOnDataAndPeriod(metricValues, agentCollectionPeriod)
		log.Printf("Finished calculate metric statictics for metric %s: %v \n", metricName, metricStats)
		performanceMetricResults[metricName] = metricStats
	}

	return packIntoPerformanceInformation(uniqueID, receiver, dataType, fmt.Sprint(agentCollectionPeriod), commitHash, commitDate, map[string]interface{}{dataRate: performanceMetricResults}), nil
}
// CalculateWindowsMetricStatsAndPackMetrics turns GetMetricStatistics outputs
// into a performance record.
//
// For every output the metric name is taken as the last space-separated token
// of its label, and the Average of each datapoint is collected into a plain
// value slice. Values of metrics listed in metricsConvertToMB are converted
// from bytes to MB. GetMetricStatistics could supply statistics itself, but
// reducing the averages with CalculateMetricStatisticsBasedOnDataAndPeriod
// keeps the calculation the same as on the GetMetricData path.
//
// The datapoints passed in are never modified. An error is returned when no
// plugin is configured, when an output or its label is missing, when a
// datapoint has no Average, or when a metric has no datapoints or any negative
// average. The negative/empty error lists the collected averages.
func (s *PerformanceValidator) CalculateWindowsMetricStatsAndPackMetrics(statistic []*cloudwatch.GetMetricStatisticsOutput) (PerformanceInformation, error) {
	plugins := s.vConfig.GetPluginsConfig()
	if len(plugins) == 0 {
		return nil, fmt.Errorf("calculating windows metric statistics: no plugin configured")
	}

	var (
		receiver               = plugins[0] // Assuming one plugin at a time
		commitHash, commitDate = s.vConfig.GetCommitInformation()
		dataType               = s.vConfig.GetDataType()
		dataRate               = fmt.Sprint(s.vConfig.GetDataRate())
		uniqueID               = s.vConfig.GetUniqueID()
		agentCollectionPeriod  = s.vConfig.GetAgentCollectionPeriod().Seconds()
	)

	performanceMetricResults := make(map[string]Stats, len(statistic))
	for _, metric := range statistic {
		if metric == nil || metric.Label == nil {
			return nil, fmt.Errorf("calculating windows metric statistics: metric statistics output has no label")
		}
		label := *metric.Label
		// Last space-separated token; the whole label when it has no space.
		metricName := label[strings.LastIndex(label, " ")+1:]
		convertToMB := slices.Contains(metricsConvertToMB, metricName)

		// Collect the averages into a fresh slice (converting bytes to MB where
		// needed) instead of writing through the datapoints' pointers.
		data := make([]float64, 0, len(metric.Datapoints))
		for i := range metric.Datapoints {
			average := metric.Datapoints[i].Average
			if average == nil {
				return nil, fmt.Errorf("calculating windows metric statistics: datapoint without Average for metric %s", metricName)
			}
			value := *average
			if convertToMB {
				value /= 1024 * 1024
			}
			data = append(data, value)
		}

		log.Printf("Start calculate metric statictics for metric %s \n", metricName)
		if !isAllValuesGreaterThanOrEqualToZero(data) {
			return nil, fmt.Errorf("\n values are not all greater than or equal to zero for metric %s with values: %v", metricName, data)
		}

		metricStats := CalculateMetricStatisticsBasedOnDataAndPeriod(data, agentCollectionPeriod)
		log.Printf("Finished calculate metric statictics for metric %s: %+v \n", metricName, metricStats)
		performanceMetricResults[metricName] = metricStats
	}

	return packIntoPerformanceInformation(uniqueID, receiver, dataType, fmt.Sprint(agentCollectionPeriod), commitHash, commitDate, map[string]interface{}{dataRate: performanceMetricResults}), nil
}
// GetPerformanceMetrics fetches, through GetMetricData, the results of every
// metric listed in the validation configuration for the window between
// startTime and endTime.
//
// One query is built per configured metric. Each query carries the InstanceId
// dimension of the current EC2 instance followed by the metric's own
// configured dimensions. The error of the GetMetricData call is returned
// unchanged.
func (s *PerformanceValidator) GetPerformanceMetrics(startTime, endTime time.Time) ([]types.MetricDataResult, error) {
	var (
		metricNamespace  = s.vConfig.GetMetricNamespace()
		validationMetric = s.vConfig.GetMetricValidation()
		ec2InstanceID    = awsservice.GetInstanceId()
	)
	performanceMetricDataQueries := make([]types.MetricDataQuery, 0, len(validationMetric))

	log.Printf("Start getting performance metrics from CloudWatch")

	for _, metric := range validationMetric {
		// InstanceId always comes first, then the dimensions from the configuration.
		metricDimensions := make([]types.Dimension, 0, len(metric.MetricDimension)+1)
		metricDimensions = append(metricDimensions, types.Dimension{
			Name:  aws.String("InstanceId"),
			Value: aws.String(ec2InstanceID),
		})
		for _, dimension := range metric.MetricDimension {
			metricDimensions = append(metricDimensions, types.Dimension{
				Name:  aws.String(dimension.Name),
				Value: aws.String(dimension.Value),
			})
		}
		performanceMetricDataQueries = append(performanceMetricDataQueries, s.buildPerformanceMetricQueries(metric.MetricName, metricNamespace, metricDimensions))
	}

	metrics, err := awsservice.GetMetricData(performanceMetricDataQueries, startTime, endTime)
	if err != nil {
		return nil, err
	}

	return metrics.MetricDataResults, nil
}
// GetWindowsPerformanceMetrics fetches, through GetMetricStatistics, the
// Average statistic of every metric listed in the validation configuration for
// the window between startTime and endTime.
//
// Each request carries the InstanceId dimension of the current EC2 instance
// followed by the metric's own configured dimensions. Every returned average
// is logged. The first failing request aborts the collection and its error is
// returned unchanged.
func (s *PerformanceValidator) GetWindowsPerformanceMetrics(startTime, endTime time.Time) ([]*cloudwatch.GetMetricStatisticsOutput, error) {
	namespace := s.vConfig.GetMetricNamespace()
	wantedMetrics := s.vConfig.GetMetricValidation()
	instanceID := awsservice.GetInstanceId()

	log.Printf("Start getting performance metrics from CloudWatch")

	collected := make([]*cloudwatch.GetMetricStatisticsOutput, 0, len(wantedMetrics))
	for _, wanted := range wantedMetrics {
		dimensions := []types.Dimension{{
			Name:  aws.String("InstanceId"),
			Value: aws.String(instanceID),
		}}
		for _, configured := range wanted.MetricDimension {
			dimensions = append(dimensions, types.Dimension{
				Name:  aws.String(configured.Name),
				Value: aws.String(configured.Value),
			})
		}

		log.Printf("Trying to get Metric %s for GetMetricStatistic ", wanted.MetricName)

		requestedStats := []types.Statistic{types.StatisticAverage}
		// Windows procstat metric names carry a space character, which
		// GetMetricData does not support. The only workaround is
		// GetMetricStatistics with datapoints on a one-second period.
		output, err := awsservice.GetMetricStatistics(wanted.MetricName, namespace, dimensions, startTime, endTime, 1, requestedStats, nil)
		if err != nil {
			return nil, err
		}
		collected = append(collected, output)

		log.Printf("Statistics for Metric: %s", wanted.MetricName)
		for i := range output.Datapoints {
			log.Printf("Average: %f", *output.Datapoints[i].Average)
		}
	}

	return collected, nil
}
// buildPerformanceMetricQueries assembles the GetMetricData query for a single
// metric. The query asks for the models.AVERAGE statistic with a Period of 10
// and uses the lower-cased metric name as its Id.
func (s *PerformanceValidator) buildPerformanceMetricQueries(metricName, metricNamespace string, metricDimensions []types.Dimension) types.MetricDataQuery {
	return types.MetricDataQuery{
		Id: aws.String(strings.ToLower(metricName)),
		MetricStat: &types.MetricStat{
			Metric: &types.Metric{
				Namespace:  aws.String(metricNamespace),
				MetricName: aws.String(metricName),
				Dimensions: metricDimensions,
			},
			Period: aws.Int32(10),
			Stat:   aws.String(string(models.AVERAGE)),
		},
	}
}
// packIntoPerformanceInformation assembles a performance record in the layout
// of the performance metrics DynamoDB table:
// https://github.com/aws/amazon-cloudwatch-agent-test/blob/e07fe7adb1b1d75244d8984507d3f83a7237c3d3/terraform/setup/main.tf#L8-L63
// Besides the values passed in, the record carries the fixed ServiceName and
// the AMI and instance type reported by awsservice for the current instance.
func packIntoPerformanceInformation(uniqueID, receiver, dataType, collectionPeriod, commitHash string, commitDate int64, result interface{}) PerformanceInformation {
	packet := PerformanceInformation{
		"UniqueID":         uniqueID,
		"Service":          ServiceName,
		"UseCase":          receiver,
		"CommitDate":       commitDate,
		"CommitHash":       commitHash,
		"DataType":         dataType,
		"Results":          result,
		"CollectionPeriod": collectionPeriod,
	}
	packet["InstanceAMI"] = awsservice.GetImageId()
	packet["InstanceType"] = awsservice.GetInstanceType()
	return packet
}
// isAllValuesGreaterThanOrEqualToZero reports whether values holds at least
// one element and none of its elements is negative. An empty or nil slice
// yields false.
func isAllValuesGreaterThanOrEqualToZero(values []float64) bool {
	for i := range values {
		if values[i] < 0 {
			return false
		}
	}
	// No negative value seen; the check only passes when there was data at all.
	return len(values) > 0
}
// isAllStatisticsGreaterThanOrEqualToZero reports whether datapoints holds at
// least one element and every element has a non-negative Average. An empty
// slice yields false. A datapoint without an Average also yields false rather
// than a nil-pointer panic, since a missing value cannot satisfy the check.
func isAllStatisticsGreaterThanOrEqualToZero(datapoints []types.Datapoint) bool {
	if len(datapoints) == 0 {
		return false
	}
	// Index instead of ranging by value to avoid copying each Datapoint struct.
	for i := range datapoints {
		average := datapoints[i].Average
		if average == nil || *average < 0 {
			return false
		}
	}
	return true
}