in plugins/processors/awsentity/processor.go [142:325]
func (p *awsEntityProcessor) processMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
// Get the following metric attributes from the EntityStore: PlatformType, EC2.InstanceId, EC2.AutoScalingGroup
rm := md.ResourceMetrics()
for i := 0; i < rm.Len(); i++ {
var logGroupNames, serviceName, environmentName string
var entityServiceNameSource, entityPlatformType string
var ec2Info entitystore.EC2Info
resourceAttrs := rm.At(i).Resource().Attributes()
switch p.config.EntityType {
case entityattributes.Resource:
if p.config.KubernetesMode != "" {
switch p.config.KubernetesMode {
case config.ModeEKS:
resourceAttrs.PutStr(entityattributes.AttributeEntityPlatformType, entityattributes.AttributeEntityEKSPlatform)
default:
resourceAttrs.PutStr(entityattributes.AttributeEntityPlatformType, entityattributes.AttributeEntityK8sPlatform)
}
} else if p.config.Platform == config.ModeEC2 {
// ec2tagger processor may have picked up the ASG name from an ec2:DescribeTags call
if getAutoScalingGroupFromEntityStore() == EMPTY && p.config.ScrapeDatapointAttribute {
if autoScalingGroup := p.scrapeResourceEntityAttribute(rm.At(i).ScopeMetrics()); autoScalingGroup != EMPTY {
setAutoScalingGroup(autoScalingGroup)
}
}
ec2Info = getEC2InfoFromEntityStore()
if ec2Info.GetInstanceID() != EMPTY {
resourceAttrs.PutStr(entityattributes.AttributeEntityType, entityattributes.AttributeEntityAWSResource)
resourceAttrs.PutStr(entityattributes.AttributeEntityResourceType, entityattributes.AttributeEntityEC2InstanceResource)
resourceAttrs.PutStr(entityattributes.AttributeEntityIdentifier, ec2Info.GetInstanceID())
}
AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityAwsAccountId, ec2Info.GetAccountID())
}
case entityattributes.Service:
if logGroupNamesAttr, ok := resourceAttrs.Get(attributeAwsLogGroupNames); ok {
logGroupNames = logGroupNamesAttr.Str()
}
if serviceNameAttr, ok := resourceAttrs.Get(attributeServiceName); ok {
serviceName = serviceNameAttr.Str()
}
if environmentNameAttr, ok := resourceAttrs.Get(attributeDeploymentEnvironment); ok {
environmentName = environmentNameAttr.Str()
}
if serviceNameSource, sourceExists := resourceAttrs.Get(entityattributes.AttributeEntityServiceNameSource); sourceExists {
entityServiceNameSource = serviceNameSource.Str()
}
// resourcedetection processor may have picked up the ASG name from an ec2:DescribeTags call
if autoScalingGroupNameAttr, ok := resourceAttrs.Get(attributeEC2TagAwsAutoscalingGroupName); ok {
setAutoScalingGroup(autoScalingGroupNameAttr.Str())
}
entityServiceName := getServiceAttributes(resourceAttrs)
entityEnvironmentName := environmentName
if (entityServiceName == EMPTY || entityEnvironmentName == EMPTY) && p.config.ScrapeDatapointAttribute {
entityServiceName, entityEnvironmentName, entityServiceNameSource = p.scrapeServiceAttribute(rm.At(i).ScopeMetrics())
// If the entityServiceNameSource is empty here, that means it was not configured via instrumentation
// If entityServiceName is a datapoint attribute, that means the service name is coming from the UserConfiguration source
if entityServiceNameSource == entityattributes.AttributeServiceNameSourceUserConfig && entityServiceName != EMPTY {
entityServiceNameSource = entityattributes.AttributeServiceNameSourceUserConfig
}
}
if p.config.KubernetesMode != "" {
p.k8sscraper.Scrape(rm.At(i).Resource(), getPodMeta(ctx))
if p.config.Platform == config.ModeEC2 {
ec2Info = getEC2InfoFromEntityStore()
}
if p.config.KubernetesMode == config.ModeEKS {
entityPlatformType = entityattributes.AttributeEntityEKSPlatform
} else {
entityPlatformType = entityattributes.AttributeEntityK8sPlatform
}
podInfo, ok := p.k8sscraper.(*k8sattributescraper.K8sAttributeScraper)
// Perform fallback mechanism for service name if it is empty
// or has prefix unknown_service ( unknown_service will be set by OTEL SDK if the service name is empty on application pod)
// https://opentelemetry.io/docs/specs/semconv/attributes-registry/service/
if (entityServiceName == EMPTY || strings.HasPrefix(entityServiceName, unknownService)) && ok && podInfo != nil && podInfo.Workload != EMPTY {
entityServiceName = podInfo.Workload
entityServiceNameSource = entitystore.ServiceNameSourceK8sWorkload
}
// Set the service name source to Instrumentation if the operator doesn't set it
if entityServiceName != EMPTY && entityServiceNameSource == EMPTY && getTelemetrySDKEnabledAttribute(resourceAttrs) {
entityServiceNameSource = entitystore.ServiceNameSourceInstrumentation
}
// Perform fallback mechanism for environment if it is empty
if entityEnvironmentName == EMPTY && ok && podInfo.Cluster != EMPTY && podInfo.Namespace != EMPTY {
if p.config.KubernetesMode == config.ModeEKS {
entityEnvironmentName = "eks:" + p.config.ClusterName + "/" + podInfo.Namespace
} else if p.config.KubernetesMode == config.ModeK8sEC2 || p.config.KubernetesMode == config.ModeK8sOnPrem {
entityEnvironmentName = "k8s:" + p.config.ClusterName + "/" + podInfo.Namespace
}
}
// Add service information for a pod to the pod association map
// so that agent can host this information in a server
fullPodName := scrapeK8sPodName(resourceAttrs)
if fullPodName != EMPTY && entityServiceName != EMPTY && entityServiceNameSource != EMPTY {
addPodToServiceEnvironmentMap(fullPodName, entityServiceName, entityEnvironmentName, entityServiceNameSource)
} else if fullPodName != EMPTY && entityServiceName != EMPTY && entityServiceNameSource == EMPTY {
addPodToServiceEnvironmentMap(fullPodName, entityServiceName, entityEnvironmentName, entitystore.ServiceNameSourceUnknown)
}
eksAttributes := K8sServiceAttributes{
Cluster: podInfo.Cluster,
Namespace: podInfo.Namespace,
Workload: podInfo.Workload,
Node: podInfo.Node,
InstanceId: ec2Info.GetInstanceID(),
ServiceNameSource: entityServiceNameSource,
}
AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityType, entityattributes.Service)
AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityServiceName, entityServiceName)
AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityDeploymentEnvironment, entityEnvironmentName)
if err := validate.Struct(eksAttributes); err == nil {
resourceAttrs.PutStr(entityattributes.AttributeEntityPlatformType, entityPlatformType)
resourceAttrs.PutStr(entityattributes.AttributeEntityCluster, eksAttributes.Cluster)
resourceAttrs.PutStr(entityattributes.AttributeEntityNamespace, eksAttributes.Namespace)
resourceAttrs.PutStr(entityattributes.AttributeEntityWorkload, eksAttributes.Workload)
resourceAttrs.PutStr(entityattributes.AttributeEntityNode, eksAttributes.Node)
//Add Instance id attribute only if the application node is same as agent node
if eksAttributes.Node == os.Getenv("K8S_NODE_NAME") {
AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityInstanceID, eksAttributes.InstanceId)
}
AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityAwsAccountId, ec2Info.GetAccountID())
AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityServiceNameSource, entityServiceNameSource)
}
p.k8sscraper.Reset()
} else if p.config.Platform == config.ModeEC2 {
//If entityServiceNameSource is empty, it was not configured via the config. Get the source in descending priority
// 1. Incoming telemetry attributes
// 2. CWA config
// 3. instance tags - The tags attached to the EC2 instance. Only scrape for tag with the following key: service, application, app
// 4. IAM Role - The IAM role name retrieved through IMDS(Instance Metadata Service)
if entityServiceName == EMPTY && entityServiceNameSource == EMPTY {
entityServiceName, entityServiceNameSource = getServiceNameSource()
} else if entityServiceName != EMPTY && entityServiceNameSource == EMPTY {
entityServiceNameSource = entitystore.ServiceNameSourceUnknown
}
entityPlatformType = entityattributes.AttributeEntityEC2Platform
ec2Info = getEC2InfoFromEntityStore()
if entityEnvironmentName == EMPTY {
if getAutoScalingGroupFromEntityStore() != EMPTY {
entityEnvironmentName = entityattributes.DeploymentEnvironmentFallbackPrefix + getAutoScalingGroupFromEntityStore()
} else {
entityEnvironmentName = entityattributes.DeploymentEnvironmentDefault
}
}
AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityType, entityattributes.Service)
AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityServiceName, entityServiceName)
AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityDeploymentEnvironment, entityEnvironmentName)
AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityAwsAccountId, ec2Info.GetAccountID())
ec2Attributes := EC2ServiceAttributes{
InstanceId: ec2Info.GetInstanceID(),
AutoScalingGroup: getAutoScalingGroupFromEntityStore(),
ServiceNameSource: entityServiceNameSource,
}
if err := validate.Struct(ec2Attributes); err == nil {
resourceAttrs.PutStr(entityattributes.AttributeEntityPlatformType, entityPlatformType)
AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityInstanceID, ec2Attributes.InstanceId)
AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityAutoScalingGroup, ec2Attributes.AutoScalingGroup)
AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityServiceNameSource, ec2Attributes.ServiceNameSource)
}
}
if logGroupNames == EMPTY || (serviceName == EMPTY && environmentName == EMPTY) {
continue
}
logGroupNamesSlice := strings.Split(logGroupNames, "&")
for _, logGroupName := range logGroupNamesSlice {
if logGroupName == EMPTY {
continue
}
addToEntityStore(entitystore.LogGroupName(logGroupName), serviceName, environmentName)
}
}
}
return md, nil
}