processor/resourcedetectionprocessor/factory.go (207 lines of code) (raw):
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package resourcedetectionprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor"
import (
"context"
"strings"
"sync"
"time"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/xconsumer"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/processorhelper"
"go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper"
"go.opentelemetry.io/collector/processor/xprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/aws/ec2"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/aws/ecs"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/aws/eks"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/aws/elasticbeanstalk"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/aws/lambda"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/azure"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/azure/aks"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/consul"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/docker"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/dynatrace"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/env"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/gcp"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/heroku"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/k8snode"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/kubeadm"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/openshift"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/system"
)
// consumerCapabilities is the capability set handed to every processor built
// by this factory; it declares that the processor mutates the data it is given.
var consumerCapabilities = consumer.Capabilities{
	MutatesData: true,
}
// factory creates resource detection processors and keeps track of the
// resource provider built for each processor ID.
type factory struct {
	// resourceProviderFactory builds a ResourceProvider from a list of
	// detector types.
	resourceProviderFactory *internal.ResourceProviderFactory

	// lock guards providers.
	lock sync.Mutex
	// providers stores a provider for each named processor, since each one
	// may have a different set of detectors configured.
	providers map[component.ID]*internal.ResourceProvider
}
// NewFactory returns the processor.Factory for the ResourceDetection
// processor. The factory is registered for traces, metrics, logs and profiles,
// and knows about every detector listed in the table below.
func NewFactory() processor.Factory {
	// Table of supported detectors, keyed by the type string used in the
	// `detectors` configuration option.
	detectors := map[internal.DetectorType]internal.DetectorFactory{
		aks.TypeStr:              aks.NewDetector,
		azure.TypeStr:            azure.NewDetector,
		consul.TypeStr:           consul.NewDetector,
		docker.TypeStr:           docker.NewDetector,
		dynatrace.TypeStr:        dynatrace.NewDetector,
		ec2.TypeStr:              ec2.NewDetector,
		ecs.TypeStr:              ecs.NewDetector,
		eks.TypeStr:              eks.NewDetector,
		elasticbeanstalk.TypeStr: elasticbeanstalk.NewDetector,
		env.TypeStr:              env.NewDetector,
		gcp.TypeStr:              gcp.NewDetector,
		heroku.TypeStr:           heroku.NewDetector,
		k8snode.TypeStr:          k8snode.NewDetector,
		kubeadm.TypeStr:          kubeadm.NewDetector,
		lambda.TypeStr:           lambda.NewDetector,
		openshift.TypeStr:        openshift.NewDetector,
		system.TypeStr:           system.NewDetector,
	}

	f := &factory{
		resourceProviderFactory: internal.NewProviderFactory(detectors),
		providers:               make(map[component.ID]*internal.ResourceProvider),
	}

	return xprocessor.NewFactory(
		metadata.Type,
		createDefaultConfig,
		xprocessor.WithTraces(f.createTracesProcessor, metadata.TracesStability),
		xprocessor.WithMetrics(f.createMetricsProcessor, metadata.MetricsStability),
		xprocessor.WithLogs(f.createLogsProcessor, metadata.LogsStability),
		xprocessor.WithProfiles(f.createProfilesProcessor, metadata.ProfilesStability),
	)
}
// Type returns the component type of this factory, metadata.Type, which is
// the same value NewFactory registers the processor under.
func (*factory) Type() component.Type {
	return metadata.Type
}
// createDefaultConfig builds the configuration used when the user supplies no
// overrides: only the env detector is enabled, detected values override
// existing ones, and the HTTP client and per-detector settings take their
// respective defaults. The deprecated Attributes option is left at its nil
// zero value.
func createDefaultConfig() component.Config {
	// TODO: Once issue(https://github.com/open-telemetry/opentelemetry-collector/issues/4001) gets resolved,
	// Set the default value of 'hostname_source' here instead of 'system' detector
	cfg := &Config{
		ClientConfig:   defaultClientConfig(),
		DetectorConfig: detectorCreateDefaultConfig(),
	}
	cfg.Detectors = []string{env.TypeStr}
	cfg.Override = true
	return cfg
}
// defaultClientConfig returns confighttp's default client configuration with
// the timeout set to five seconds.
func defaultClientConfig() confighttp.ClientConfig {
	const defaultTimeout = 5 * time.Second

	clientCfg := confighttp.NewDefaultClientConfig()
	clientCfg.Timeout = defaultTimeout
	return clientCfg
}
// createTracesProcessor builds the traces flavour of the processor. It obtains
// a resourceDetectionProcessor for the given settings and config, then wraps
// its processTraces method with processorhelper, registering the processor's
// Start method as the start hook and advertising consumerCapabilities.
// Any error from building the underlying processor is returned unchanged.
func (f *factory) createTracesProcessor(
	ctx context.Context,
	set processor.Settings,
	cfg component.Config,
	nextConsumer consumer.Traces,
) (processor.Traces, error) {
	detector, err := f.getResourceDetectionProcessor(set, cfg)
	if err != nil {
		return nil, err
	}

	return processorhelper.NewTraces(
		ctx, set, cfg, nextConsumer,
		detector.processTraces,
		processorhelper.WithCapabilities(consumerCapabilities),
		processorhelper.WithStart(detector.Start),
	)
}
// createMetricsProcessor builds the metrics flavour of the processor. It
// obtains a resourceDetectionProcessor for the given settings and config, then
// wraps its processMetrics method with processorhelper, registering the
// processor's Start method as the start hook and advertising
// consumerCapabilities. Any error from building the underlying processor is
// returned unchanged.
func (f *factory) createMetricsProcessor(
	ctx context.Context,
	set processor.Settings,
	cfg component.Config,
	nextConsumer consumer.Metrics,
) (processor.Metrics, error) {
	detector, err := f.getResourceDetectionProcessor(set, cfg)
	if err != nil {
		return nil, err
	}

	return processorhelper.NewMetrics(
		ctx, set, cfg, nextConsumer,
		detector.processMetrics,
		processorhelper.WithCapabilities(consumerCapabilities),
		processorhelper.WithStart(detector.Start),
	)
}
// createLogsProcessor builds the logs flavour of the processor. It obtains a
// resourceDetectionProcessor for the given settings and config, then wraps its
// processLogs method with processorhelper, registering the processor's Start
// method as the start hook and advertising consumerCapabilities.
// Any error from building the underlying processor is returned unchanged.
func (f *factory) createLogsProcessor(
	ctx context.Context,
	set processor.Settings,
	cfg component.Config,
	nextConsumer consumer.Logs,
) (processor.Logs, error) {
	detector, err := f.getResourceDetectionProcessor(set, cfg)
	if err != nil {
		return nil, err
	}

	return processorhelper.NewLogs(
		ctx, set, cfg, nextConsumer,
		detector.processLogs,
		processorhelper.WithCapabilities(consumerCapabilities),
		processorhelper.WithStart(detector.Start),
	)
}
// createProfilesProcessor builds the profiles flavour of the processor. It
// obtains a resourceDetectionProcessor for the given settings and config, then
// wraps its processProfiles method with xprocessorhelper, registering the
// processor's Start method as the start hook and advertising
// consumerCapabilities. Any error from building the underlying processor is
// returned unchanged.
func (f *factory) createProfilesProcessor(
	ctx context.Context,
	set processor.Settings,
	cfg component.Config,
	nextConsumer xconsumer.Profiles,
) (xprocessor.Profiles, error) {
	detector, err := f.getResourceDetectionProcessor(set, cfg)
	if err != nil {
		return nil, err
	}

	return xprocessorhelper.NewProfiles(
		ctx, set, cfg, nextConsumer,
		detector.processProfiles,
		xprocessorhelper.WithCapabilities(consumerCapabilities),
		xprocessorhelper.WithStart(detector.Start),
	)
}
// getResourceDetectionProcessor assembles a resourceDetectionProcessor from
// the supplied settings and configuration.
//
// cfg must be a *Config; the type assertion is unchecked and panics for any
// other type. When the deprecated `attributes` option is set, a warning is
// logged through params.Logger. The resource provider is obtained from
// getResourceProvider, and any error it reports is returned unchanged.
func (f *factory) getResourceDetectionProcessor(
	params processor.Settings,
	cfg component.Config,
) (*resourceDetectionProcessor, error) {
	rdCfg := cfg.(*Config)

	// A non-nil Attributes slice means the user still relies on the
	// deprecated option.
	if rdCfg.Attributes != nil {
		params.Logger.Warn("You are using deprecated `attributes` option that will be removed soon; use `resource_attributes` instead, details on configuration: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/resourcedetectionprocessor#migration-from-attributes-to-resource_attributes")
	}

	resProvider, err := f.getResourceProvider(
		params,
		rdCfg.Timeout,
		rdCfg.Detectors,
		rdCfg.DetectorConfig,
		rdCfg.Attributes,
	)
	if err != nil {
		return nil, err
	}

	rdp := &resourceDetectionProcessor{
		provider:           resProvider,
		override:           rdCfg.Override,
		httpClientSettings: rdCfg.ClientConfig,
		telemetrySettings:  params.TelemetrySettings,
	}
	return rdp, nil
}
// getResourceProvider returns the resource provider associated with
// params.ID, creating and caching it on first use.
//
// The whole lookup-or-create sequence runs while holding f.lock. On a cache
// miss, each configured detector name is trimmed of surrounding whitespace
// and converted to an internal.DetectorType before being handed to the
// resource provider factory. A creation error is returned unchanged and
// nothing is cached in that case.
//
// NOTE(review): the cache is keyed only by params.ID, so a previously created
// provider is returned without comparing timeout, detectors, detector configs
// or attributes against the earlier call — confirm this is intended.
func (f *factory) getResourceProvider(
	params processor.Settings,
	timeout time.Duration,
	configuredDetectors []string,
	detectorConfigs DetectorConfig,
	attributes []string,
) (*internal.ResourceProvider, error) {
	f.lock.Lock()
	defer f.lock.Unlock()

	if cached, found := f.providers[params.ID]; found {
		return cached, nil
	}

	types := make([]internal.DetectorType, len(configuredDetectors))
	for i, name := range configuredDetectors {
		types[i] = internal.DetectorType(strings.TrimSpace(name))
	}

	created, err := f.resourceProviderFactory.CreateResourceProvider(
		params,
		timeout,
		attributes,
		&detectorConfigs,
		types...,
	)
	if err != nil {
		return nil, err
	}

	f.providers[params.ID] = created
	return created, nil
}