func()

in plugins/processors/awsentity/processor.go [142:325]


func (p *awsEntityProcessor) processMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
	// Get the following metric attributes from the EntityStore: PlatformType, EC2.InstanceId, EC2.AutoScalingGroup

	rm := md.ResourceMetrics()
	for i := 0; i < rm.Len(); i++ {
		var logGroupNames, serviceName, environmentName string
		var entityServiceNameSource, entityPlatformType string
		var ec2Info entitystore.EC2Info
		resourceAttrs := rm.At(i).Resource().Attributes()
		switch p.config.EntityType {
		case entityattributes.Resource:
			if p.config.KubernetesMode != "" {
				switch p.config.KubernetesMode {
				case config.ModeEKS:
					resourceAttrs.PutStr(entityattributes.AttributeEntityPlatformType, entityattributes.AttributeEntityEKSPlatform)
				default:
					resourceAttrs.PutStr(entityattributes.AttributeEntityPlatformType, entityattributes.AttributeEntityK8sPlatform)
				}
			} else if p.config.Platform == config.ModeEC2 {
				// ec2tagger processor may have picked up the ASG name from an ec2:DescribeTags call
				if getAutoScalingGroupFromEntityStore() == EMPTY && p.config.ScrapeDatapointAttribute {
					if autoScalingGroup := p.scrapeResourceEntityAttribute(rm.At(i).ScopeMetrics()); autoScalingGroup != EMPTY {
						setAutoScalingGroup(autoScalingGroup)
					}
				}
				ec2Info = getEC2InfoFromEntityStore()
				if ec2Info.GetInstanceID() != EMPTY {
					resourceAttrs.PutStr(entityattributes.AttributeEntityType, entityattributes.AttributeEntityAWSResource)
					resourceAttrs.PutStr(entityattributes.AttributeEntityResourceType, entityattributes.AttributeEntityEC2InstanceResource)
					resourceAttrs.PutStr(entityattributes.AttributeEntityIdentifier, ec2Info.GetInstanceID())
				}
				AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityAwsAccountId, ec2Info.GetAccountID())
			}
		case entityattributes.Service:
			if logGroupNamesAttr, ok := resourceAttrs.Get(attributeAwsLogGroupNames); ok {
				logGroupNames = logGroupNamesAttr.Str()
			}
			if serviceNameAttr, ok := resourceAttrs.Get(attributeServiceName); ok {
				serviceName = serviceNameAttr.Str()
			}
			if environmentNameAttr, ok := resourceAttrs.Get(attributeDeploymentEnvironment); ok {
				environmentName = environmentNameAttr.Str()
			}
			if serviceNameSource, sourceExists := resourceAttrs.Get(entityattributes.AttributeEntityServiceNameSource); sourceExists {
				entityServiceNameSource = serviceNameSource.Str()
			}
			// resourcedetection processor may have picked up the ASG name from an ec2:DescribeTags call
			if autoScalingGroupNameAttr, ok := resourceAttrs.Get(attributeEC2TagAwsAutoscalingGroupName); ok {
				setAutoScalingGroup(autoScalingGroupNameAttr.Str())
			}

			entityServiceName := getServiceAttributes(resourceAttrs)
			entityEnvironmentName := environmentName
			if (entityServiceName == EMPTY || entityEnvironmentName == EMPTY) && p.config.ScrapeDatapointAttribute {
				entityServiceName, entityEnvironmentName, entityServiceNameSource = p.scrapeServiceAttribute(rm.At(i).ScopeMetrics())
				// If the entityServiceNameSource is empty here, that means it was not configured via instrumentation
				// If entityServiceName is a datapoint attribute, that means the service name is coming from the UserConfiguration source
				if entityServiceNameSource == entityattributes.AttributeServiceNameSourceUserConfig && entityServiceName != EMPTY {
					entityServiceNameSource = entityattributes.AttributeServiceNameSourceUserConfig
				}
			}
			if p.config.KubernetesMode != "" {
				p.k8sscraper.Scrape(rm.At(i).Resource(), getPodMeta(ctx))
				if p.config.Platform == config.ModeEC2 {
					ec2Info = getEC2InfoFromEntityStore()
				}

				if p.config.KubernetesMode == config.ModeEKS {
					entityPlatformType = entityattributes.AttributeEntityEKSPlatform
				} else {
					entityPlatformType = entityattributes.AttributeEntityK8sPlatform
				}

				podInfo, ok := p.k8sscraper.(*k8sattributescraper.K8sAttributeScraper)
				// Perform fallback mechanism for service name if it is empty
				// or has prefix unknown_service ( unknown_service will be set by OTEL SDK if the service name is empty on application pod)
				// https://opentelemetry.io/docs/specs/semconv/attributes-registry/service/
				if (entityServiceName == EMPTY || strings.HasPrefix(entityServiceName, unknownService)) && ok && podInfo != nil && podInfo.Workload != EMPTY {
					entityServiceName = podInfo.Workload
					entityServiceNameSource = entitystore.ServiceNameSourceK8sWorkload
				}
				// Set the service name source to Instrumentation if the operator doesn't set it
				if entityServiceName != EMPTY && entityServiceNameSource == EMPTY && getTelemetrySDKEnabledAttribute(resourceAttrs) {
					entityServiceNameSource = entitystore.ServiceNameSourceInstrumentation
				}
				// Perform fallback mechanism for environment if it is empty
				if entityEnvironmentName == EMPTY && ok && podInfo.Cluster != EMPTY && podInfo.Namespace != EMPTY {
					if p.config.KubernetesMode == config.ModeEKS {
						entityEnvironmentName = "eks:" + p.config.ClusterName + "/" + podInfo.Namespace
					} else if p.config.KubernetesMode == config.ModeK8sEC2 || p.config.KubernetesMode == config.ModeK8sOnPrem {
						entityEnvironmentName = "k8s:" + p.config.ClusterName + "/" + podInfo.Namespace
					}
				}

				// Add service information for a pod to the pod association map
				// so that agent can host this information in a server
				fullPodName := scrapeK8sPodName(resourceAttrs)
				if fullPodName != EMPTY && entityServiceName != EMPTY && entityServiceNameSource != EMPTY {
					addPodToServiceEnvironmentMap(fullPodName, entityServiceName, entityEnvironmentName, entityServiceNameSource)
				} else if fullPodName != EMPTY && entityServiceName != EMPTY && entityServiceNameSource == EMPTY {
					addPodToServiceEnvironmentMap(fullPodName, entityServiceName, entityEnvironmentName, entitystore.ServiceNameSourceUnknown)
				}
				eksAttributes := K8sServiceAttributes{
					Cluster:           podInfo.Cluster,
					Namespace:         podInfo.Namespace,
					Workload:          podInfo.Workload,
					Node:              podInfo.Node,
					InstanceId:        ec2Info.GetInstanceID(),
					ServiceNameSource: entityServiceNameSource,
				}
				AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityType, entityattributes.Service)
				AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityServiceName, entityServiceName)
				AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityDeploymentEnvironment, entityEnvironmentName)

				if err := validate.Struct(eksAttributes); err == nil {
					resourceAttrs.PutStr(entityattributes.AttributeEntityPlatformType, entityPlatformType)
					resourceAttrs.PutStr(entityattributes.AttributeEntityCluster, eksAttributes.Cluster)
					resourceAttrs.PutStr(entityattributes.AttributeEntityNamespace, eksAttributes.Namespace)
					resourceAttrs.PutStr(entityattributes.AttributeEntityWorkload, eksAttributes.Workload)
					resourceAttrs.PutStr(entityattributes.AttributeEntityNode, eksAttributes.Node)
					//Add Instance id attribute only if the application node is same as agent node
					if eksAttributes.Node == os.Getenv("K8S_NODE_NAME") {
						AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityInstanceID, eksAttributes.InstanceId)
					}
					AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityAwsAccountId, ec2Info.GetAccountID())
					AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityServiceNameSource, entityServiceNameSource)
				}
				p.k8sscraper.Reset()
			} else if p.config.Platform == config.ModeEC2 {
				//If entityServiceNameSource is empty, it was not configured via the config. Get the source in descending priority
				//  1. Incoming telemetry attributes
				//  2. CWA config
				//  3. instance tags - The tags attached to the EC2 instance. Only scrape for tag with the following key: service, application, app
				//  4. IAM Role - The IAM role name retrieved through IMDS(Instance Metadata Service)
				if entityServiceName == EMPTY && entityServiceNameSource == EMPTY {
					entityServiceName, entityServiceNameSource = getServiceNameSource()
				} else if entityServiceName != EMPTY && entityServiceNameSource == EMPTY {
					entityServiceNameSource = entitystore.ServiceNameSourceUnknown
				}

				entityPlatformType = entityattributes.AttributeEntityEC2Platform
				ec2Info = getEC2InfoFromEntityStore()

				if entityEnvironmentName == EMPTY {
					if getAutoScalingGroupFromEntityStore() != EMPTY {
						entityEnvironmentName = entityattributes.DeploymentEnvironmentFallbackPrefix + getAutoScalingGroupFromEntityStore()
					} else {
						entityEnvironmentName = entityattributes.DeploymentEnvironmentDefault
					}
				}

				AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityType, entityattributes.Service)
				AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityServiceName, entityServiceName)
				AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityDeploymentEnvironment, entityEnvironmentName)
				AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityAwsAccountId, ec2Info.GetAccountID())

				ec2Attributes := EC2ServiceAttributes{
					InstanceId:        ec2Info.GetInstanceID(),
					AutoScalingGroup:  getAutoScalingGroupFromEntityStore(),
					ServiceNameSource: entityServiceNameSource,
				}
				if err := validate.Struct(ec2Attributes); err == nil {
					resourceAttrs.PutStr(entityattributes.AttributeEntityPlatformType, entityPlatformType)
					AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityInstanceID, ec2Attributes.InstanceId)
					AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityAutoScalingGroup, ec2Attributes.AutoScalingGroup)
					AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityServiceNameSource, ec2Attributes.ServiceNameSource)
				}
			}
			if logGroupNames == EMPTY || (serviceName == EMPTY && environmentName == EMPTY) {
				continue
			}

			logGroupNamesSlice := strings.Split(logGroupNames, "&")
			for _, logGroupName := range logGroupNamesSlice {
				if logGroupName == EMPTY {
					continue
				}
				addToEntityStore(entitystore.LogGroupName(logGroupName), serviceName, environmentName)
			}
		}

	}
	return md, nil
}