x-pack/metricbeat/module/aws/aws.go (290 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package aws
import (
"context"
"crypto/fips140"
"fmt"
"strconv"
"time"
awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/ec2"
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
"github.com/aws/aws-sdk-go-v2/service/iam"
rdstypes "github.com/aws/aws-sdk-go-v2/service/rds/types"
resourcegroupstaggingapitypes "github.com/aws/aws-sdk-go-v2/service/resourcegroupstaggingapi/types"
"github.com/aws/aws-sdk-go-v2/service/sts"
"github.com/elastic/beats/v7/metricbeat/mb"
awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
"github.com/elastic/elastic-agent-libs/mapstr"
)
type describeRegionsClient interface {
DescribeRegions(ctx context.Context, params *ec2.DescribeRegionsInput, optFns ...func(*ec2.Options)) (*ec2.DescribeRegionsOutput, error)
}
// Config defines all required and optional parameters for aws metricsets
type Config struct {
Period time.Duration `config:"period" validate:"nonzero,required"`
DataGranularity time.Duration `config:"data_granularity"`
Regions []string `config:"regions"`
Latency time.Duration `config:"latency"`
AWSConfig awscommon.ConfigAWS `config:",inline"`
TagsFilter []Tag `config:"tags_filter"`
IncludeLinkedAccounts *bool `config:"include_linked_accounts"`
LimitRestAPI *int32 `config:"apigateway_max_results"`
OwningAccount string `config:"owning_account"`
}
// MetricSet is the base metricset for all aws metricsets
type MetricSet struct {
mb.BaseMetricSet
RegionsList []string
Endpoint string
Period time.Duration
DataGranularity time.Duration
Latency time.Duration
AwsConfig *awssdk.Config
MonitoringAccountName string
MonitoringAccountID string
TagsFilter []Tag
IncludeLinkedAccounts bool
OwningAccount string
}
// Tag holds a configuration specific for ec2 and cloudwatch metricset.
type Tag struct {
Key string `config:"key"`
Value []string `config:"value"`
}
// ModuleName is the name of this module.
const ModuleName = "aws"
// IncludeLinkedAccountsDefault defines if we should include metrics from linked AWS accounts or not. Default is true.
// More information about cross-account Cloudwatch monitoring can be found at
// https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/Cross-Account-Cross-Region.html
const IncludeLinkedAccountsDefault = true
type LabelConstants struct {
AccountIdIdx int
AccountLabelIdx int
MetricNameIdx int
NamespaceIdx int
StatisticIdx int
PeriodLabelIdx int
IdentifierNameIdx int
IdentifierValueIdx int
LabelLengthTotal int
LabelSeparator string
AccountLabel string
PeriodLabel string
BillingDimensionStartIdx int
}
var LabelConst = LabelConstants{
AccountIdIdx: 0,
AccountLabelIdx: 1,
MetricNameIdx: 2,
NamespaceIdx: 3,
StatisticIdx: 4,
PeriodLabelIdx: 5,
IdentifierNameIdx: 6,
IdentifierValueIdx: 7,
LabelLengthTotal: 8,
LabelSeparator: "|",
AccountLabel: "${PROP('AccountLabel')}",
PeriodLabel: "${PROP('Period')}",
BillingDimensionStartIdx: 3,
}
const CloudWatchPeriodName = "aws.cloudwatch.period"
func init() {
if err := mb.Registry.AddModule(ModuleName, newModule); err != nil {
panic(err)
}
}
func newModule(base mb.BaseModule) (mb.Module, error) {
var config Config
if err := base.UnpackConfig(&config); err != nil {
return nil, err
}
return &base, nil
}
// NewMetricSet creates a base metricset for aws metricsets
func NewMetricSet(base mb.BaseMetricSet) (*MetricSet, error) {
var config Config
err := base.Module().UnpackConfig(&config)
if err != nil {
return nil, err
}
// Starting from Go 1.24, when FIPS 140-3 mode is active, fips140.Enabled() will return true.
// So, regardless of whether `fips_enabled` is set to true or false, when FIPS 140-3 mode is active, the
// resolver will resolve to the FIPS endpoint.
// See: https://go.dev/doc/security/fips140#fips-140-3-mode
if fips140.Enabled() {
config.AWSConfig.FIPSEnabled = true
}
awsConfig, err := awscommon.InitializeAWSConfig(config.AWSConfig)
if err != nil {
return nil, fmt.Errorf("failed to get aws credentials, please check AWS credential in config: %w", err)
}
ctx, cancel := getContextWithTimeout(DefaultApiTimeout)
defer cancel()
_, err = awsConfig.Credentials.Retrieve(ctx)
if err != nil {
return nil, fmt.Errorf("failed to retrieve aws credentials, please check AWS credential in config: %w", err)
}
base.Logger().Debug("aws config endpoint = ", config.AWSConfig.Endpoint)
if config.DataGranularity > config.Period {
return nil, fmt.Errorf("Data Granularity cannot be larger than the period")
}
if config.DataGranularity == 0 {
config.DataGranularity = config.Period
}
metricSet := MetricSet{
BaseMetricSet: base,
Period: config.Period,
DataGranularity: config.DataGranularity,
Latency: config.Latency,
AwsConfig: &awsConfig,
TagsFilter: config.TagsFilter,
Endpoint: config.AWSConfig.Endpoint,
}
metricSet.IncludeLinkedAccounts = IncludeLinkedAccountsDefault
if config.IncludeLinkedAccounts != nil {
metricSet.IncludeLinkedAccounts = *config.IncludeLinkedAccounts
}
// IncludeLinkedAccounts & OwningAccount properties are connected.
// OwningAccount cannot be set if IncludeLinkedAccounts is set to false
if !metricSet.IncludeLinkedAccounts && config.OwningAccount != "" {
return nil, fmt.Errorf("include_linked_accounts must be `true` when specifying non-empty owning_account, please correct configurations and try again")
}
base.Logger().Debug("Metricset level config for period: ", metricSet.Period)
base.Logger().Debug("Metricset level config for data granularity: ", metricSet.DataGranularity)
base.Logger().Debug("Metricset level config for tags filter: ", metricSet.TagsFilter)
base.Logger().Debug("Metricset level config for including linked accounts: ", metricSet.IncludeLinkedAccounts)
if config.OwningAccount != "" {
base.Logger().Debug("Metricset level config for OwningAccount: ", metricSet.OwningAccount)
metricSet.OwningAccount = config.OwningAccount
}
base.Logger().Warn("extra charges on AWS API requests will be generated by this metricset")
// If regions in config is not empty, then overwrite the awsConfig.Region
if len(config.Regions) > 0 {
awsConfig.Region = config.Regions[0]
}
// Get IAM account id
svcSts := sts.NewFromConfig(awsConfig, func(o *sts.Options) {
if config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
})
ctx, cancel = getContextWithTimeout(DefaultApiTimeout)
defer cancel()
outputIdentity, err := svcSts.GetCallerIdentity(ctx, &sts.GetCallerIdentityInput{})
if err != nil {
base.Logger().Warn("failed to get caller identity, please check permission setting: ", err)
} else {
metricSet.MonitoringAccountID = *outputIdentity.Account
base.Logger().Debug("AWS Credentials belong to monitoring account ID: ", metricSet.MonitoringAccountID)
}
// Get account name/alias
svcIam := iam.NewFromConfig(awsConfig, func(o *iam.Options) {
if config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
})
metricSet.MonitoringAccountName = getAccountName(svcIam, base, metricSet)
//Validate LimitRestAPI value.
//The Limit variable defines maximum number of returned results per page. The default value is 25 and the maximum value is 500.
if config.LimitRestAPI != nil {
if *config.LimitRestAPI > 500 {
base.Logger().Debug("apigateway_max_results config value can not exceed value 500. Setting apigateway_max_results=500")
*config.LimitRestAPI = 500
} else if *config.LimitRestAPI <= 0 {
base.Logger().Debug("apigateway_max_results config value can not be <=0. Setting apigateway_max_results=25")
*config.LimitRestAPI = 25
}
}
// Construct MetricSet with a full regions list
if config.Regions == nil {
svcEC2 := ec2.NewFromConfig(awsConfig, func(o *ec2.Options) {
if config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
})
completeRegionsList, err := getRegions(svcEC2)
if err != nil {
return nil, err
}
metricSet.RegionsList = completeRegionsList
base.Logger().Debug("Metricset level config for regions: ", metricSet.RegionsList)
return &metricSet, nil
}
// Construct MetricSet with specific regions list from config
metricSet.RegionsList = config.Regions
base.Logger().Debug("Metricset level config for regions: ", metricSet.RegionsList)
return &metricSet, nil
}
func getRegions(svc describeRegionsClient) ([]string, error) {
completeRegionsList := make([]string, 0)
input := &ec2.DescribeRegionsInput{}
output, err := svc.DescribeRegions(context.TODO(), input)
if err != nil {
err = fmt.Errorf("failed DescribeRegions: %w", err)
return completeRegionsList, err
}
for _, region := range output.Regions {
completeRegionsList = append(completeRegionsList, *region.RegionName)
}
return completeRegionsList, err
}
func getAccountName(svc *iam.Client, base mb.BaseMetricSet, metricSet MetricSet) string {
ctx, cancel := getContextWithTimeout(DefaultApiTimeout)
defer cancel()
output, err := svc.ListAccountAliases(ctx, &iam.ListAccountAliasesInput{})
accountName := metricSet.MonitoringAccountID
if err != nil {
base.Logger().Warn("failed to list account aliases, please check permission setting: ", err)
return accountName
}
// When there is no account alias, account ID will be used as cloud.account.name
if len(output.AccountAliases) == 0 {
accountName = metricSet.MonitoringAccountID
base.Logger().Debug("AWS Credentials belong to account ID: ", metricSet.MonitoringAccountID)
return accountName
}
// There can be more than one aliases for each account, for now we are only
// collecting the first one.
accountName = output.AccountAliases[0]
base.Logger().Debug("AWS Credentials belong to account name: ", metricSet.MonitoringAccountName)
return accountName
}
// StringInSlice checks if a string is already exists in list and its location
func StringInSlice(str string, list []string) (bool, int) {
for idx, v := range list {
if v == str {
return true, idx
}
}
// If this string doesn't exist in given list, then return location to be -1
return false, -1
}
// InitEvent initialize mb.Event with basic information like service.name, cloud.provider
func InitEvent(regionName string, accountName string, accountID string, timestamp time.Time, periodLabel string) mb.Event {
event := mb.Event{
Timestamp: timestamp,
MetricSetFields: mapstr.M{},
ModuleFields: mapstr.M{},
RootFields: mapstr.M{},
}
period, err := strconv.Atoi(periodLabel)
if err == nil {
_, _ = event.RootFields.Put(CloudWatchPeriodName, period)
}
_, _ = event.RootFields.Put("cloud.provider", "aws")
if regionName != "" {
_, _ = event.RootFields.Put("cloud.region", regionName)
}
if accountName != "" {
_, _ = event.RootFields.Put("cloud.account.name", accountName)
}
if accountID != "" {
_, _ = event.RootFields.Put("cloud.account.id", accountID)
}
return event
}
// CheckTagFiltersExist compare tags filter with a set of tags to see if tags
// filter is a subset of tags
func CheckTagFiltersExist(tagsFilter []Tag, tags interface{}) bool {
var tagKeys []string
var tagValues []string
switch tags := tags.(type) {
case []resourcegroupstaggingapitypes.Tag:
for _, tag := range tags {
tagKeys = append(tagKeys, *tag.Key)
tagValues = append(tagValues, *tag.Value)
}
case []ec2types.Tag:
for _, tag := range tags {
tagKeys = append(tagKeys, *tag.Key)
tagValues = append(tagValues, *tag.Value)
}
case []rdstypes.Tag:
for _, tag := range tags {
tagKeys = append(tagKeys, *tag.Key)
tagValues = append(tagValues, *tag.Value)
}
}
for _, tagFilter := range tagsFilter {
if exists, idx := StringInSlice(tagFilter.Key, tagKeys); exists {
valueExists, _ := StringInSlice(tagValues[idx], tagFilter.Value)
if !valueExists {
return false
}
} else {
return false
}
}
return true
}