processor/attributesprocessor/factory.go (106 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package attributesprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor" import ( "context" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/processorhelper" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/attraction" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterconfig" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterlog" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filtermetric" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterspan" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor/internal/metadata" ) var processorCapabilities = consumer.Capabilities{MutatesData: true} // NewFactory returns a new factory for the Attributes processor. func NewFactory() processor.Factory { return processor.NewFactory( metadata.Type, createDefaultConfig, processor.WithTraces(createTracesProcessor, metadata.TracesStability), processor.WithLogs(createLogsProcessor, metadata.LogsStability), processor.WithMetrics(createMetricsProcessor, metadata.MetricsStability)) } // Note: This isn't a valid configuration because the processor would do no work. func createDefaultConfig() component.Config { return &Config{} } func createTracesProcessor( ctx context.Context, set processor.Settings, cfg component.Config, nextConsumer consumer.Traces, ) (processor.Traces, error) { oCfg := cfg.(*Config) attrProc, err := attraction.NewAttrProc(&oCfg.Settings) if err != nil { return nil, err } skipExpr, err := filterspan.NewSkipExpr(&oCfg.MatchConfig) if err != nil { return nil, err } return processorhelper.NewTraces( ctx, set, cfg, nextConsumer, newSpanAttributesProcessor(set.Logger, attrProc, skipExpr).processTraces, processorhelper.WithCapabilities(processorCapabilities)) } func createLogsProcessor( ctx context.Context, set processor.Settings, cfg component.Config, nextConsumer consumer.Logs, ) (processor.Logs, error) { oCfg := cfg.(*Config) attrProc, err := attraction.NewAttrProc(&oCfg.Settings) if err != nil { return nil, err } skipExpr, err := filterlog.NewSkipExpr(&oCfg.MatchConfig) if err != nil { return nil, err } return processorhelper.NewLogs( ctx, set, cfg, nextConsumer, newLogAttributesProcessor(set.Logger, attrProc, skipExpr).processLogs, processorhelper.WithCapabilities(processorCapabilities)) } func createMetricsProcessor( ctx context.Context, set processor.Settings, cfg component.Config, nextConsumer consumer.Metrics, ) (processor.Metrics, error) { oCfg := cfg.(*Config) attrProc, err := attraction.NewAttrProc(&oCfg.Settings) if err != nil { return nil, err } includeMatchProperties, err := filterconfig.CreateMetricMatchPropertiesFromDefault(oCfg.Include) if err != nil { return nil, err } excludeMatchProperties, err := filterconfig.CreateMetricMatchPropertiesFromDefault(oCfg.Exclude) if err != nil { return nil, err } skipExpr, err := filtermetric.NewSkipExpr( includeMatchProperties, excludeMatchProperties, ) if err != nil { return nil, err } return processorhelper.NewMetrics( ctx, set, cfg, nextConsumer, newMetricAttributesProcessor(set.Logger, attrProc, skipExpr).processMetrics, processorhelper.WithCapabilities(processorCapabilities)) }