plugins/processors/gpuattributes/internal/awsneuron_metric_modifier.go (217 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT
package internal
import (
"strings"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
"github.com/aws/amazon-cloudwatch-agent/internal/containerinsightscommon"
)
const (
ErrorType = "error_type"
StatusType = "status_type"
EventType = "event_type"
logTypeSuffix = "AWSNeuron"
MemoryLocation = "memory_location"
Core = "Core"
Device = "Device"
Percentile = "percentile"
PodName = "PodName"
Count = "Count"
Bytes = "Bytes"
Seconds = "Seconds"
Percent = "Percent"
NeuronCoreAttributeKey = "NeuronCore"
NeuronDeviceAttributeKey = "NeuronDevice"
RuntimeTag = "runtime_tag"
ClusterName = "ClusterName"
ContainerName = "ContainerName"
FullPodName = "FullPodName"
InstanceId = "InstanceId"
InstanceType = "InstanceType"
K8sPodName = "K8sPodName"
Namespace = "Namespace"
NeuronCore = "NeuronCore"
NeuronDevice = "NeuronDevice"
NodeName = "NodeName"
Service = "Service"
AvailabilityZone = "availability_zone"
Kubernetes = "kubernetes"
Region = "region"
SubnetId = "subnet_id"
RuntimeTagOverride = "DEFAULT"
NeuronExecutionErrorsAggregatedMetric = containerinsightscommon.NeuronExecutionErrors + "_total"
NeuronDeviceHardwareEccEventsAggregatedMetric = containerinsightscommon.NeuronDeviceHardwareEccEvents + "_total"
)
type AwsNeuronMetricModifier struct {
logger *zap.Logger
}
type MetricModifications struct {
DuplicationTypes []string
UniqueAttribute string
LogTypeSuffix string
Unit string
}
type MetricDatapointAggregationKey struct {
runtimeTag string
aggregatedMetricName string
deviceId string
}
var (
metricModificationsMap = map[string]MetricModifications{
containerinsightscommon.NeuronExecutionErrors: {DuplicationTypes: []string{containerinsightscommon.TypeNode}, UniqueAttribute: ErrorType, LogTypeSuffix: "", Unit: Count},
containerinsightscommon.NeuronExecutionStatus: {DuplicationTypes: []string{containerinsightscommon.TypeNode}, UniqueAttribute: StatusType, LogTypeSuffix: "", Unit: Count},
containerinsightscommon.NeuronRuntimeMemoryUsage: {DuplicationTypes: []string{containerinsightscommon.TypeNode}, UniqueAttribute: "", LogTypeSuffix: "", Unit: Bytes},
containerinsightscommon.NeuronCoreMemoryUtilizationTotal: {DuplicationTypes: []string{containerinsightscommon.TypeContainer, containerinsightscommon.TypePod, containerinsightscommon.TypeNode}, UniqueAttribute: "", LogTypeSuffix: Core, Unit: Bytes},
containerinsightscommon.NeuronCoreMemoryUtilizationConstants: {DuplicationTypes: []string{containerinsightscommon.TypeContainer, containerinsightscommon.TypePod, containerinsightscommon.TypeNode}, UniqueAttribute: "", LogTypeSuffix: Core, Unit: Bytes},
containerinsightscommon.NeuronCoreMemoryUtilizationModelCode: {DuplicationTypes: []string{containerinsightscommon.TypeContainer, containerinsightscommon.TypePod, containerinsightscommon.TypeNode}, UniqueAttribute: "", LogTypeSuffix: Core, Unit: Bytes},
containerinsightscommon.NeuronCoreMemoryUtilizationSharedScratchpad: {DuplicationTypes: []string{containerinsightscommon.TypeContainer, containerinsightscommon.TypePod, containerinsightscommon.TypeNode}, UniqueAttribute: "", LogTypeSuffix: Core, Unit: Bytes},
containerinsightscommon.NeuronCoreMemoryUtilizationRuntimeMemory: {DuplicationTypes: []string{containerinsightscommon.TypeContainer, containerinsightscommon.TypePod, containerinsightscommon.TypeNode}, UniqueAttribute: "", LogTypeSuffix: Core, Unit: Bytes},
containerinsightscommon.NeuronCoreMemoryUtilizationTensors: {DuplicationTypes: []string{containerinsightscommon.TypeContainer, containerinsightscommon.TypePod, containerinsightscommon.TypeNode}, UniqueAttribute: "", LogTypeSuffix: Core, Unit: Bytes},
containerinsightscommon.NeuronCoreUtilization: {DuplicationTypes: []string{containerinsightscommon.TypeContainer, containerinsightscommon.TypePod, containerinsightscommon.TypeNode}, UniqueAttribute: "", LogTypeSuffix: Core, Unit: Percent},
containerinsightscommon.NeuronInstanceInfo: {DuplicationTypes: []string{}, UniqueAttribute: "", LogTypeSuffix: "", Unit: Count},
containerinsightscommon.NeuronHardware: {DuplicationTypes: []string{}, UniqueAttribute: "", LogTypeSuffix: "", Unit: Count},
containerinsightscommon.NeuronExecutionLatency: {DuplicationTypes: []string{containerinsightscommon.TypeNode}, UniqueAttribute: "", LogTypeSuffix: "", Unit: Seconds},
containerinsightscommon.NeuronDeviceHardwareEccEvents: {DuplicationTypes: []string{containerinsightscommon.TypeContainer, containerinsightscommon.TypePod, containerinsightscommon.TypeNode}, UniqueAttribute: EventType, LogTypeSuffix: Device, Unit: Count},
}
attributeValuePrefixingMap = map[string]string{NeuronCoreAttributeKey: "core", NeuronDeviceAttributeKey: "device"}
uniquesDatapointsToAggregatedMetricMappings = map[string]map[string]string{
containerinsightscommon.NeuronExecutionErrors: {"generic": NeuronExecutionErrorsAggregatedMetric,
"numerical": NeuronExecutionErrorsAggregatedMetric,
"transient": NeuronExecutionErrorsAggregatedMetric,
"model": NeuronExecutionErrorsAggregatedMetric,
"runtime": NeuronExecutionErrorsAggregatedMetric,
"hardware": NeuronExecutionErrorsAggregatedMetric},
// execution_status metric will be added here incrementally
containerinsightscommon.NeuronDeviceHardwareEccEvents: {"mem_ecc_corrected": NeuronDeviceHardwareEccEventsAggregatedMetric,
"mem_ecc_uncorrected": NeuronDeviceHardwareEccEventsAggregatedMetric,
"sram_ecc_corrected": NeuronDeviceHardwareEccEventsAggregatedMetric,
"sram_ecc_uncorrected": NeuronDeviceHardwareEccEventsAggregatedMetric},
}
)
func NewMetricModifier(logger *zap.Logger) *AwsNeuronMetricModifier {
d := &AwsNeuronMetricModifier{
logger: logger,
}
return d
}
func (md *AwsNeuronMetricModifier) ModifyMetric(originalMetric pmetric.Metric, metrics pmetric.MetricSlice) {
// only decorate Aws Neuron metrics
// another option is to separate Aws Neuron in its own pipeline to minimize extra processing of metrics
if _, isNeuronMetric := metricModificationsMap[originalMetric.Name()]; !isNeuronMetric {
return
}
// Since the otel to grouped metrics conversions takes type into account,
// thus we need to convert all metrics to the same type so that they are grouped together.
if originalMetric.Type() == pmetric.MetricTypeGauge {
convertGaugeToSum(originalMetric)
}
// Neuron metrics sent by the neuron monitor don't have any units so we add them in the agent.
addUnit(originalMetric)
updateCoreDeviceRuntimeLabels(originalMetric)
resetStaleDatapoints(originalMetric)
originalMetricName := originalMetric.Name()
// The neuron metrics sent by the neuron monitor are not homogeneous
// and some metrics require special processing.
// We perform those special processing before duplicating metric for pod, node and container.
if originalMetricName == containerinsightscommon.NeuronExecutionLatency {
keepSpecificDatapointBasedOnAttribute(originalMetric, Percentile, "p50")
} else if originalMetricName == containerinsightscommon.NeuronRuntimeMemoryUsage {
keepSpecificDatapointBasedOnAttribute(originalMetric, MemoryLocation, "neuron_device")
}
modifiedMetricSlice := md.extractDatapointsAsMetricsAndAggregate(originalMetric)
md.duplicateMetrics(modifiedMetricSlice, originalMetricName, originalMetric.Sum().DataPoints(), metrics)
}
// This method converts gauges to sum so that all metrics can be grouped in the same grouped metrics.
// The default value of temporality is undefined so even after conversion from gauge to sum the agent won't take delta.
func convertGaugeToSum(originalMetric pmetric.Metric) {
datapoints := originalMetric.Gauge().DataPoints()
originalMetric.SetEmptySum()
datapoints.MoveAndAppendTo(originalMetric.Sum().DataPoints())
}
func addUnit(originalMetric pmetric.Metric) {
originalMetric.SetUnit(metricModificationsMap[originalMetric.Name()].Unit)
}
// This method keeps a specific datapoint in the list of datapoints,
// filtering out the rest based on value of the target attribute.
// - For neuron_execution_latency metric we keep p50 percentile
// - For neurondevice_runtime_memory we keep the neuron_device memory datapoint
// example :
//
// in : neurondevice_runtime_memory {datapoints: [ 0 : {Attributes : {..., percentile:p50, ....}, value 3}, 1: {Attributes : {..., percentile:p99, ....}, , value 4}]}
// out : neurondevice_runtime_memory {datapoints: [ 0 : {Attributes : {..., percentile:p50, ....}, value 3}]}
func keepSpecificDatapointBasedOnAttribute(originalMetric pmetric.Metric, attributeKey string, attributeValueToKeep string) {
originalMetric.Sum().DataPoints().RemoveIf(func(dp pmetric.NumberDataPoint) bool {
value, exists := dp.Attributes().Get(attributeKey)
return !exists || value.Str() != attributeValueToKeep
})
}
// This method takes a metric and creates an aggregated metric from its datapoint values.
// It also creates a new metric for each datapoint based on the unique target attribute.
// example :
// in: unique_target_attribute = error_type
// and error_type: A,B,C need to be aggregated in neuron_execution_errors_total metric then
//
// neuron_execution_errors {
// datapoints : [
// 0 : { Attribute : {..., error_type : A, ....}, value = 1 },
// 1 : { Attribute : {..., error_type : B, ....}, value = 2 },
// 2 : { Attribute : {..., error_type : C, ....}, value = 3 }
// ]
// }
//
// out: unique_target_attribute = error_type
// [
//
// neuron_execution_errors_total {
// datapoints : [ 0 : { Attribute : {..., error_type : A, ....}, value = 6 }]
// },
// neuron_execution_errors_A {
// datapoints : [ 0 : { Attribute : {..., error_type : A, ....}, value = 1 }]
// },
// neuron_execution_errors_B {
// datapoints : [ 0 : { Attribute : {..., error_type : B, ....}, value = 2 }]
// },
// neuron_execution_errors_C {
// datapoints : [ 0 : { Attribute : {..., error_type : C, ....}, value = 3 }]
// },
//
// ]
func (md *AwsNeuronMetricModifier) extractDatapointsAsMetricsAndAggregate(originalMetric pmetric.Metric) pmetric.MetricSlice {
newMetricSlice := pmetric.NewMetricSlice()
uniqueAttribute := metricModificationsMap[originalMetric.Name()].UniqueAttribute
if uniqueAttribute == "" {
originalMetric.CopyTo(newMetricSlice.AppendEmpty())
return newMetricSlice
}
originalMetricDatapoints := originalMetric.Sum().DataPoints()
aggregatedValuesPerRuntimeTag := map[MetricDatapointAggregationKey]float64{}
uniqueAttributeToAggregatedMetricMappings, needsAggregation := uniquesDatapointsToAggregatedMetricMappings[originalMetric.Name()]
for i := 0; i < originalMetricDatapoints.Len(); i++ {
originalDatapoint := originalMetricDatapoints.At(i)
runtimeTag, _ := originalDatapoint.Attributes().Get(RuntimeTag)
deviceId, _ := originalDatapoint.Attributes().Get(NeuronDeviceAttributeKey)
uniqueAttributeValue, _ := originalDatapoint.Attributes().Get(uniqueAttribute)
// only add to the aggregation map if the datapoint to aggregated metric mappings are defined for the original metric
if needsAggregation {
aggregatedMetricName := uniqueAttributeToAggregatedMetricMappings[uniqueAttributeValue.Str()]
aggregatedValuesPerRuntimeTag[MetricDatapointAggregationKey{runtimeTag: runtimeTag.Str(), aggregatedMetricName: aggregatedMetricName, deviceId: deviceId.Str()}] += originalDatapoint.DoubleValue()
}
// Creating a new metric from the current datapoint and adding it to the new newMetricSlice
newNameMetric := setMetricMetadata(newMetricSlice.AppendEmpty(), originalMetric.Name()+"_"+uniqueAttributeValue.Str(), originalMetric.Unit())
originalDatapoint.CopyTo(newNameMetric.SetEmptySum().DataPoints().AppendEmpty())
// setting value of temporality to cumulative so that agent performs delta conversion on this metric
newNameMetric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
// Creating body for the aggregated metric and add it to the new newMetricSlice for each runtime
for aggregatedMetricMetadata, value := range aggregatedValuesPerRuntimeTag {
aggregatedMetric := setMetricMetadata(newMetricSlice.AppendEmpty(), aggregatedMetricMetadata.aggregatedMetricName, originalMetric.Unit())
originalMetricDatapoints.At(0).CopyTo(aggregatedMetric.SetEmptySum().DataPoints().AppendEmpty())
aggregatedMetric.Sum().DataPoints().At(0).SetDoubleValue(value)
aggregatedMetric.Sum().DataPoints().At(0).Attributes().PutStr(RuntimeTag, aggregatedMetricMetadata.runtimeTag)
if aggregatedMetricMetadata.deviceId != "" {
aggregatedMetric.Sum().DataPoints().At(0).Attributes().PutStr(NeuronDeviceAttributeKey, aggregatedMetricMetadata.deviceId)
}
// setting value of temporality to cumulative so that agent performs delta conversion on this metric
aggregatedMetric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
return newMetricSlice
}
// This method prefixes NeuronCore and NeuronDevice values with `core` and `device` respectively
// to make the attribute values more verbose
func updateCoreDeviceRuntimeLabels(originalMetric pmetric.Metric) {
dps := originalMetric.Sum().DataPoints()
for i := 0; i < dps.Len(); i++ {
dp := dps.At(i)
for attributeKey, attributeValuePrefix := range attributeValuePrefixingMap {
if value, exists := dp.Attributes().Get(attributeKey); exists {
dp.Attributes().PutStr(attributeKey, attributeValuePrefix+value.Str())
}
}
dp.Attributes().PutStr(RuntimeTag, RuntimeTagOverride)
}
}
// This method performs selective duplication of a metric based on the types for which duplication needs to be performed.
// A metric is duplicated for pod and container only if pod correlation has been done successfully
func (md *AwsNeuronMetricModifier) duplicateMetrics(metricsSlice pmetric.MetricSlice, originalMetricName string, originalMetricDatapoints pmetric.NumberDataPointSlice, metrics pmetric.MetricSlice) {
metricModifications := metricModificationsMap[originalMetricName]
// check if pod correlation has been performed, if not then don't emit metric for container and pod
duplicateForNodeOnly := false
podName, exists := originalMetricDatapoints.At(0).Attributes().Get(PodName)
if !exists || len(podName.Str()) == 0 {
duplicateForNodeOnly = true
}
for i := 0; i < metricsSlice.Len(); i++ {
metric := metricsSlice.At(i)
if duplicateForNodeOnly {
duplicateMetricForType(metric, containerinsightscommon.TypeNode, originalMetricName, metrics)
} else {
for _, prefix := range metricModifications.DuplicationTypes {
duplicateMetricForType(metric, prefix, originalMetricName, metrics)
}
}
}
}
// This method creates new metrics by prefixing the metric name with each k8 concepts (pod, node and container).
// It also adds logTypes to all the metric datapoint attributes.
func duplicateMetricForType(metric pmetric.Metric, duplicateType string, originalMetricName string, metrics pmetric.MetricSlice) {
metricCopy := metrics.AppendEmpty()
metric.CopyTo(metricCopy)
metricCopy.SetName(strings.ToLower(duplicateType) + "_" + metricCopy.Name())
datapoints := metricCopy.Sum().DataPoints()
for i := 0; i < datapoints.Len(); i++ {
datapoints.At(i).Attributes().PutStr(containerinsightscommon.MetricType, duplicateType+logTypeSuffix+metricModificationsMap[originalMetricName].LogTypeSuffix)
}
}
func setMetricMetadata(metric pmetric.Metric, name string, unit string) pmetric.Metric {
metric.SetName(name)
metric.SetUnit(unit)
return metric
}
// This method updates the stale or nan datapoints so that they report the default value of 0 instead. This is needed so that we can see the default values instead of a gap.
// - return the assigned value converted to a double if possible, else 0
// - set the runtime tag to default since the runtime associated no longer exists
// - reset the NoRecordedValue flag so that the metric is not dropped
func resetStaleDatapoints(originalMetric pmetric.Metric) {
dps := originalMetric.Sum().DataPoints()
for i := 0; i < dps.Len(); i++ {
dp := dps.At(i)
if dp.ValueType() == pmetric.NumberDataPointValueTypeEmpty || dp.Flags().NoRecordedValue() {
dp.SetDoubleValue(dp.DoubleValue())
dp.Attributes().PutStr(RuntimeTag, RuntimeTagOverride)
dp.SetFlags(dp.Flags().WithNoRecordedValue(false))
}
}
}