// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT
package rollupprocessor
import (
"context"
"sort"
"github.com/jellydator/ttlcache/v3"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"golang.org/x/exp/maps"
"github.com/aws/amazon-cloudwatch-agent/internal/metric"
"github.com/aws/amazon-cloudwatch-agent/internal/util/collections"
)
// rollupProcessor duplicates each metric data point once per configured
// attribute (dimension) group, optionally dropping the original data point
// for metrics listed in the drop-original set.
type rollupProcessor struct {
// attributeGroups holds the deduplicated, sorted attribute key groups to roll up into.
attributeGroups [][]string
// dropOriginal is the set of metric names whose original data points are discarded
// after rollups are produced.
dropOriginal collections.Set[string]
// cache memoizes the rollup attribute maps per unique source attribute set to
// avoid rebuilding them for every data point.
cache rollupCache
}
// newProcessor builds a rollupProcessor from the supplied configuration.
// When no attribute groups are configured there is nothing to memoize, so
// the cache is sized to zero, which yields a no-op cache.
func newProcessor(cfg *Config) *rollupProcessor {
size := cfg.CacheSize
if len(cfg.AttributeGroups) == 0 {
// no rollups will ever be computed; use the no-op cache
size = 0
}
return &rollupProcessor{
attributeGroups: uniqueGroups(cfg.AttributeGroups),
dropOriginal:    collections.NewSet(cfg.DropOriginal...),
cache:           buildRollupCache(size),
}
}
// start launches the cache's background expiration loop. The goroutine is
// owned by the cache and is terminated via cache.Stop in stop.
func (p *rollupProcessor) start(context.Context, component.Host) error {
go p.cache.Start()
return nil
}
// stop shuts down the cache, ending the goroutine started in start.
func (p *rollupProcessor) stop(context.Context) error {
p.cache.Stop()
return nil
}
// processMetrics applies rollup/drop handling to every metric in the batch.
// With no attribute groups and no drop-original entries configured, the batch
// is passed through untouched.
func (p *rollupProcessor) processMetrics(_ context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
if len(p.attributeGroups) == 0 && len(p.dropOriginal) == 0 {
return md, nil
}
metric.RangeMetrics(md, p.processMetric)
return md, nil
}
// processMetric rewrites a single metric's data points in place: the rolled-up
// (and possibly original) points are accumulated into a fresh slice and then
// copied back over the metric's own data point slice.
func (p *rollupProcessor) processMetric(m pmetric.Metric) {
// Gauge and Sum share the NumberDataPointSlice type, so one helper covers both.
rollupNumbers := func(dps pmetric.NumberDataPointSlice) {
scratch := pmetric.NewNumberDataPointSlice()
rollupDataPoints[pmetric.NumberDataPoint](p.cache, p.attributeGroups, p.dropOriginal, m.Name(), dps, scratch)
scratch.CopyTo(dps)
}
switch m.Type() {
case pmetric.MetricTypeGauge:
rollupNumbers(m.Gauge().DataPoints())
case pmetric.MetricTypeSum:
rollupNumbers(m.Sum().DataPoints())
case pmetric.MetricTypeHistogram:
scratch := pmetric.NewHistogramDataPointSlice()
rollupDataPoints[pmetric.HistogramDataPoint](p.cache, p.attributeGroups, p.dropOriginal, m.Name(), m.Histogram().DataPoints(), scratch)
scratch.CopyTo(m.Histogram().DataPoints())
case pmetric.MetricTypeExponentialHistogram:
scratch := pmetric.NewExponentialHistogramDataPointSlice()
rollupDataPoints[pmetric.ExponentialHistogramDataPoint](p.cache, p.attributeGroups, p.dropOriginal, m.Name(), m.ExponentialHistogram().DataPoints(), scratch)
scratch.CopyTo(m.ExponentialHistogram().DataPoints())
case pmetric.MetricTypeSummary:
scratch := pmetric.NewSummaryDataPointSlice()
rollupDataPoints[pmetric.SummaryDataPoint](p.cache, p.attributeGroups, p.dropOriginal, m.Name(), m.Summary().DataPoints(), scratch)
scratch.CopyTo(m.Summary().DataPoints())
}
}
// rollupDataPoints makes copies of the original data points for each rollup
// attribute group. If the metric name is in the drop original set, the original
// data points are dropped.
// rollupDataPoints appends to dest one copy of each original data point
// (unless metricName is in dropOriginal) plus one copy per rollup attribute
// group applicable to that point's attributes. The per-attribute-set rollup
// maps are cached under a key derived from the point's attributes.
func rollupDataPoints[T metric.DataPoint[T]](
cache rollupCache,
attributeGroups [][]string,
dropOriginal collections.Set[string],
metricName string,
orig metric.DataPoints[T],
dest metric.DataPoints[T],
) {
metric.RangeDataPoints(orig, func(dp T) {
if !dropOriginal.Contains(metricName) {
dp.CopyTo(dest.AppendEmpty())
}
if len(attributeGroups) == 0 {
return
}
attrs := dp.Attributes()
key := cache.Key(attrs)
var rollups []pcommon.Map
if item := cache.Get(key); item != nil {
rollups = item.Value()
} else {
rollups = buildRollup(attributeGroups, attrs)
cache.Set(key, rollups, ttlcache.DefaultTTL)
}
for _, rollupAttrs := range rollups {
target := dest.AppendEmpty()
dp.CopyTo(target)
// Replace the copied point's attributes with the rolled-up subset.
rollupAttrs.CopyTo(target.Attributes())
}
})
}
// buildRollup produces one attribute map per rollup group that forms a strict
// subset of baseAttributes. Groups with as many (or more) keys as the base,
// or containing any key absent from the base, are skipped.
func buildRollup(attributeGroups [][]string, baseAttributes pcommon.Map) []pcommon.Map {
var rollups []pcommon.Map
for _, group := range attributeGroups {
// A rollup must drop at least one dimension and cannot introduce
// dimensions the original metric does not carry.
if len(group) >= baseAttributes.Len() {
continue
}
attrs := pcommon.NewMap()
attrs.EnsureCapacity(len(group))
for _, name := range group {
value, found := baseAttributes.Get(name)
if !found {
break
}
value.CopyTo(attrs.PutEmpty(name))
}
// Only a fully-populated group is a valid rollup.
if attrs.Len() == len(group) {
rollups = append(rollups, attrs)
}
}
return rollups
}
// uniqueGroups filters out duplicate attributes within the sets and filters
// duplicate sets.
// uniqueGroups filters out duplicate attributes within the sets and filters
// duplicate sets. Each surviving group is returned with its keys sorted.
func uniqueGroups(groups [][]string) [][]string {
if len(groups) == 0 {
return nil
}
var (
results [][]string
seen    []collections.Set[string]
)
for _, group := range groups {
candidate := collections.NewSet(group...)
duplicate := false
for _, existing := range seen {
if candidate.Equal(existing) {
duplicate = true
break
}
}
if duplicate {
continue
}
seen = append(seen, candidate)
keys := maps.Keys(candidate)
sort.Strings(keys)
results = append(results, keys)
}
return results
}