exporter/kafkaexporter/factory.go (113 lines of code) (raw):
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package kafkaexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
import (
"context"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
)
// Defaults applied by createDefaultConfig.
const (
	// Default topic per signal.
	defaultTracesTopic  = "otlp_spans"
	defaultMetricsTopic = "otlp_metrics"
	defaultLogsTopic    = "otlp_logs"

	// Default encoding per signal.
	defaultTracesEncoding  = "otlp_proto"
	defaultMetricsEncoding = "otlp_proto"
	defaultLogsEncoding    = "otlp_proto"

	// Partitioning by resource attributes is off by default for both
	// metrics and logs.
	defaultPartitionMetricsByResourceAttributesEnabled = false
	defaultPartitionLogsByResourceAttributesEnabled    = false
)
// NewFactory returns the exporter.Factory for the Kafka exporter. The factory
// is registered under metadata.Type, uses createDefaultConfig for its default
// configuration, and supports traces, metrics and logs at the stability
// levels declared in the metadata package.
func NewFactory() exporter.Factory {
	signalOpts := []exporter.FactoryOption{
		exporter.WithTraces(createTracesExporter, metadata.TracesStability),
		exporter.WithMetrics(createMetricsExporter, metadata.MetricsStability),
		exporter.WithLogs(createLogsExporter, metadata.LogsStability),
	}
	return exporter.NewFactory(metadata.Type, createDefaultConfig, signalOpts...)
}
// createDefaultConfig builds the exporter's default *Config. Timeout, retry
// back-off, queue, Kafka client and producer settings come from their
// respective NewDefault* constructors; each signal gets its default topic and
// encoding; resource-attribute partitioning flags take their default values.
func createDefaultConfig() component.Config {
	// Per-signal topic/encoding defaults.
	tracesDefaults := SignalConfig{Topic: defaultTracesTopic, Encoding: defaultTracesEncoding}
	metricsDefaults := SignalConfig{Topic: defaultMetricsTopic, Encoding: defaultMetricsEncoding}
	logsDefaults := SignalConfig{Topic: defaultLogsTopic, Encoding: defaultLogsEncoding}

	return &Config{
		TimeoutSettings: exporterhelper.NewDefaultTimeoutConfig(),
		BackOffConfig:   configretry.NewDefaultBackOffConfig(),
		QueueSettings:   exporterhelper.NewDefaultQueueConfig(),
		ClientConfig:    configkafka.NewDefaultClientConfig(),
		Producer:        configkafka.NewDefaultProducerConfig(),

		Logs:    logsDefaults,
		Metrics: metricsDefaults,
		Traces:  tracesDefaults,

		PartitionMetricsByResourceAttributes: defaultPartitionMetricsByResourceAttributesEnabled,
		PartitionLogsByResourceAttributes:    defaultPartitionLogsByResourceAttributesEnabled,
	}
}
// createTracesExporter builds the traces exporter from cfg, which must be a
// *Config. It works on a by-value copy of the supplied configuration and wraps
// the Kafka traces exporter with exporterhelper's retry and queue handling,
// wiring the exporter's Start and Close methods as lifecycle hooks.
func createTracesExporter(
	ctx context.Context,
	set exporter.Settings,
	cfg component.Config,
) (exporter.Traces, error) {
	// Copy the config by value so the exporter holds its own instance.
	conf := *cfg.(*Config)
	tracesExp := newTracesExporter(conf, set)

	helperOpts := []exporterhelper.Option{
		exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
		// The exporterhelper timeout is turned off: a Context cannot be handed
		// to the Producer, so the sarama Producer's own timeout logic is used.
		exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}),
		exporterhelper.WithRetry(conf.BackOffConfig),
		exporterhelper.WithQueue(conf.QueueSettings),
		exporterhelper.WithStart(tracesExp.Start),
		exporterhelper.WithShutdown(tracesExp.Close),
	}

	return exporterhelper.NewTraces(ctx, set, &conf, tracesExp.exportData, helperOpts...)
}
// createMetricsExporter builds the metrics exporter from cfg, which must be a
// *Config. It works on a by-value copy of the supplied configuration and wraps
// the Kafka metrics exporter with exporterhelper's retry and queue handling,
// wiring the exporter's Start and Close methods as lifecycle hooks.
func createMetricsExporter(
	ctx context.Context,
	set exporter.Settings,
	cfg component.Config,
) (exporter.Metrics, error) {
	// Copy the config by value so the exporter holds its own instance.
	conf := *cfg.(*Config)
	metricsExp := newMetricsExporter(conf, set)

	helperOpts := []exporterhelper.Option{
		exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
		// The exporterhelper timeout is turned off: a Context cannot be handed
		// to the Producer, so the sarama Producer's own timeout logic is used.
		exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}),
		exporterhelper.WithRetry(conf.BackOffConfig),
		exporterhelper.WithQueue(conf.QueueSettings),
		exporterhelper.WithStart(metricsExp.Start),
		exporterhelper.WithShutdown(metricsExp.Close),
	}

	return exporterhelper.NewMetrics(ctx, set, &conf, metricsExp.exportData, helperOpts...)
}
// createLogsExporter builds the logs exporter from cfg, which must be a
// *Config. It works on a by-value copy of the supplied configuration and wraps
// the Kafka logs exporter with exporterhelper's retry and queue handling,
// wiring the exporter's Start and Close methods as lifecycle hooks.
func createLogsExporter(
	ctx context.Context,
	set exporter.Settings,
	cfg component.Config,
) (exporter.Logs, error) {
	// Copy the config by value so the exporter holds its own instance.
	conf := *cfg.(*Config)
	logsExp := newLogsExporter(conf, set)

	helperOpts := []exporterhelper.Option{
		exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
		// The exporterhelper timeout is turned off: a Context cannot be handed
		// to the Producer, so the sarama Producer's own timeout logic is used.
		exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}),
		exporterhelper.WithRetry(conf.BackOffConfig),
		exporterhelper.WithQueue(conf.QueueSettings),
		exporterhelper.WithStart(logsExp.Start),
		exporterhelper.WithShutdown(logsExp.Close),
	}

	return exporterhelper.NewLogs(ctx, set, &conf, logsExp.exportData, helperOpts...)
}