exporter/awsemfexporter/metric_translator.go (375 lines of code) (raw):
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package awsemfexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter"
import (
"encoding/json"
"fmt"
"reflect"
"strconv"
"time"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/multierr"
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs"
aws "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics"
)
const (
// OTel instrumentation lib name as dimension
oTellibDimensionKey = "OTelLib"
defaultNamespace = "default"
// DimensionRollupOptions
zeroAndSingleDimensionRollup = "ZeroAndSingleDimensionRollup"
singleDimensionRollupOnly = "SingleDimensionRollupOnly"
prometheusReceiver = "prometheus"
attributeReceiver = "receiver"
fieldPrometheusMetricType = "prom_metric_type"
// metric attributes for AWS EMF, not to be treated as metric labels
emfStorageResolutionAttribute = "aws.emf.storage_resolution"
)
var fieldPrometheusTypes = map[pmetric.MetricType]string{
pmetric.MetricTypeEmpty: "",
pmetric.MetricTypeGauge: "gauge",
pmetric.MetricTypeSum: "counter",
pmetric.MetricTypeHistogram: "histogram",
pmetric.MetricTypeSummary: "summary",
}
type cWMetrics struct {
measurements []cWMeasurement
timestampMs int64
fields map[string]any
}
type cWMetricInfo struct {
Name string
Unit string
StorageResolution int
}
type cWMeasurement struct {
Namespace string
Dimensions [][]string
Metrics []cWMetricInfo
}
type cWMetricStats struct {
Max float64
Min float64
Count uint64
Sum float64
}
// The SampleCount of CloudWatch metrics will be calculated by the sum of the 'Counts' array.
// The 'Count' field should be same as the sum of the 'Counts' array and will be ignored in CloudWatch.
type cWMetricHistogram struct {
Values []float64
Counts []float64
Max float64
Min float64
Count uint64
Sum float64
}
type groupedMetricMetadata struct {
namespace string
timestampMs int64
logGroup string
logStream string
metricDataType pmetric.MetricType
batchIndex int
retainInitialValueForDelta bool
}
// cWMetricMetadata represents the metadata associated with a given CloudWatch metric
type cWMetricMetadata struct {
groupedMetricMetadata
instrumentationScopeName string
receiver string
}
type metricTranslator struct {
metricDescriptor map[string]MetricDescriptor
calculators *emfCalculators
}
func newMetricTranslator(config Config) metricTranslator {
mt := map[string]MetricDescriptor{}
for _, descriptor := range config.MetricDescriptors {
mt[descriptor.MetricName] = descriptor
}
return metricTranslator{
metricDescriptor: mt,
calculators: &emfCalculators{
delta: aws.NewFloat64DeltaCalculator(),
summary: aws.NewMetricCalculator(calculateSummaryDelta),
},
}
}
func (mt metricTranslator) Shutdown() error {
var errs error
errs = multierr.Append(errs, mt.calculators.delta.Shutdown())
errs = multierr.Append(errs, mt.calculators.summary.Shutdown())
return errs
}
// translateOTelToGroupedMetric converts OT metrics to Grouped Metric format.
func (mt metricTranslator) translateOTelToGroupedMetric(rm pmetric.ResourceMetrics, groupedMetrics map[any]*groupedMetric, config *Config) error {
timestamp := time.Now().UnixNano() / int64(time.Millisecond)
var instrumentationScopeName string
cWNamespace := getNamespace(rm, config.Namespace)
logGroup, logStream, patternReplaceSucceeded := getLogInfo(rm, cWNamespace, config)
deltaInitialValue := config.RetainInitialValueOfDeltaMetric
ilms := rm.ScopeMetrics()
var metricReceiver string
if receiver, ok := rm.Resource().Attributes().Get(attributeReceiver); ok {
metricReceiver = receiver.Str()
}
for j := 0; j < ilms.Len(); j++ {
ilm := ilms.At(j)
if ilm.Scope().Name() != "" {
instrumentationScopeName = ilm.Scope().Name()
}
metrics := ilm.Metrics()
for k := 0; k < metrics.Len(); k++ {
metric := metrics.At(k)
metadata := cWMetricMetadata{
groupedMetricMetadata: groupedMetricMetadata{
namespace: cWNamespace,
timestampMs: timestamp,
logGroup: logGroup,
logStream: logStream,
metricDataType: metric.Type(),
batchIndex: 0,
retainInitialValueForDelta: deltaInitialValue,
},
instrumentationScopeName: instrumentationScopeName,
receiver: metricReceiver,
}
err := addToGroupedMetric(metric, groupedMetrics, metadata, patternReplaceSucceeded, mt.metricDescriptor, config, mt.calculators)
if err != nil {
return err
}
}
}
return nil
}
// translateGroupedMetricToCWMetric converts Grouped Metric format to CloudWatch Metric format.
func translateGroupedMetricToCWMetric(groupedMetric *groupedMetric, config *Config) *cWMetrics {
labels := filterAWSEMFAttributes(groupedMetric.labels)
fieldsLength := len(labels) + len(groupedMetric.metrics)
isPrometheusMetric := groupedMetric.metadata.receiver == prometheusReceiver
if isPrometheusMetric {
fieldsLength++
}
fields := make(map[string]any, fieldsLength)
// Add labels to fields
for k, v := range labels {
fields[k] = v
}
// Add metrics to fields
for metricName, metricInfo := range groupedMetric.metrics {
fields[metricName] = metricInfo.value
}
if isPrometheusMetric {
fields[fieldPrometheusMetricType] = fieldPrometheusTypes[groupedMetric.metadata.metricDataType]
}
var cWMeasurements []cWMeasurement
if len(config.MetricDeclarations) == 0 {
// If there are no metric declarations defined, translate grouped metric
// into the corresponding CW Measurement
cwm := groupedMetricToCWMeasurement(groupedMetric, config)
cWMeasurements = []cWMeasurement{cwm}
} else {
// If metric declarations are defined, filter grouped metric's metrics using
// metric declarations and translate into the corresponding list of CW Measurements
cWMeasurements = groupedMetricToCWMeasurementsWithFilters(groupedMetric, config)
}
return &cWMetrics{
measurements: cWMeasurements,
timestampMs: groupedMetric.metadata.timestampMs,
fields: fields,
}
}
// groupedMetricToCWMeasurement creates a single CW Measurement from a grouped metric.
func groupedMetricToCWMeasurement(groupedMetric *groupedMetric, config *Config) cWMeasurement {
labels := filterAWSEMFAttributes(groupedMetric.labels)
dimensionRollupOption := config.DimensionRollupOption
// Create a dimension set containing list of label names
dimSet := make([]string, len(labels))
idx := 0
for labelName := range labels {
dimSet[idx] = labelName
idx++
}
dimensions := [][]string{dimSet}
// Apply single/zero dimension rollup to labels
rollupDimensionArray := dimensionRollup(dimensionRollupOption, labels)
if len(rollupDimensionArray) > 0 {
// Perform duplication check for edge case with a single label and single dimension roll-up
_, hasOTelLibKey := labels[oTellibDimensionKey]
isSingleLabel := len(dimSet) <= 1 || (len(dimSet) == 2 && hasOTelLibKey)
singleDimRollup := dimensionRollupOption == singleDimensionRollupOnly ||
dimensionRollupOption == zeroAndSingleDimensionRollup
if isSingleLabel && singleDimRollup {
// Remove duplicated dimension set before adding on rolled-up dimensions
dimensions = nil
}
}
// Add on rolled-up dimensions
dimensions = append(dimensions, rollupDimensionArray...)
metrics := make([]cWMetricInfo, len(groupedMetric.metrics))
idx = 0
for metricName, metricInfo := range groupedMetric.metrics {
metrics[idx] = cWMetricInfo{
Name: metricName,
StorageResolution: 60,
}
if metricInfo.unit != "" {
metrics[idx].Unit = metricInfo.unit
}
if storRes, ok := groupedMetric.labels[emfStorageResolutionAttribute]; ok {
if storResInt, err := strconv.Atoi(storRes); err == nil {
metrics[idx].StorageResolution = storResInt
}
}
idx++
}
return cWMeasurement{
Namespace: groupedMetric.metadata.namespace,
Dimensions: dimensions,
Metrics: metrics,
}
}
// groupedMetricToCWMeasurementsWithFilters filters the grouped metric using the given list of metric
// declarations and returns the corresponding list of CW Measurements.
func groupedMetricToCWMeasurementsWithFilters(groupedMetric *groupedMetric, config *Config) (cWMeasurements []cWMeasurement) {
labels := filterAWSEMFAttributes(groupedMetric.labels)
// Filter metric declarations by labels
metricDeclarations := make([]*MetricDeclaration, 0, len(config.MetricDeclarations))
for _, metricDeclaration := range config.MetricDeclarations {
if metricDeclaration.MatchesLabels(labels) {
metricDeclarations = append(metricDeclarations, metricDeclaration)
}
}
// If the whole batch of metrics don't match any metric declarations, drop them
if len(metricDeclarations) == 0 {
labelsStr, _ := json.Marshal(labels)
var metricNames []string
for metricName := range groupedMetric.metrics {
metricNames = append(metricNames, metricName)
}
config.logger.Debug(
"Dropped batch of metrics: no metric declaration matched labels",
zap.String("Labels", string(labelsStr)),
zap.Strings("Metric Names", metricNames),
)
return
}
// Group metrics by matched metric declarations
type metricDeclarationGroup struct {
metricDeclIdxList []int
metrics []cWMetricInfo
}
metricDeclGroups := make(map[string]*metricDeclarationGroup)
for metricName, metricInfo := range groupedMetric.metrics {
// Filter metric declarations by metric name
var metricDeclIdx []int
for i, metricDeclaration := range metricDeclarations {
if metricDeclaration.MatchesName(metricName) {
metricDeclIdx = append(metricDeclIdx, i)
}
}
if len(metricDeclIdx) == 0 {
config.logger.Debug(
"Dropped metric: no metric declaration matched metric name",
zap.String("Metric name", metricName),
)
continue
}
metric := cWMetricInfo{
Name: metricName,
StorageResolution: 60,
}
if metricInfo.unit != "" {
metric.Unit = metricInfo.unit
}
if storRes, ok := groupedMetric.labels[emfStorageResolutionAttribute]; ok {
if storResInt, err := strconv.Atoi(storRes); err == nil {
metric.StorageResolution = storResInt
}
}
metricDeclKey := fmt.Sprint(metricDeclIdx)
if group, ok := metricDeclGroups[metricDeclKey]; ok {
group.metrics = append(group.metrics, metric)
} else {
metricDeclGroups[metricDeclKey] = &metricDeclarationGroup{
metricDeclIdxList: metricDeclIdx,
metrics: []cWMetricInfo{metric},
}
}
}
if len(metricDeclGroups) == 0 {
return
}
// Apply single/zero dimension rollup to labels
rollupDimensionArray := dimensionRollup(config.DimensionRollupOption, labels)
// Translate each group into a CW Measurement
cWMeasurements = make([]cWMeasurement, 0, len(metricDeclGroups))
for _, group := range metricDeclGroups {
var dimensions [][]string
// Extract dimensions from matched metric declarations
for _, metricDeclIdx := range group.metricDeclIdxList {
dims := metricDeclarations[metricDeclIdx].ExtractDimensions(labels)
dimensions = append(dimensions, dims...)
}
dimensions = append(dimensions, rollupDimensionArray...)
// De-duplicate dimensions
dimensions = dedupDimensions(dimensions)
// Export metrics only with non-empty dimensions list
if len(dimensions) > 0 {
cwm := cWMeasurement{
Namespace: groupedMetric.metadata.namespace,
Dimensions: dimensions,
Metrics: group.metrics,
}
cWMeasurements = append(cWMeasurements, cwm)
}
}
return
}
// translateCWMetricToEMF converts CloudWatch Metric format to EMF.
func translateCWMetricToEMF(cWMetric *cWMetrics, config *Config) (*cwlogs.Event, error) {
// convert CWMetric into map format for compatible with PLE input
fieldMap := cWMetric.fields
// restore the json objects that are stored as string in attributes
for _, key := range config.ParseJSONEncodedAttributeValues {
if fieldMap[key] == nil {
continue
}
if val, ok := fieldMap[key].(string); ok {
var f any
err := json.Unmarshal([]byte(val), &f)
if err != nil {
config.logger.Debug(
"Failed to parse json-encoded string",
zap.String("label key", key),
zap.String("label value", val),
zap.Error(err),
)
continue
}
fieldMap[key] = f
} else {
config.logger.Debug(
"Invalid json-encoded data. A string is expected",
zap.Any("type", reflect.TypeOf(fieldMap[key])),
zap.Any("value", reflect.ValueOf(fieldMap[key])),
)
}
}
// Create EMF metrics if there are measurements
// https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html#CloudWatch_Embedded_Metric_Format_Specification_structure
if len(cWMetric.measurements) > 0 {
if config.Version == "1" {
/* EMF V1
"Version": "1",
"_aws": {
"CloudWatchMetrics": [
{
"Namespace": "ECS",
"Dimensions": [ ["ClusterName"] ],
"Metrics": [{"Name": "memcached_commands_total"}]
}
],
"Timestamp": 1668387032641
}
*/
fieldMap["Version"] = "1"
fieldMap["_aws"] = map[string]any{
"CloudWatchMetrics": cWMetric.measurements,
"Timestamp": cWMetric.timestampMs,
}
}
}
if config.Version == "0" {
fieldMap["Timestamp"] = fmt.Sprint(cWMetric.timestampMs)
if len(cWMetric.measurements) > 0 {
/* EMF V0
{
"Version": "0",
"CloudWatchMetrics": [
{
"Namespace": "ECS",
"Dimensions": [ ["ClusterName"] ],
"Metrics": [{"Name": "memcached_commands_total"}]
}
],
"Timestamp": "1668387032641"
}
*/
fieldMap["Version"] = "0"
fieldMap["CloudWatchMetrics"] = cWMetric.measurements
}
}
pleMsg, err := json.Marshal(fieldMap)
if err != nil {
return nil, err
}
metricCreationTime := cWMetric.timestampMs
logEvent := cwlogs.NewEvent(
metricCreationTime,
string(pleMsg),
)
logEvent.GeneratedTime = time.Unix(0, metricCreationTime*int64(time.Millisecond))
return logEvent, nil
}
// Utility function that converts from groupedMetric to a cloudwatch event
func translateGroupedMetricToEmf(groupedMetric *groupedMetric, config *Config, defaultLogStream string) (*cwlogs.Event, error) {
cWMetric := translateGroupedMetricToCWMetric(groupedMetric, config)
event, err := translateCWMetricToEMF(cWMetric, config)
if err != nil {
return nil, err
}
logGroup := groupedMetric.metadata.logGroup
logStream := groupedMetric.metadata.logStream
if logStream == "" {
logStream = defaultLogStream
}
event.LogGroupName = logGroup
event.LogStreamName = logStream
return event, nil
}
func filterAWSEMFAttributes(labels map[string]string) map[string]string {
// remove any labels that are attributes specific to AWS EMF Exporter
filteredLabels := make(map[string]string)
for labelName := range labels {
if labelName != emfStorageResolutionAttribute {
filteredLabels[labelName] = labels[labelName]
}
}
return filteredLabels
}