processor/elasticinframetricsprocessor/processor.go (105 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package elasticinframetricsprocessor // import "github.com/elastic/opentelemetry-collector-components/processor/elasticinframetricsprocessor"
import (
"context"
"iter"
"github.com/elastic/opentelemetry-collector-components/processor/elasticinframetricsprocessor/internal/metadata"
"github.com/elastic/opentelemetry-lib/remappers/common"
"github.com/elastic/opentelemetry-lib/remappers/hostmetrics"
"github.com/elastic/opentelemetry-lib/remappers/kubernetesmetrics"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/processor"
"go.uber.org/zap"
)
const OTelRemappedLabel = common.OTelRemappedLabel
// remapper interface defines the Remap method that should be implemented by different remappers
type remapper interface {
Remap(pmetric.ScopeMetrics, pmetric.MetricSlice, pcommon.Resource)
// Valid returns true if the remapper should be applied to the given scope metrics.
Valid(pmetric.ScopeMetrics) bool
}
type ElasticinframetricsProcessor struct {
cfg *Config
set processor.Settings
logger *zap.Logger
remappers []remapper
}
func newProcessor(set processor.Settings, cfg *Config) *ElasticinframetricsProcessor {
remappers := make([]remapper, 0)
if cfg.AddSystemMetrics {
remappers = append(remappers, hostmetrics.NewRemapper(set.Logger, hostmetrics.WithSystemIntegrationDataset(true)))
}
if cfg.AddK8sMetrics {
remappers = append(remappers, kubernetesmetrics.NewRemapper(set.Logger, kubernetesmetrics.WithKubernetesIntegrationDataset(true)))
}
return &ElasticinframetricsProcessor{
cfg: cfg,
set: set,
logger: set.Logger,
remappers: remappers,
}
}
// processMetrics processes the given metrics and applies remappers if configured.
func (p *ElasticinframetricsProcessor) processMetrics(_ context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
for _, resourceMetrics := range md.ResourceMetrics().All() {
for _, scopeMetrics := range resourceMetrics.ScopeMetrics().All() {
for _, r := range p.remappers {
if !r.Valid(scopeMetrics) {
continue
}
p.remapScopeMetrics(r, scopeMetrics, resourceMetrics.Resource())
break // At most one remapper should be applied
}
}
}
return md, nil
}
func (p *ElasticinframetricsProcessor) remapScopeMetrics(
r remapper,
scopeMetrics pmetric.ScopeMetrics,
resource pcommon.Resource,
) {
scope := scopeMetrics.Scope()
if _, ok := scope.Attributes().Get(common.OTelRemappedLabel); ok {
// These metrics have already been remapped.
return
}
scope.Attributes().PutBool(common.OTelRemappedLabel, true)
// Also check if there are any remapped metrics by iterating over the
// metrics, to handle metrics from older versions of the processor that
// do not set scope attributes.
for range remappedMetrics(scopeMetrics.Metrics()) {
// Found remapped metrics.
return
}
result := scopeMetrics.Metrics()
if p.cfg.DropOriginal {
result = pmetric.NewMetricSlice()
}
r.Remap(scopeMetrics, result, resource)
if p.cfg.DropOriginal {
// This overrides the existing metrics with just the remapped ones.
//
// When dropping the original metrics we update the scope name to
// the processor's scope name, since original scope name is no longer
// relevant.
result.CopyTo(scopeMetrics.Metrics())
scope.SetName(metadata.ScopeName)
scope.SetVersion(p.set.BuildInfo.Version)
}
}
func remappedMetrics(ms pmetric.MetricSlice) iter.Seq[pmetric.Metric] {
return func(yield func(pmetric.Metric) bool) {
for _, metric := range ms.All() {
if isRemappedMetric(metric) {
if !yield(metric) {
return
}
}
}
}
}
func isRemappedMetric(metric pmetric.Metric) bool {
switch metric.Type() {
case pmetric.MetricTypeGauge:
for _, dp := range metric.Gauge().DataPoints().All() {
if attr, ok := dp.Attributes().Get(OTelRemappedLabel); ok && attr.Bool() {
return true
}
}
case pmetric.MetricTypeSum:
for _, dp := range metric.Sum().DataPoints().All() {
if attr, ok := dp.Attributes().Get(OTelRemappedLabel); ok && attr.Bool() {
return true
}
}
}
return false
}