extension/entitystore/serviceprovider.go (260 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT
package entitystore
import (
"context"
"strings"
"sync"
"go.uber.org/zap"
configaws "github.com/aws/amazon-cloudwatch-agent/cfg/aws"
"github.com/aws/amazon-cloudwatch-agent/internal/ec2metadataprovider"
"github.com/aws/amazon-cloudwatch-agent/plugins/processors/ec2tagger"
"github.com/aws/amazon-cloudwatch-agent/translator/config"
)
// Constants used when resolving service attributes: the instance tag keys that may carry a service name,
// the default service name, the labels recorded as ServiceNameSource, and retry tuning values.
const (
// SERVICE, APPLICATION and APP are the lower-cased instance tag keys listed in serviceProviderPriorities.
SERVICE = "service"
APPLICATION = "application"
APP = "app"
// Matches the default value from OTel
// https://opentelemetry.io/docs/languages/sdk-configuration/general/#otel_service_name
ServiceNameUnknown = "unknown_service"
// ServiceNameSource* values label where a ServiceName came from. In this file ClientIamRole, ResourceTags and
// Unknown are assigned by the serviceAttributeFrom*/serviceAttributeFallback helpers; the remaining labels are
// not assigned here.
ServiceNameSourceClientIamRole = "ClientIamRole"
ServiceNameSourceInstrumentation = "Instrumentation"
ServiceNameSourceResourceTags = "ResourceTags"
ServiceNameSourceUnknown = "Unknown"
ServiceNameSourceUserConfiguration = "UserConfiguration"
ServiceNameSourceK8sWorkload = "K8sWorkload"
// Jitter bounds handed to NewRetryer in startServiceProvider: the describeTags pair is used for the instance tag
// scrape and the default pair for the IAM role scrape.
// NOTE(review): the unit is presumably seconds - confirm against NewRetryer.
describeTagsJitterMax = 3600
describeTagsJitterMin = 3000
defaultJitterMin = 480
defaultJitterMax = 600
// maxRetry is not referenced by the code visible here.
// NOTE(review): presumably a retry cap used elsewhere in the package - confirm.
maxRetry = 3
)
var (
// serviceProviderPriorities lists the instance tag keys that may hold the service name, highest priority first.
// scrapeImdsServiceNameAndASG walks the list in order, matches keys case-insensitively, and stops at the first
// key whose value it can read.
serviceProviderPriorities = []string{SERVICE, APPLICATION, APP}
)
// ServiceAttribute is the set of service-identifying values produced by the providers in this file.
// mergeServiceAttributes treats an empty ServiceName or Environment as "not provided".
type ServiceAttribute struct {
// ServiceName is the name of the service; the fallback provider uses ServiceNameUnknown.
ServiceName string
// ServiceNameSource labels where ServiceName came from. The providers in this file set it to one of the
// ServiceNameSource* constants.
ServiceNameSource string
// Environment is the environment name; this file produces "ec2:default" and "ec2:" + the Auto Scaling group name.
Environment string
}
// LogGroupName is a log group name; it is the key type of serviceprovider.logGroups.
type LogGroupName string
// LogFileGlob is a log file glob as configured for the agent; it is the key type of serviceprovider.logFiles.
type LogFileGlob string
// autoscalinggroup holds the Auto Scaling group name together with the sync.Once that setAutoScalingGroup uses
// so that the name is assigned at most once.
type autoscalinggroup struct {
// name is the stored group name; setAutoScalingGroup stores "" when the supplied name is too long.
name string
// once ensures only the first setAutoScalingGroup call has any effect.
once sync.Once
}
// serviceprovider resolves service attributes (service name, its source, and environment) from several sources:
// log group and log file associations, instance tags and the IAM role scraped through metadataProvider, and the
// Auto Scaling group name.
type serviceprovider struct {
// mode is compared against config.ModeEC2 to decide whether the fallback environment "ec2:default" applies.
mode string
// ec2Info is stored by newServiceProvider; it is not read by the code visible here.
ec2Info *EC2Info
// metadataProvider supplies the IAM role and instance tags. When nil, startServiceProvider does nothing.
metadataProvider ec2metadataprovider.MetadataProvider
// iamRole, imdsServiceName and autoScalingGroup.name are guarded by mutex.
iamRole string
imdsServiceName string
autoScalingGroup autoscalinggroup
// region is stored by newServiceProvider; it is not read by the code visible here.
region string
// done is passed to the retryers created in startServiceProvider.
// NOTE(review): presumably closing it stops the refresh loops - confirm against the retryer.
done chan struct{}
logger *zap.Logger
// mutex guards iamRole, imdsServiceName and autoScalingGroup.name.
mutex sync.RWMutex
// logMutex guards logFiles and logGroups.
logMutex sync.RWMutex
// logFiles stores the service attributes that were configured for log files in CloudWatch Agent configuration.
// Example:
// "/opt/aws/amazon-cloudwatch-agent/logs/amazon-cloudwatch-agent.log": {ServiceName: "cloudwatch-agent"}
logFiles map[LogFileGlob]ServiceAttribute
// logGroups stores the associations between log groups and service attributes that were observed from incoming
// telemetry. Example:
// "MyLogGroup": {ServiceName: "MyInstrumentedService"}
logGroups map[LogGroupName]ServiceAttribute
}
// startServiceProvider launches two background refresh loops: one that scrapes the IAM role and one that scrapes
// the service name and Auto Scaling group from instance tags. It is a no-op when no metadata provider is set.
//
// NOTE(review): the two retryers differ in their first NewRetryer argument and in their jitter bounds; the
// meaning of the boolean flags is defined by NewRetryer and should be confirmed there.
func (s *serviceprovider) startServiceProvider() {
	if s.metadataProvider == nil {
		return
	}

	iamRoleRetryer := NewRetryer(false, true, defaultJitterMin, defaultJitterMax, ec2tagger.BackoffSleepArray, infRetry, s.done, s.logger)
	instanceTagRetryer := NewRetryer(true, true, describeTagsJitterMin, describeTagsJitterMax, ec2tagger.BackoffSleepArray, infRetry, s.done, s.logger)

	go iamRoleRetryer.refreshLoop(s.scrapeIAMRole)
	go instanceTagRetryer.refreshLoop(s.scrapeImdsServiceNameAndASG)
}
// GetIAMRole returns the IAM role name last stored by scrapeIAMRole, or "" if none has been stored.
// It is safe for concurrent use.
func (s *serviceprovider) GetIAMRole() string {
	s.mutex.RLock()
	role := s.iamRole
	s.mutex.RUnlock()
	return role
}
// GetIMDSServiceName returns the service name last read from instance tags by scrapeImdsServiceNameAndASG,
// or "" if none has been stored. It is safe for concurrent use.
func (s *serviceprovider) GetIMDSServiceName() string {
	s.mutex.RLock()
	name := s.imdsServiceName
	s.mutex.RUnlock()
	return name
}
// getAutoScalingGroup returns the Auto Scaling group name stored by setAutoScalingGroup, or "" if none is stored.
// It is safe for concurrent use.
func (s *serviceprovider) getAutoScalingGroup() string {
	s.mutex.RLock()
	name := s.autoScalingGroup.name
	s.mutex.RUnlock()
	return name
}
// setAutoScalingGroup stores asg as the Auto Scaling group name. Only the first call has any effect; later calls
// are ignored by the sync.Once. A name longer than autoScalingGroupSizeMax is logged and stored as "".
func (s *serviceprovider) setAutoScalingGroup(asg string) {
	s.autoScalingGroup.once.Do(func() {
		s.mutex.Lock()
		defer s.mutex.Unlock()

		asgLength := len(asg)
		if asgLength <= autoScalingGroupSizeMax {
			s.autoScalingGroup.name = asg
			return
		}

		s.logger.Warn("AutoScalingGroup length exceeds characters limit and will be ignored", zap.Int("length", asgLength), zap.Int("character limit", autoScalingGroupSizeMax))
		s.autoScalingGroup.name = ""
	})
}
// addEntryForLogFile records the service attribute configured for a log file glob in the CloudWatch Agent
// config, replacing any existing entry for the same glob. The map is created on first use if it is nil.
func (s *serviceprovider) addEntryForLogFile(logFileGlob LogFileGlob, serviceAttr ServiceAttribute) {
	s.logMutex.Lock()
	defer s.logMutex.Unlock()

	if s.logFiles != nil {
		s.logFiles[logFileGlob] = serviceAttr
		return
	}
	s.logFiles = map[LogFileGlob]ServiceAttribute{logFileGlob: serviceAttr}
}
// addEntryForLogGroup records the service attribute observed for a log group name in telemetry received by
// CloudWatch Agent, replacing any existing entry for the same log group. The map is created on first use if it
// is nil.
func (s *serviceprovider) addEntryForLogGroup(logGroupName LogGroupName, serviceAttr ServiceAttribute) {
	s.logMutex.Lock()
	defer s.logMutex.Unlock()

	if s.logGroups != nil {
		s.logGroups[logGroupName] = serviceAttr
		return
	}
	s.logGroups = map[LogGroupName]ServiceAttribute{logGroupName: serviceAttr}
}
// serviceAttributeProvider is a function that produces a ServiceAttribute on demand. mergeServiceAttributes calls
// providers one at a time, so a provider is only invoked if the providers before it left a field empty.
type serviceAttributeProvider func() ServiceAttribute
// mergeServiceAttributes combines the results of providers, which must be ordered from highest to lowest
// priority. The first non-empty ServiceName (together with its ServiceNameSource) and the first non-empty
// Environment win. Providers are invoked lazily: iteration stops as soon as both ServiceName and Environment
// are filled, and the remaining providers are never called.
func mergeServiceAttributes(providers []serviceAttributeProvider) ServiceAttribute {
	var merged ServiceAttribute
	for i := range providers {
		candidate := providers[i]()

		if merged.ServiceName == "" {
			merged.ServiceName, merged.ServiceNameSource = candidate.ServiceName, candidate.ServiceNameSource
		}
		if merged.Environment == "" {
			merged.Environment = candidate.Environment
		}

		if merged.ServiceName == "" || merged.Environment == "" {
			continue
		}
		break
	}
	return merged
}
// logFileServiceAttribute resolves the service attributes for a log file / log group pair by merging the
// following sources, highest priority first:
//  1. Incoming telemetry attributes recorded for the log group
//  2. CWA config entry for the log file
//  3. Instance tags - only the tag keys service, application, app are considered
//  4. IAM role - the IAM role name retrieved through IMDS (Instance Metadata Service)
//  5. Auto Scaling group - contributes only the environment ("ec2:" + group name)
//  6. Fallback - ServiceNameUnknown, plus environment "ec2:default" when running in EC2 mode
func (s *serviceprovider) logFileServiceAttribute(logFile LogFileGlob, logGroup LogGroupName) ServiceAttribute {
	fromLogGroup := func() ServiceAttribute { return s.serviceAttributeForLogGroup(logGroup) }
	fromLogFile := func() ServiceAttribute { return s.serviceAttributeForLogFile(logFile) }

	providers := []serviceAttributeProvider{
		fromLogGroup,
		fromLogFile,
		s.serviceAttributeFromImdsTags,
		s.serviceAttributeFromIamRole,
		s.serviceAttributeFromAsg,
		s.serviceAttributeFallback,
	}
	return mergeServiceAttributes(providers)
}
// getServiceNameAndSource returns the service name and its source label, taken from instance tags first, then
// the IAM role, then the fallback. Because the fallback always supplies ServiceNameUnknown, the returned name is
// never empty.
func (s *serviceprovider) getServiceNameAndSource() (string, string) {
	providers := []serviceAttributeProvider{
		s.serviceAttributeFromImdsTags,
		s.serviceAttributeFromIamRole,
		s.serviceAttributeFallback,
	}
	merged := mergeServiceAttributes(providers)
	return merged.ServiceName, merged.ServiceNameSource
}
// serviceAttributeForLogGroup returns the service attribute recorded for logGroup, or the zero ServiceAttribute
// when logGroup is empty or has no entry.
//
// s.logGroups is only touched while holding logMutex: addEntryForLogGroup may assign the map under that lock, so
// inspecting it beforehand would be an unsynchronized read.
func (s *serviceprovider) serviceAttributeForLogGroup(logGroup LogGroupName) ServiceAttribute {
	if logGroup == "" {
		return ServiceAttribute{}
	}

	s.logMutex.RLock()
	defer s.logMutex.RUnlock()
	// Indexing a nil map yields the zero value, so no separate nil check is needed.
	return s.logGroups[logGroup]
}
// serviceAttributeForLogFile returns the service attribute configured for logFile, or the zero ServiceAttribute
// when logFile is empty or has no entry.
//
// s.logFiles is only touched while holding logMutex: addEntryForLogFile may assign the map under that lock, so
// inspecting it beforehand would be an unsynchronized read.
func (s *serviceprovider) serviceAttributeForLogFile(logFile LogFileGlob) ServiceAttribute {
	if logFile == "" {
		return ServiceAttribute{}
	}

	s.logMutex.RLock()
	defer s.logMutex.RUnlock()
	// Indexing a nil map yields the zero value, so no separate nil check is needed.
	return s.logFiles[logFile]
}
// serviceAttributeFromImdsTags returns a ServiceAttribute whose name is the service name scraped from instance
// tags, labelled ServiceNameSourceResourceTags. It returns the zero ServiceAttribute when no name is stored.
// Environment is never set by this provider.
func (s *serviceprovider) serviceAttributeFromImdsTags() ServiceAttribute {
	// Read the guarded value once so the emptiness check and the returned name come from the same snapshot.
	name := s.GetIMDSServiceName()
	if name == "" {
		return ServiceAttribute{}
	}
	return ServiceAttribute{
		ServiceName:       name,
		ServiceNameSource: ServiceNameSourceResourceTags,
	}
}
// serviceAttributeFromIamRole returns a ServiceAttribute whose name is the scraped IAM role, labelled
// ServiceNameSourceClientIamRole. It returns the zero ServiceAttribute when no role is stored.
// Environment is never set by this provider.
func (s *serviceprovider) serviceAttributeFromIamRole() ServiceAttribute {
	// The role is rewritten by a background refresh loop; read it once so the emptiness check and the returned
	// name come from the same snapshot.
	role := s.GetIAMRole()
	if role == "" {
		return ServiceAttribute{}
	}
	return ServiceAttribute{
		ServiceName:       role,
		ServiceNameSource: ServiceNameSourceClientIamRole,
	}
}
// serviceAttributeFromAsg returns a ServiceAttribute whose Environment is "ec2:" followed by the stored Auto
// Scaling group name. It returns the zero ServiceAttribute when no group name is stored. ServiceName is never
// set by this provider.
func (s *serviceprovider) serviceAttributeFromAsg() ServiceAttribute {
	// Read the guarded value once so the emptiness check and the returned environment come from the same snapshot.
	asg := s.getAutoScalingGroup()
	if asg == "" {
		return ServiceAttribute{}
	}
	return ServiceAttribute{
		Environment: "ec2:" + asg,
	}
}
// serviceAttributeFallback is the lowest-priority provider. It always supplies ServiceNameUnknown with source
// ServiceNameSourceUnknown, and supplies the environment "ec2:default" only when the provider runs in EC2 mode.
func (s *serviceprovider) serviceAttributeFallback() ServiceAttribute {
	var environment string
	if s.mode == config.ModeEC2 {
		environment = "ec2:default"
	}
	return ServiceAttribute{
		ServiceName:       ServiceNameUnknown,
		ServiceNameSource: ServiceNameSourceUnknown,
		Environment:       environment,
	}
}
// scrapeIAMRole asks the metadata provider for the client IAM role and, on success, stores it under mutex.
// On failure the previously stored role is left untouched and the provider's error is returned as-is.
func (s *serviceprovider) scrapeIAMRole() error {
	role, err := s.metadataProvider.ClientIAMRole(context.Background())
	if err == nil {
		s.mutex.Lock()
		s.iamRole = role
		s.mutex.Unlock()
	}
	return err
}
// scrapeImdsServiceNameAndASG reads the instance tags exposed by the metadata provider and stores:
//   - the service name, from the first key in serviceProviderPriorities (matched case-insensitively) whose value
//     can be read; and
//   - the Auto Scaling group name, from the tag whose key is exactly ec2tagger.Ec2InstanceTagKeyASG.
//
// It returns the error from listing the tag keys, if any. Failures to read an individual tag value are not
// reported: the next candidate key is tried (service name) or the value is skipped (ASG), and nil is returned.
func (s *serviceprovider) scrapeImdsServiceNameAndASG() error {
	ctx := context.Background()

	tagKeys, err := s.metadataProvider.InstanceTags(ctx)
	if err != nil {
		s.logger.Debug("Failed to get service name from instance tags. This is likely because instance tag is not enabled for IMDS but will not affect agent functionality.")
		return err
	}

	// Check whether the tags contain SERVICE, APPLICATION, APP, in that order (case insensitive). The value is
	// fetched with the key's original casing.
	lowerTagKeys := toLowerKeyMap(tagKeys)
	for _, candidate := range serviceProviderPriorities {
		originalCaseKey, exists := lowerTagKeys[candidate]
		if !exists {
			continue
		}
		serviceName, tagErr := s.metadataProvider.InstanceTagValue(ctx, originalCaseKey)
		if tagErr != nil {
			// Best effort: fall through to the next lower-priority key.
			continue
		}
		s.mutex.Lock()
		s.imdsServiceName = serviceName
		s.mutex.Unlock()
		break
	}

	// The ASG tag key is matched case-sensitively. Scan the original keys directly: going through the
	// lower-cased map would miss the exact key if another key differing only in case were also present.
	hasASGTag := false
	for _, key := range tagKeys {
		if key == ec2tagger.Ec2InstanceTagKeyASG {
			hasASGTag = true
			break
		}
	}
	if hasASGTag {
		asg, asgErr := s.metadataProvider.InstanceTagValue(ctx, ec2tagger.Ec2InstanceTagKeyASG)
		if asgErr == nil && asg != "" {
			s.logger.Debug("AutoScalingGroup retrieved through IMDS")
			s.setAutoScalingGroup(asg)
		}
	}

	if s.GetIMDSServiceName() == "" {
		s.logger.Debug("Service name not found through IMDS")
	}
	if s.getAutoScalingGroup() == "" {
		s.logger.Debug("AutoScalingGroup name not found through IMDS")
	}
	return nil
}
// toLowerKeyMap indexes values by their lower-cased form, mapping each lower-cased string to the original
// spelling. When several values differ only in case, the one that appears last in values wins. The result is
// never nil.
func toLowerKeyMap(values []string) map[string]string {
	byLower := make(map[string]string, len(values))
	for i := 0; i < len(values); i++ {
		original := values[i]
		byLower[strings.ToLower(original)] = original
	}
	return byLower
}
// newServiceProvider builds a serviceprovider with empty logFiles and logGroups maps and the given mode, region,
// EC2 info, metadata provider, done channel and logger. providerType and ec2Credential are accepted but not
// used by this constructor. Background scraping does not begin until startServiceProvider is called.
func newServiceProvider(mode string, region string, ec2Info *EC2Info, metadataProvider ec2metadataprovider.MetadataProvider, providerType ec2ProviderType, ec2Credential *configaws.CredentialConfig, done chan struct{}, logger *zap.Logger) serviceProviderInterface {
	provider := new(serviceprovider)
	provider.mode = mode
	provider.region = region
	provider.ec2Info = ec2Info
	provider.metadataProvider = metadataProvider
	provider.done = done
	provider.logger = logger
	provider.logFiles = make(map[LogFileGlob]ServiceAttribute)
	provider.logGroups = make(map[LogGroupName]ServiceAttribute)
	return provider
}