extension/entitystore/extension.go (217 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT
package entitystore
import (
"context"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/client"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/jellydator/ttlcache/v3"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension"
"go.uber.org/atomic"
"go.uber.org/zap"
configaws "github.com/aws/amazon-cloudwatch-agent/cfg/aws"
"github.com/aws/amazon-cloudwatch-agent/internal/ec2metadataprovider"
"github.com/aws/amazon-cloudwatch-agent/internal/retryer"
"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsentity/entityattributes"
"github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs"
"github.com/aws/amazon-cloudwatch-agent/translator/config"
)
const (
Service = "Service"
InstanceIDKey = "EC2.InstanceId"
ASGKey = "EC2.AutoScalingGroup"
ServiceNameSourceKey = "AWS.ServiceNameSource"
PlatformType = "PlatformType"
EC2PlatForm = "AWS::EC2"
podTerminationCheckInterval = 5 * time.Minute
)
type ec2ProviderType func(string, *configaws.CredentialConfig) ec2iface.EC2API
type serviceProviderInterface interface {
startServiceProvider()
addEntryForLogFile(LogFileGlob, ServiceAttribute)
addEntryForLogGroup(LogGroupName, ServiceAttribute)
logFileServiceAttribute(LogFileGlob, LogGroupName) ServiceAttribute
getServiceNameAndSource() (string, string)
getAutoScalingGroup() string
setAutoScalingGroup(string)
}
type EntityStore struct {
logger *zap.Logger
config *Config
done chan struct{}
ready atomic.Bool
// mode should be EC2, ECS, EKS, and K8S
mode string
kubernetesMode string
// ec2Info stores information about EC2 instances such as instance ID and
// auto scaling groups
ec2Info EC2Info
// eksInfo stores information about EKS such as pod to service Env map
eksInfo *eksInfo
// serviceprovider stores information about possible service names
// that we can attach to the entity
serviceprovider serviceProviderInterface
// nativeCredential stores the credential config for agent's native
// component such as LogAgent
nativeCredential client.ConfigProvider
metadataprovider ec2metadataprovider.MetadataProvider
podTerminationCheckInterval time.Duration
}
var _ extension.Extension = (*EntityStore)(nil)
func (e *EntityStore) Start(ctx context.Context, host component.Host) error {
// Get IMDS client and EC2 API client which requires region for authentication
// These will be passed down to any object that requires access to IMDS or EC2
// API client so we have single source of truth for credential
e.done = make(chan struct{})
e.metadataprovider = getMetaDataProvider()
e.mode = e.config.Mode
e.kubernetesMode = e.config.KubernetesMode
e.podTerminationCheckInterval = podTerminationCheckInterval
ec2CredentialConfig := &configaws.CredentialConfig{
Profile: e.config.Profile,
Filename: e.config.Filename,
}
e.serviceprovider = newServiceProvider(e.mode, e.config.Region, &e.ec2Info, e.metadataprovider, getEC2Provider, ec2CredentialConfig, e.done, e.logger)
switch e.mode {
case config.ModeEC2:
e.ec2Info = *newEC2Info(e.metadataprovider, e.done, e.config.Region, e.logger)
go e.ec2Info.initEc2Info()
// Instance metadata tags is not usable for EKS nodes
// https://github.com/kubernetes/cloud-provider-aws/issues/762
if e.kubernetesMode == "" {
go e.serviceprovider.startServiceProvider()
}
}
if e.kubernetesMode != "" {
e.eksInfo = newEKSInfo(e.logger)
// Starting the ttl cache will automatically evict all expired pods from the map
go e.StartPodToServiceEnvironmentMappingTtlCache()
}
e.ready.Store(true)
return nil
}
func (e *EntityStore) Shutdown(_ context.Context) error {
close(e.done)
if e.eksInfo != nil && e.eksInfo.podToServiceEnvMap != nil {
e.eksInfo.podToServiceEnvMap.Stop()
}
e.logger.Info("Pod to Service Environment Mapping TTL Cache stopped")
return nil
}
func (e *EntityStore) Mode() string {
return e.mode
}
func (e *EntityStore) KubernetesMode() string {
return e.kubernetesMode
}
func (e *EntityStore) EKSInfo() *eksInfo {
return e.eksInfo
}
func (e *EntityStore) EC2Info() EC2Info {
return e.ec2Info
}
func (e *EntityStore) SetNativeCredential(client client.ConfigProvider) {
e.nativeCredential = client
}
func (e *EntityStore) NativeCredentialExists() bool {
return e.nativeCredential != nil
}
// CreateLogFileEntity creates the entity for log events that are being uploaded from a log file in the environment.
func (e *EntityStore) CreateLogFileEntity(logFileGlob LogFileGlob, logGroupName LogGroupName) *cloudwatchlogs.Entity {
if e.serviceprovider == nil {
return nil
}
serviceAttr := e.serviceprovider.logFileServiceAttribute(logFileGlob, logGroupName)
keyAttributes := e.createServiceKeyAttributes(serviceAttr)
attributeMap := e.createAttributeMap()
addNonEmptyToMap(attributeMap, ServiceNameSourceKey, serviceAttr.ServiceNameSource)
if _, ok := keyAttributes[entityattributes.AwsAccountId]; !ok {
return nil
}
return &cloudwatchlogs.Entity{
KeyAttributes: keyAttributes,
Attributes: attributeMap,
}
}
// GetMetricServiceNameAndSource gets the service name source for service metrics if not customer provided
func (e *EntityStore) GetMetricServiceNameAndSource() (string, string) {
if e.serviceprovider == nil {
return "", ""
}
return e.serviceprovider.getServiceNameAndSource()
}
// GetServiceMetricAttributesMap creates the attribute map for service metrics. This will be expanded upon in a later PR'S,
// but for now is just covering the EC2 attributes for service metrics.
func (e *EntityStore) GetServiceMetricAttributesMap() map[string]*string {
return e.createAttributeMap()
}
func (e *EntityStore) GetAutoScalingGroup() string {
if e.serviceprovider == nil {
return ""
}
return e.serviceprovider.getAutoScalingGroup()
}
func (e *EntityStore) SetAutoScalingGroup(asg string) {
if e.serviceprovider != nil {
e.serviceprovider.setAutoScalingGroup(asg)
}
}
// AddServiceAttrEntryForLogFile adds an entry to the entity store for the provided file glob -> (serviceName, environmentName) key-value pair
func (e *EntityStore) AddServiceAttrEntryForLogFile(fileGlob LogFileGlob, serviceName string, environmentName string) {
if e.serviceprovider != nil {
e.serviceprovider.addEntryForLogFile(fileGlob, ServiceAttribute{
ServiceName: serviceName,
ServiceNameSource: ServiceNameSourceUserConfiguration,
Environment: environmentName,
})
}
}
// AddServiceAttrEntryForLogGroup adds an entry to the entity store for the provided log group nme -> (serviceName, environmentName) key-value pair
func (e *EntityStore) AddServiceAttrEntryForLogGroup(logGroupName LogGroupName, serviceName string, environmentName string) {
if e.serviceprovider != nil {
e.serviceprovider.addEntryForLogGroup(logGroupName, ServiceAttribute{
ServiceName: serviceName,
ServiceNameSource: ServiceNameSourceInstrumentation,
Environment: environmentName,
})
}
}
func (e *EntityStore) AddPodServiceEnvironmentMapping(podName string, serviceName string, environmentName string, serviceNameSource string) {
if e.eksInfo != nil {
e.eksInfo.AddPodServiceEnvironmentMapping(podName, serviceName, environmentName, serviceNameSource)
}
}
func (e *EntityStore) StartPodToServiceEnvironmentMappingTtlCache() {
if e.eksInfo != nil && e.eksInfo.GetPodServiceEnvironmentMapping() != nil {
e.eksInfo.GetPodServiceEnvironmentMapping().Start()
}
}
func (e *EntityStore) GetPodServiceEnvironmentMapping() *ttlcache.Cache[string, ServiceEnvironment] {
if e.eksInfo != nil {
return e.eksInfo.GetPodServiceEnvironmentMapping()
}
return ttlcache.New[string, ServiceEnvironment](
ttlcache.WithTTL[string, ServiceEnvironment](ttlDuration),
)
}
func (e *EntityStore) createAttributeMap() map[string]*string {
attributeMap := make(map[string]*string)
if e.mode == config.ModeEC2 {
addNonEmptyToMap(attributeMap, InstanceIDKey, e.ec2Info.GetInstanceID())
addNonEmptyToMap(attributeMap, ASGKey, e.GetAutoScalingGroup())
}
switch e.mode {
case config.ModeEC2:
attributeMap[PlatformType] = aws.String(EC2PlatForm)
}
return attributeMap
}
// createServiceKeyAttribute creates KeyAttributes for Service entities
func (e *EntityStore) createServiceKeyAttributes(serviceAttr ServiceAttribute) map[string]*string {
serviceKeyAttr := map[string]*string{
entityattributes.EntityType: aws.String(Service),
}
addNonEmptyToMap(serviceKeyAttr, entityattributes.ServiceName, serviceAttr.ServiceName)
addNonEmptyToMap(serviceKeyAttr, entityattributes.DeploymentEnvironment, serviceAttr.Environment)
addNonEmptyToMap(serviceKeyAttr, entityattributes.AwsAccountId, e.ec2Info.GetAccountID())
return serviceKeyAttr
}
var getMetaDataProvider = func() ec2metadataprovider.MetadataProvider {
mdCredentialConfig := &configaws.CredentialConfig{}
return ec2metadataprovider.NewMetadataProvider(mdCredentialConfig.Credentials(), retryer.GetDefaultRetryNumber())
}
var getEC2Provider = func(region string, ec2CredentialConfig *configaws.CredentialConfig) ec2iface.EC2API {
ec2CredentialConfig.Region = region
return ec2.New(
ec2CredentialConfig.Credentials(),
&aws.Config{
LogLevel: configaws.SDKLogLevel(),
Logger: configaws.SDKLogger{},
})
}
func addNonEmptyToMap(m map[string]*string, key, value string) {
if value != "" {
m[key] = aws.String(value)
}
}