receiver/kafkareceiver/factory.go (86 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver" import ( "context" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/receiver" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver/internal/metadata" ) const ( defaultTracesTopic = "otlp_spans" defaultMetricsTopic = "otlp_metrics" defaultLogsTopic = "otlp_logs" defaultEncoding = "otlp_proto" ) // NewFactory creates Kafka receiver factory. func NewFactory() receiver.Factory { return receiver.NewFactory( metadata.Type, createDefaultConfig, receiver.WithTraces(createTracesReceiver, metadata.TracesStability), receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability), receiver.WithLogs(createLogsReceiver, metadata.LogsStability), ) } func createDefaultConfig() component.Config { return &Config{ ClientConfig: configkafka.NewDefaultClientConfig(), ConsumerConfig: configkafka.NewDefaultConsumerConfig(), Encoding: defaultEncoding, MessageMarking: MessageMarking{ After: false, OnError: false, }, HeaderExtraction: HeaderExtraction{ ExtractHeaders: false, }, } } func createTracesReceiver( _ context.Context, set receiver.Settings, cfg component.Config, nextConsumer consumer.Traces, ) (receiver.Traces, error) { oCfg := *(cfg.(*Config)) if oCfg.Topic == "" { oCfg.Topic = defaultTracesTopic } r, err := newTracesReceiver(oCfg, set, nextConsumer) if err != nil { return nil, err } return r, nil } func createMetricsReceiver( _ context.Context, set receiver.Settings, cfg component.Config, nextConsumer consumer.Metrics, ) (receiver.Metrics, error) { oCfg := *(cfg.(*Config)) if oCfg.Topic == "" { oCfg.Topic = defaultMetricsTopic } r, err := newMetricsReceiver(oCfg, set, nextConsumer) if err != nil { return nil, err } return r, nil } func createLogsReceiver( _ context.Context, set receiver.Settings, cfg component.Config, nextConsumer consumer.Logs, ) (receiver.Logs, error) { oCfg := *(cfg.(*Config)) if oCfg.Topic == "" { oCfg.Topic = defaultLogsTopic } r, err := newLogsReceiver(oCfg, set, nextConsumer) if err != nil { return nil, err } return r, nil }