x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go (559 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package cloudwatch
import (
"crypto/fips140"
"fmt"
"maps"
"reflect"
"strconv"
"strings"
"time"
awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/apigateway"
"github.com/aws/aws-sdk-go-v2/service/apigatewayv2"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/aws/aws-sdk-go-v2/service/resourcegroupstaggingapi"
resourcegroupstaggingapitypes "github.com/aws/aws-sdk-go-v2/service/resourcegroupstaggingapi/types"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/x-pack/metricbeat/module/aws"
"github.com/elastic/elastic-agent-libs/logp"
)
const checkns = "AWS/ApiGateway"
const checkresource_type = "apigateway:restapis"
var (
metricsetName = "cloudwatch"
defaultStatistics = []string{"Average", "Maximum", "Minimum", "Sum", "SampleCount"}
dimensionSeparator = ","
dimensionValueWildcard = "*"
checkns_lower = strings.ToLower(checkns)
checkresource_type_lower = strings.ToLower(checkresource_type)
)
type APIClients struct {
CloudWatchClient *cloudwatch.Client
Resourcegroupstaggingapi *resourcegroupstaggingapi.Client
Apigateway *apigateway.Client
Apigatewayv2 *apigatewayv2.Client
}
// init registers the MetricSet with the central registry as soon as the program
// starts. The New function will be called later to instantiate an instance of
// the MetricSet for each host defined in the module's configuration. After the
// MetricSet has been created then Fetch will begin to be called periodically.
func init() {
mb.Registry.MustAddMetricSet(aws.ModuleName, metricsetName, New,
mb.DefaultMetricSet(),
)
}
// MetricSet holds any configuration or state information. It must implement
// the mb.MetricSet interface. And this is best achieved by embedding
// mb.BaseMetricSet because it implements all the required mb.MetricSet
// interface methods except for Fetch.
type MetricSet struct {
*aws.MetricSet
logger *logp.Logger
CloudwatchConfigs []Config `config:"metrics" validate:"nonzero,required"`
PreviousEndTime time.Time
}
// Dimension holds name and value for cloudwatch metricset dimension config.
type Dimension struct {
Name string `config:"name" validate:"nonzero"`
Value string `config:"value" validate:"nonzero"`
}
// Config holds a configuration specific for cloudwatch metricset.
type Config struct {
Namespace string `config:"namespace" validate:"nonzero,required"`
MetricName []string `config:"name"`
Dimensions []Dimension `config:"dimensions"`
ResourceType string `config:"resource_type"`
Statistic []string `config:"statistic"`
}
type metricsWithStatistics struct {
cloudwatchMetric aws.MetricWithID
statistic []string
}
type listMetricWithDetail struct {
metricsWithStats []metricsWithStatistics
resourceTypeFilters map[string][]aws.Tag
}
// namespaceDetail collects configuration details for each namespace
type namespaceDetail struct {
resourceTypeFilter string
names []string
tags []aws.Tag
statistics []string
dimensions []types.Dimension
}
// New creates a new instance of the MetricSet. New is responsible for unpacking
// any MetricSet specific configuration options if there are any.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
logger := logp.NewLogger(aws.ModuleName + "." + metricsetName)
metricSet, err := aws.NewMetricSet(base)
if err != nil {
return nil, fmt.Errorf("error creating aws metricset: %w", err)
}
config := struct {
CloudwatchMetrics []Config `config:"metrics" validate:"nonzero,required"`
}{}
err = base.Module().UnpackConfig(&config)
if err != nil {
return nil, fmt.Errorf("error unpack raw module config using UnpackConfig: %w", err)
}
logger.Debugf("cloudwatch config = %s", config)
if len(config.CloudwatchMetrics) == 0 {
return nil, fmt.Errorf("metrics in config is missing: %w", err)
}
return &MetricSet{
MetricSet: metricSet,
logger: logger,
CloudwatchConfigs: config.CloudwatchMetrics,
}, nil
}
// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(report mb.ReporterV2) error {
// Get startTime and endTime
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.Period, m.Latency, m.PreviousEndTime)
m.PreviousEndTime = endTime
m.Logger().Debugf("startTime = %s, endTime = %s", startTime, endTime)
// Initialise the map that will be used in case APIGateway api is configured. Infoapi includes Name_of_API:ID_of_API entries
infoapi := make(map[string]string)
// Check statistic method in config
err := m.checkStatistics()
if err != nil {
return fmt.Errorf("checkStatistics failed: %w", err)
}
// Get listMetricDetailTotal and namespaceDetailTotal from configuration
listMetricDetailTotal, namespaceDetailTotal := m.readCloudwatchConfig()
m.logger.Debugf("listMetricDetailTotal = %s", listMetricDetailTotal)
m.logger.Debugf("namespaceDetailTotal = %s", namespaceDetailTotal)
var config aws.Config
err = m.Module().UnpackConfig(&config)
if err != nil {
return err
}
// Starting from Go 1.24, when FIPS 140-3 mode is active, fips140.Enabled() will return true.
// So, regardless of whether `fips_enabled` is set to true or false, when FIPS 140-3 mode is active, the
// resolver will resolve to the FIPS endpoint.
// See: https://go.dev/doc/security/fips140#fips-140-3-mode
if fips140.Enabled() {
config.AWSConfig.FIPSEnabled = true
}
// Create events based on listMetricDetailTotal from configuration
if len(listMetricDetailTotal.metricsWithStats) != 0 {
for _, regionName := range m.MetricSet.RegionsList {
m.logger.Debugf("Collecting metrics from AWS region %s", regionName)
beatsConfig := m.MetricSet.AwsConfig.Copy()
beatsConfig.Region = regionName
APIClients, err := m.createAwsRequiredClients(beatsConfig, regionName, config)
if err != nil {
m.Logger().Warn("skipping metrics list from region '%s'", regionName)
}
eventsWithIdentifier, err := m.createEvents(APIClients.CloudWatchClient, APIClients.Resourcegroupstaggingapi, listMetricDetailTotal.metricsWithStats, listMetricDetailTotal.resourceTypeFilters, infoapi, regionName, startTime, endTime)
if err != nil {
return fmt.Errorf("createEvents failed for region %s: %w", regionName, err)
}
m.logger.Debugf("Collected metrics of metrics = %d", len(eventsWithIdentifier))
for _, event := range eventsWithIdentifier {
_ = event.RootFields.Delete(aws.CloudWatchPeriodName)
report.Event(event)
}
}
}
// Create events based on namespaceDetailTotal from configuration
for _, regionName := range m.MetricSet.RegionsList {
m.logger.Debugf("Collecting metrics from AWS region %s", regionName)
beatsConfig := m.MetricSet.AwsConfig.Copy()
beatsConfig.Region = regionName
APIClients, err := m.createAwsRequiredClients(beatsConfig, regionName, config)
if err != nil {
m.Logger().Warn("skipping metrics list from region '%s'", regionName, err)
continue
}
// retrieve all the details for all the metrics available in the current region when no namespace is specified
// otherwise only retrieve metrics from the specific namespaces from the config
var listMetricsOutput []aws.MetricWithID
if len(namespaceDetailTotal) == 0 {
listMetricsOutput, err = aws.GetListMetricsOutput("*", regionName, m.Period, m.IncludeLinkedAccounts, m.OwningAccount, m.MonitoringAccountID, APIClients.CloudWatchClient)
if err != nil {
m.Logger().Errorf("Error while retrieving the list of metrics for region %s and namespace %s: %w", regionName, "*", err)
}
} else {
for namespace := range namespaceDetailTotal {
listMetricsOutputPerNamespace, err := aws.GetListMetricsOutput(namespace, regionName, m.Period, m.IncludeLinkedAccounts, m.OwningAccount, m.MonitoringAccountID, APIClients.CloudWatchClient)
if err != nil {
m.Logger().Errorf("Error while retrieving the list of metrics for region %s and namespace %s: %w", regionName, namespace, err)
}
listMetricsOutput = append(listMetricsOutput, listMetricsOutputPerNamespace...)
}
}
if len(listMetricsOutput) == 0 {
continue
}
for namespace, namespaceDetails := range namespaceDetailTotal {
m.logger.Debugf("Collected metrics from namespace %s", namespace)
// filter listMetricsOutput by detailed configuration per each namespace
filteredMetricWithStatsTotal := filterListMetricsOutput(listMetricsOutput, namespace, namespaceDetails)
// get resource type filters and tags filters for each namespace
resourceTypeTagFilters := constructTagsFilters(namespaceDetails)
//Check whether namespace is APIGW
if strings.Contains(strings.ToLower(namespace), checkns_lower) {
useonlyrest := false
if len(resourceTypeTagFilters) == 1 {
for key := range resourceTypeTagFilters {
if strings.Compare(strings.ToLower(key), checkresource_type_lower) == 0 {
useonlyrest = true
}
}
}
// inforestapi includes only Rest APIs
if useonlyrest {
infoapi, err = aws.GetAPIGatewayRestAPIOutput(APIClients.Apigateway, config.LimitRestAPI)
if err != nil {
m.Logger().Errorf("could not get rest apis output: %v", err)
}
} else {
// infoapi includes only Rest APIs
// apiGatewayAPI includes only WebSocket and HTTP APIs
infoapi, err = aws.GetAPIGatewayRestAPIOutput(APIClients.Apigateway, config.LimitRestAPI)
if err != nil {
m.Logger().Errorf("could not get rest apis output: %v", err)
}
apiGatewayAPI, err := aws.GetAPIGatewayAPIOutput(APIClients.Apigatewayv2)
if err != nil {
m.Logger().Errorf("could not get http and websocket apis output: %v", err)
}
if len(apiGatewayAPI) > 0 {
maps.Copy(infoapi, apiGatewayAPI)
}
}
m.Logger().Debugf("infoapi response: %v", infoapi)
}
eventsWithIdentifier, err := m.createEvents(APIClients.CloudWatchClient, APIClients.Resourcegroupstaggingapi, filteredMetricWithStatsTotal, resourceTypeTagFilters, infoapi, regionName, startTime, endTime)
if err != nil {
return fmt.Errorf("createEvents failed for region %s: %w", regionName, err)
}
m.logger.Debugf("Collected number of metrics = %d", len(eventsWithIdentifier))
events, err := addMetadata(m.logger, namespace, regionName, beatsConfig, config.AWSConfig.FIPSEnabled, eventsWithIdentifier)
if err != nil {
// TODO What to do if add metadata fails? I guess to continue, probably we have an 90% of reliable data
m.Logger().Warnf("could not add metadata to events: %v", err)
}
for _, event := range events {
_ = event.RootFields.Delete(aws.CloudWatchPeriodName)
report.Event(event)
}
}
}
return nil
}
// createAwsRequiredClients will return the two necessary client instances to do Metric requests to the AWS API
func (m *MetricSet) createAwsRequiredClients(beatsConfig awssdk.Config, regionName string, config aws.Config) (APIClients, error) {
m.logger.Debugf("Collecting metrics from AWS region %s", regionName)
APIClients := APIClients{}
APIClients.CloudWatchClient = cloudwatch.NewFromConfig(beatsConfig, func(o *cloudwatch.Options) {
if config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
})
APIClients.Resourcegroupstaggingapi = resourcegroupstaggingapi.NewFromConfig(beatsConfig, func(o *resourcegroupstaggingapi.Options) {
if config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
})
APIClients.Apigateway = apigateway.NewFromConfig(beatsConfig, func(o *apigateway.Options) {
})
APIClients.Apigatewayv2 = apigatewayv2.NewFromConfig(beatsConfig, func(o *apigatewayv2.Options) {
})
return APIClients, nil
}
// filterListMetricsOutput compares config details with listMetricsOutput and filter out the ones don't match
func filterListMetricsOutput(listMetricsOutput []aws.MetricWithID, namespace string, namespaceDetails []namespaceDetail) []metricsWithStatistics {
var filteredMetricWithStatsTotal []metricsWithStatistics
for _, listMetric := range listMetricsOutput {
if *listMetric.Metric.Namespace == namespace {
for _, configPerNamespace := range namespaceDetails {
if configPerNamespace.names != nil {
// Consider only the metrics that exist in the configuration
exists, _ := aws.StringInSlice(*listMetric.Metric.MetricName, configPerNamespace.names)
if !exists {
continue
}
}
if configPerNamespace.dimensions != nil {
if !compareAWSDimensions(listMetric.Metric.Dimensions, configPerNamespace.dimensions) {
continue
}
}
filteredMetricWithStatsTotal = append(filteredMetricWithStatsTotal,
metricsWithStatistics{
cloudwatchMetric: listMetric,
statistic: configPerNamespace.statistics,
})
}
}
}
return filteredMetricWithStatsTotal
}
// Collect resource type filters and tag filters from config for cloudwatch
func constructTagsFilters(namespaceDetails []namespaceDetail) map[string][]aws.Tag {
resourceTypeTagFilters := map[string][]aws.Tag{}
for _, configPerNamespace := range namespaceDetails {
if configPerNamespace.resourceTypeFilter != "" {
if _, ok := resourceTypeTagFilters[configPerNamespace.resourceTypeFilter]; ok {
resourceTypeTagFilters[configPerNamespace.resourceTypeFilter] = append(resourceTypeTagFilters[configPerNamespace.resourceTypeFilter], configPerNamespace.tags...)
} else {
resourceTypeTagFilters[configPerNamespace.resourceTypeFilter] = configPerNamespace.tags
}
}
}
return resourceTypeTagFilters
}
func (m *MetricSet) checkStatistics() error {
for _, config := range m.CloudwatchConfigs {
for _, stat := range config.Statistic {
if _, ok := statisticLookup(stat); !ok {
return fmt.Errorf("statistic method specified is not valid: %s", stat)
}
}
}
return nil
}
func (m *MetricSet) readCloudwatchConfig() (listMetricWithDetail, map[string][]namespaceDetail) {
var listMetricDetailTotal listMetricWithDetail
namespaceDetailTotal := map[string][]namespaceDetail{}
var metricsWithStatsTotal []metricsWithStatistics
resourceTypesWithTags := map[string][]aws.Tag{}
for _, config := range m.CloudwatchConfigs {
// If there is no statistic method specified, then use the default.
if config.Statistic == nil {
config.Statistic = defaultStatistics
}
var cloudwatchDimensions []types.Dimension
for _, dim := range config.Dimensions {
name := dim.Name
value := dim.Value
cloudwatchDimensions = append(cloudwatchDimensions, types.Dimension{
Name: &name,
Value: &value,
})
}
// if any Dimension value contains wildcard, then compare dimensions with
// listMetrics result in filterListMetricsOutput
if config.MetricName != nil && config.Dimensions != nil &&
!configDimensionValueContainsWildcard(config.Dimensions) {
namespace := config.Namespace
for i := range config.MetricName {
metric := types.Metric{
Namespace: &namespace,
MetricName: &config.MetricName[i],
Dimensions: cloudwatchDimensions,
}
metricsWithStats := metricsWithStatistics{
cloudwatchMetric: aws.MetricWithID{
Metric: metric,
},
statistic: config.Statistic,
}
metricsWithStatsTotal = append(metricsWithStatsTotal, metricsWithStats)
}
if config.ResourceType != "" {
resourceTypesWithTags[config.ResourceType] = m.MetricSet.TagsFilter
}
continue
}
configPerNamespace := namespaceDetail{
names: config.MetricName,
tags: m.MetricSet.TagsFilter,
statistics: config.Statistic,
resourceTypeFilter: config.ResourceType,
dimensions: cloudwatchDimensions,
}
namespaceDetailTotal[config.Namespace] = append(namespaceDetailTotal[config.Namespace], configPerNamespace)
}
listMetricDetailTotal.resourceTypeFilters = resourceTypesWithTags
listMetricDetailTotal.metricsWithStats = metricsWithStatsTotal
return listMetricDetailTotal, namespaceDetailTotal
}
func createMetricDataQueries(listMetricsTotal []metricsWithStatistics, dataGranularity time.Duration) []types.MetricDataQuery {
var metricDataQueries []types.MetricDataQuery
for i, listMetric := range listMetricsTotal {
for j, statistic := range listMetric.statistic {
stat := statistic
metric := listMetric.cloudwatchMetric
label := constructLabel(listMetric.cloudwatchMetric, statistic)
dataGranularityInSec := int32(dataGranularity.Seconds())
id := "cw" + strconv.Itoa(i) + "stats" + strconv.Itoa(j)
metricDataQuery := types.MetricDataQuery{
Id: &id,
MetricStat: &types.MetricStat{
Period: &dataGranularityInSec,
Stat: &stat,
Metric: &metric.Metric,
},
Label: &label,
}
if listMetric.cloudwatchMetric.AccountID != "" {
metricDataQuery.AccountId = &metric.AccountID
}
metricDataQueries = append(metricDataQueries, metricDataQuery)
}
}
return metricDataQueries
}
func constructLabel(metric aws.MetricWithID, statistic string) string {
// label = accountID + accountLabel + metricName + namespace + statistic + periodLabel + dimKeys + dimValues
label := strings.Join([]string{metric.AccountID, aws.LabelConst.AccountLabel, *metric.Metric.MetricName, *metric.Metric.Namespace, statistic, aws.LabelConst.PeriodLabel}, aws.LabelConst.LabelSeparator)
dimNames := ""
dimValues := ""
for i, dim := range metric.Metric.Dimensions {
dimNames += *dim.Name
dimValues += *dim.Value
if i != len(metric.Metric.Dimensions)-1 {
dimNames += dimensionSeparator
dimValues += dimensionSeparator
}
}
if dimNames != "" && dimValues != "" {
label += aws.LabelConst.LabelSeparator + dimNames
label += aws.LabelConst.LabelSeparator + dimValues
}
return label
}
func statisticLookup(stat string) (string, bool) {
statisticLookupTable := map[string]string{
"Average": "avg",
"Sum": "sum",
"Maximum": "max",
"Minimum": "min",
"SampleCount": "count",
}
statMethod, ok := statisticLookupTable[stat]
if !ok {
ok = strings.HasPrefix(stat, "p")
statMethod = stat
}
return statMethod, ok
}
func generateFieldName(namespace string, labels []string) string {
stat := labels[aws.LabelConst.StatisticIdx]
// Check if statistic method is one of Sum, SampleCount, Minimum, Maximum, Average
// With checkStatistics function, no need to check bool return value here
statMethod, _ := statisticLookup(stat)
// By default, replace dot "." using underscore "_" for metric names
return "aws." + stripNamespace(namespace) + ".metrics." + common.DeDot(labels[aws.LabelConst.MetricNameIdx]) + "." + statMethod
}
// stripNamespace converts Cloudwatch namespace into the root field we will use for metrics
// example AWS/EC2 -> ec2
func stripNamespace(namespace string) string {
parts := strings.Split(namespace, "/")
return strings.ToLower(parts[len(parts)-1])
}
func insertRootFields(event mb.Event, metricValue float64, labels []string) mb.Event {
namespace := labels[aws.LabelConst.NamespaceIdx]
_, _ = event.RootFields.Put(generateFieldName(namespace, labels), metricValue)
_, _ = event.RootFields.Put("aws.cloudwatch.namespace", namespace)
if len(labels) != aws.LabelConst.LabelLengthTotal {
return event
}
dimNames := strings.Split(labels[aws.LabelConst.IdentifierNameIdx], ",")
dimValues := strings.Split(labels[aws.LabelConst.IdentifierValueIdx], ",")
for i := 0; i < len(dimNames); i++ {
_, _ = event.RootFields.Put("aws.dimensions."+dimNames[i], dimValues[i])
}
return event
}
func (m *MetricSet) createEvents(svcCloudwatch cloudwatch.GetMetricDataAPIClient, svcResourceAPI resourcegroupstaggingapi.GetResourcesAPIClient, listMetricWithStatsTotal []metricsWithStatistics, resourceTypeTagFilters map[string][]aws.Tag, infoAPImap map[string]string, regionName string, startTime time.Time, endTime time.Time) (map[string]mb.Event, error) {
// Initialize events for each identifier.
events := make(map[string]mb.Event)
// Construct metricDataQueries
metricDataQueries := createMetricDataQueries(listMetricWithStatsTotal, m.DataGranularity)
m.logger.Debugf("Number of MetricDataQueries = %d", len(metricDataQueries))
if len(metricDataQueries) == 0 {
return events, nil
}
// Use metricDataQueries to make GetMetricData API calls
metricDataResults, err := aws.GetMetricDataResults(metricDataQueries, svcCloudwatch, startTime, endTime)
m.logger.Debugf("Number of metricDataResults = %d", len(metricDataResults))
if err != nil {
return events, fmt.Errorf("getMetricDataResults failed: %w", err)
}
// Create events when there is no tags_filter or resource_type specified.
if len(resourceTypeTagFilters) == 0 {
for _, metricDataResult := range metricDataResults {
if len(metricDataResult.Values) == 0 {
continue
}
labels := strings.Split(*metricDataResult.Label, aws.LabelConst.LabelSeparator)
for valI, metricDataResultValue := range metricDataResult.Values {
if len(labels) != aws.LabelConst.LabelLengthTotal {
// when there is no identifier value in label, use id+label+region+accountID+namespace+index instead
identifier := labels[aws.LabelConst.AccountIdIdx] + labels[aws.LabelConst.AccountLabelIdx] + regionName + m.MonitoringAccountID + labels[aws.LabelConst.NamespaceIdx] + fmt.Sprint("-", valI)
if _, ok := events[identifier]; !ok {
if labels[aws.LabelConst.AccountIdIdx] != "" {
events[identifier] = aws.InitEvent(regionName, labels[aws.LabelConst.AccountLabelIdx], labels[aws.LabelConst.AccountIdIdx], metricDataResult.Timestamps[valI], labels[aws.LabelConst.PeriodLabelIdx])
} else {
events[identifier] = aws.InitEvent(regionName, m.MonitoringAccountName, m.MonitoringAccountID, metricDataResult.Timestamps[valI], labels[aws.LabelConst.PeriodLabelIdx])
}
}
events[identifier] = insertRootFields(events[identifier], metricDataResultValue, labels)
continue
}
identifierValue := labels[aws.LabelConst.IdentifierValueIdx] + fmt.Sprint("-", valI)
if _, ok := events[identifierValue]; !ok {
events[identifierValue] = aws.InitEvent(regionName, labels[aws.LabelConst.AccountLabelIdx], labels[aws.LabelConst.AccountIdIdx], metricDataResult.Timestamps[valI], labels[aws.LabelConst.PeriodLabelIdx])
}
events[identifierValue] = insertRootFields(events[identifierValue], metricDataResultValue, labels)
}
}
return events, nil
}
// Create events with tags
for resourceType, tagsFilter := range resourceTypeTagFilters {
m.logger.Debugf("resourceType = %s", resourceType)
m.logger.Debugf("tagsFilter = %s", tagsFilter)
resourceTagMap, err := aws.GetResourcesTags(svcResourceAPI, []string{resourceType})
if err != nil {
// If GetResourcesTags failed, continue report event just without tags.
m.logger.Info(fmt.Errorf("getResourcesTags failed, skipping region %s: %w", regionName, err))
}
if len(tagsFilter) != 0 && len(resourceTagMap) == 0 {
continue
}
// filter resourceTagMap
for identifier, tags := range resourceTagMap {
if exists := aws.CheckTagFiltersExist(tagsFilter, tags); !exists {
m.logger.Debugf("In region %s, service %s tags does not match tags_filter", regionName, identifier)
delete(resourceTagMap, identifier)
continue
}
m.logger.Debugf("In region %s, service %s tags match tags_filter", regionName, identifier)
}
for _, output := range metricDataResults {
if len(output.Values) == 0 {
continue
}
labels := strings.Split(*output.Label, aws.LabelConst.LabelSeparator)
for valI, metricDataResultValue := range output.Values {
if len(labels) != aws.LabelConst.LabelLengthTotal {
// if there is no tag in labels but there is a tagsFilter, then no event should be reported.
if len(tagsFilter) != 0 {
continue
}
// when there is no identifier value in label, use id+label+region+accountID+namespace+index instead
identifier := labels[aws.LabelConst.AccountIdIdx] + labels[aws.LabelConst.AccountLabelIdx] + regionName + m.MonitoringAccountID + labels[aws.LabelConst.NamespaceIdx] + fmt.Sprint("-", valI)
if _, ok := events[identifier]; !ok {
if labels[aws.LabelConst.AccountIdIdx] != "" {
events[identifier] = aws.InitEvent(regionName, labels[aws.LabelConst.AccountLabelIdx], labels[aws.LabelConst.AccountIdIdx], output.Timestamps[valI], labels[aws.LabelConst.PeriodLabelIdx])
} else {
events[identifier] = aws.InitEvent(regionName, m.MonitoringAccountName, m.MonitoringAccountID, output.Timestamps[valI], labels[aws.LabelConst.PeriodLabelIdx])
}
}
events[identifier] = insertRootFields(events[identifier], metricDataResultValue, labels)
continue
}
identifierValue := labels[aws.LabelConst.IdentifierValueIdx]
uniqueIdentifierValue := identifierValue + fmt.Sprint("-", valI)
// add tags to event based on identifierValue
// Check if identifier includes dimensionSeparator (comma in this case),
// split the identifier and check for each sub-identifier.
// For example, identifier might be [storageType, s3BucketName].
// And tags are only store under s3BucketName in resourceTagMap.
subIdentifiers := strings.Split(identifierValue, dimensionSeparator)
for _, subIdentifier := range subIdentifiers {
if len(infoAPImap) > 0 { // If infoAPImap includes data
if valAPIName, ok := infoAPImap[subIdentifier]; ok {
subIdentifier = valAPIName
}
}
if _, ok := events[uniqueIdentifierValue]; !ok {
// when tagsFilter is not empty but no entry in
// resourceTagMap for this identifier, do not initialize
// an event for this identifier.
if len(tagsFilter) != 0 && resourceTagMap[subIdentifier] == nil {
continue
}
events[uniqueIdentifierValue] = aws.InitEvent(regionName, labels[aws.LabelConst.AccountLabelIdx], labels[aws.LabelConst.AccountIdIdx], output.Timestamps[valI], labels[aws.LabelConst.PeriodLabelIdx])
}
events[uniqueIdentifierValue] = insertRootFields(events[uniqueIdentifierValue], metricDataResultValue, labels)
insertTags(events, uniqueIdentifierValue, subIdentifier, resourceTagMap)
}
}
}
}
return events, nil
}
func configDimensionValueContainsWildcard(dim []Dimension) bool {
for i := range dim {
if dim[i].Value == dimensionValueWildcard {
return true
}
}
return false
}
func compareAWSDimensions(dim1 []types.Dimension, dim2 []types.Dimension) bool {
if len(dim1) != len(dim2) {
return false
}
var dim1NameToValue = make(map[string]string, len(dim1))
var dim2NameToValue = make(map[string]string, len(dim1))
for i := range dim2 {
dim1NameToValue[*dim1[i].Name] = *dim1[i].Value
dim2NameToValue[*dim2[i].Name] = *dim2[i].Value
}
for name, v1 := range dim1NameToValue {
v2, exists := dim2NameToValue[name]
if exists && v2 == dimensionValueWildcard {
// wildcard can represent any value, so we set the
// dimension name with value in CloudWatch ListMetircs result,
// then the compare result is true
dim2NameToValue[name] = v1
}
}
return reflect.DeepEqual(dim1NameToValue, dim2NameToValue)
}
func insertTags(events map[string]mb.Event, uniqueIdentifierValue string, subIdentifier string, resourceTagMap map[string][]resourcegroupstaggingapitypes.Tag) {
tags := resourceTagMap[subIdentifier]
// some metric dimension values are arn format, eg: AWS/DDOS namespace metric
if len(tags) == 0 && strings.HasPrefix(subIdentifier, "arn:") {
resourceID, err := aws.FindShortIdentifierFromARN(subIdentifier)
if err == nil {
tags = resourceTagMap[resourceID]
}
}
if len(tags) != 0 {
// By default, replace dot "." using underscore "_" for tag keys.
// Note: tag values are not dedotted.
for _, tag := range tags {
_, _ = events[uniqueIdentifierValue].RootFields.Put("aws.tags."+common.DeDot(*tag.Key), *tag.Value)
}
}
}