exporter/kafkaexporter/kafka_exporter.go
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package kafkaexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"

import (
	"context"
	"errors"
	"fmt"
	"iter"

	"github.com/IBM/sarama"
	"go.opentelemetry.io/collector/client"
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/consumer/consumererror"
	"go.opentelemetry.io/collector/exporter"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/plog"
	"go.opentelemetry.io/collector/pdata/pmetric"
	"go.opentelemetry.io/collector/pdata/ptrace"
	"go.uber.org/zap"

	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter/internal/marshaler"
	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil"
	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil"
)
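
// kafkaErrors reports that a number of messages failed to deliver,
// along with a representative cause.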
type kafkaErrors struct {
count int
err string
}

func (ke kafkaErrors) Error() string {
return fmt.Sprintf("Failed to deliver %d messages due to %s", ke.count, ke.err)
}
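
// kafkaMessager abstracts the signal-specific parts of exporting:
// partitioning a payload, marshaling it into messages, and resolving
// the destination topic.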
type kafkaMessager[T any] interface {
// partitionData returns an iterator that yields key-value pairs
// where the key is the partition key, and the value is the pdata
// type (plog.Logs, etc.)
partitionData(T) iter.Seq2[[]byte, T]
	// marshalData marshals a pdata type into one or more messages.
marshalData(T) ([]marshaler.Message, error)
// getTopic returns the topic name for the given context and data.
getTopic(context.Context, T) string
}
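
// kafkaExporter exports a pdata payload type T (plog.Logs,
// pmetric.Metrics, or ptrace.Traces) to Kafka via a synchronous
// Sarama producer.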
type kafkaExporter[T any] struct {
cfg Config
logger *zap.Logger
newMessager func(host component.Host) (kafkaMessager[T], error)
messager kafkaMessager[T]
producer sarama.SyncProducer
}
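
// newKafkaExporter returns an unstarted exporter; the messager and the
// Sarama producer are created in Start, once the host is available.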
func newKafkaExporter[T any](
config Config,
set exporter.Settings,
newMessager func(component.Host) (kafkaMessager[T], error),
) *kafkaExporter[T] {
return &kafkaExporter[T]{
cfg: config,
logger: set.Logger,
newMessager: newMessager,
}
}
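
// Start creates the signal-specific messager and the synchronous
// Sarama producer used by exportData.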
func (e *kafkaExporter[T]) Start(ctx context.Context, host component.Host) error {
messager, err := e.newMessager(host)
if err != nil {
return err
}
e.messager = messager
producer, err := kafka.NewSaramaSyncProducer(
ctx, e.cfg.ClientConfig, e.cfg.Producer,
e.cfg.TimeoutSettings.Timeout,
)
if err != nil {
return err
}
e.producer = producer
return nil
}
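
// Close shuts down the producer. It is a no-op if Start was never
// called or failed before the producer was created.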
func (e *kafkaExporter[T]) Close(context.Context) error {
if e.producer == nil {
return nil
}
return e.producer.Close()
}
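
// exportData partitions the payload, marshals each partition into one
// or more messages, applies the partition key and destination topic,
// attaches any configured client-metadata headers, and sends the whole
// batch synchronously. Marshaling failures are permanent errors;
// delivery failures are collapsed into a single kafkaErrors value.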
func (e *kafkaExporter[T]) exportData(ctx context.Context, data T) error {
var allSaramaMessages []*sarama.ProducerMessage
for key, data := range e.messager.partitionData(data) {
partitionMessages, err := e.messager.marshalData(data)
if err != nil {
return consumererror.NewPermanent(err)
}
for i := range partitionMessages {
			// Marshalers may set the Key themselves, so only override
			// it when partitioning produced a non-nil key.
if key != nil {
partitionMessages[i].Key = key
}
}
saramaMessages := makeSaramaMessages(partitionMessages, e.messager.getTopic(ctx, data))
allSaramaMessages = append(allSaramaMessages, saramaMessages...)
}
messagesWithHeaders(allSaramaMessages, metadataToHeaders(
ctx, e.cfg.IncludeMetadataKeys,
))
if err := e.producer.SendMessages(allSaramaMessages); err != nil {
var prodErr sarama.ProducerErrors
if errors.As(err, &prodErr) {
if len(prodErr) > 0 {
return kafkaErrors{len(prodErr), prodErr[0].Err.Error()}
}
}
return err
}
return nil
}
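
// newTracesExporter returns an unstarted Kafka exporter for
// ptrace.Traces.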
func newTracesExporter(config Config, set exporter.Settings) *kafkaExporter[ptrace.Traces] {
// Jaeger encodings do their own partitioning, so disable trace ID
// partitioning when they are configured.
switch config.Traces.Encoding {
case "jaeger_proto", "jaeger_json":
config.PartitionTracesByID = false
}
return newKafkaExporter(config, set, func(host component.Host) (kafkaMessager[ptrace.Traces], error) {
marshaler, err := getTracesMarshaler(config.Traces.Encoding, host)
if err != nil {
return nil, err
}
return &kafkaTracesMessager{
config: config,
marshaler: marshaler,
}, nil
})
}
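
// kafkaTracesMessager implements kafkaMessager for ptrace.Traces.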
type kafkaTracesMessager struct {
config Config
marshaler marshaler.TracesMarshaler
}

func (e *kafkaTracesMessager) marshalData(td ptrace.Traces) ([]marshaler.Message, error) {
return e.marshaler.MarshalTraces(td)
}

func (e *kafkaTracesMessager) getTopic(ctx context.Context, td ptrace.Traces) string {
return getTopic(ctx, &e.config, td.ResourceSpans())
}
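
// partitionData yields one batch per trace, keyed by the hex-encoded
// trace ID, when PartitionTracesByID is enabled; otherwise it yields
// the payload unchanged with a nil key.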
func (e *kafkaTracesMessager) partitionData(td ptrace.Traces) iter.Seq2[[]byte, ptrace.Traces] {
return func(yield func([]byte, ptrace.Traces) bool) {
if !e.config.PartitionTracesByID {
yield(nil, td)
return
}
for _, td := range batchpersignal.SplitTraces(td) {
			// Note that batchpersignal.SplitTraces guarantees that each
			// batch contains exactly one trace and, by implication, at
			// least one span.
key := []byte(traceutil.TraceIDToHexOrEmptyString(
td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).TraceID(),
))
if !yield(key, td) {
return
}
}
}
}
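
// newLogsExporter returns an unstarted Kafka exporter for plog.Logs.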
func newLogsExporter(config Config, set exporter.Settings) *kafkaExporter[plog.Logs] {
return newKafkaExporter(config, set, func(host component.Host) (kafkaMessager[plog.Logs], error) {
marshaler, err := getLogsMarshaler(config.Logs.Encoding, host)
if err != nil {
return nil, err
}
return &kafkaLogsMessager{
config: config,
marshaler: marshaler,
}, nil
})
}
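
// kafkaLogsMessager implements kafkaMessager for plog.Logs.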
type kafkaLogsMessager struct {
config Config
marshaler marshaler.LogsMarshaler
}

func (e *kafkaLogsMessager) marshalData(ld plog.Logs) ([]marshaler.Message, error) {
return e.marshaler.MarshalLogs(ld)
}

func (e *kafkaLogsMessager) getTopic(ctx context.Context, ld plog.Logs) string {
return getTopic(ctx, &e.config, ld.ResourceLogs())
}
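
// partitionData yields one batch per ResourceLogs entry, keyed by a
// hash of its resource attributes, when
// PartitionLogsByResourceAttributes is enabled; otherwise it yields
// the payload unchanged with a nil key.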
func (e *kafkaLogsMessager) partitionData(ld plog.Logs) iter.Seq2[[]byte, plog.Logs] {
return func(yield func([]byte, plog.Logs) bool) {
if !e.config.PartitionLogsByResourceAttributes {
yield(nil, ld)
return
}
for _, resourceLogs := range ld.ResourceLogs().All() {
hash := pdatautil.MapHash(resourceLogs.Resource().Attributes())
newLogs := plog.NewLogs()
resourceLogs.CopyTo(newLogs.ResourceLogs().AppendEmpty())
if !yield(hash[:], newLogs) {
return
}
}
}
}
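
// newMetricsExporter returns an unstarted Kafka exporter for
// pmetric.Metrics.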
func newMetricsExporter(config Config, set exporter.Settings) *kafkaExporter[pmetric.Metrics] {
return newKafkaExporter(config, set, func(host component.Host) (kafkaMessager[pmetric.Metrics], error) {
marshaler, err := getMetricsMarshaler(config.Metrics.Encoding, host)
if err != nil {
return nil, err
}
return &kafkaMetricsMessager{
config: config,
marshaler: marshaler,
}, nil
})
}
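
// kafkaMetricsMessager implements kafkaMessager for pmetric.Metrics.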
type kafkaMetricsMessager struct {
config Config
marshaler marshaler.MetricsMarshaler
}

func (e *kafkaMetricsMessager) marshalData(md pmetric.Metrics) ([]marshaler.Message, error) {
return e.marshaler.MarshalMetrics(md)
}

func (e *kafkaMetricsMessager) getTopic(ctx context.Context, md pmetric.Metrics) string {
return getTopic(ctx, &e.config, md.ResourceMetrics())
}
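
// partitionData yields one batch per ResourceMetrics entry, keyed by a
// hash of its resource attributes, when
// PartitionMetricsByResourceAttributes is enabled; otherwise it yields
// the payload unchanged with a nil key.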
func (e *kafkaMetricsMessager) partitionData(md pmetric.Metrics) iter.Seq2[[]byte, pmetric.Metrics] {
return func(yield func([]byte, pmetric.Metrics) bool) {
if !e.config.PartitionMetricsByResourceAttributes {
yield(nil, md)
return
}
for _, resourceMetrics := range md.ResourceMetrics().All() {
hash := pdatautil.MapHash(resourceMetrics.Resource().Attributes())
newMetrics := pmetric.NewMetrics()
resourceMetrics.CopyTo(newMetrics.ResourceMetrics().AppendEmpty())
if !yield(hash[:], newMetrics) {
return
}
}
}
}
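
// resourceSlice is the shape shared by plog.ResourceLogsSlice,
// pmetric.ResourceMetricsSlice, and ptrace.ResourceSpansSlice.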
type resourceSlice[T any] interface {
Len() int
At(int) T
}

type resource interface {
Resource() pcommon.Resource
}
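
// getTopic resolves the destination topic with the following
// precedence: the first non-empty TopicFromAttribute resource
// attribute, then a topic carried in the context (see
// pkg/kafka/topic), then the configured default. For example, with
// TopicFromAttribute set to "kafka_topic" (an illustrative attribute
// name), a payload whose resource carries kafka_topic=otlp_spans
// would be routed to the "otlp_spans" topic.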
func getTopic[T resource](ctx context.Context, cfg *Config, resources resourceSlice[T]) string {
if cfg.TopicFromAttribute != "" {
for i := 0; i < resources.Len(); i++ {
rv, ok := resources.At(i).Resource().Attributes().Get(cfg.TopicFromAttribute)
if ok && rv.Str() != "" {
return rv.Str()
}
}
}
contextTopic, ok := topic.FromContext(ctx)
if ok {
return contextTopic
}
return cfg.Topic
}
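
// makeSaramaMessages wraps marshaled messages in Sarama producer
// messages bound for the given topic.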
func makeSaramaMessages(messages []marshaler.Message, topic string) []*sarama.ProducerMessage {
saramaMessages := make([]*sarama.ProducerMessage, len(messages))
for i, message := range messages {
saramaMessages[i] = &sarama.ProducerMessage{
Topic: topic,
Key: sarama.ByteEncoder(message.Key),
Value: sarama.ByteEncoder(message.Value),
}
}
return saramaMessages
}
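
// messagesWithHeaders appends h to every message's headers, keeping
// any headers a marshaler has already set.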
func messagesWithHeaders(msg []*sarama.ProducerMessage, h []sarama.RecordHeader) {
if len(h) == 0 || len(msg) == 0 {
return
}
for i := range msg {
if len(msg[i].Headers) == 0 {
msg[i].Headers = h
continue
}
msg[i].Headers = append(msg[i].Headers, h...)
}
}
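
// metadataToHeaders converts client metadata from the context into
// Kafka record headers, emitting one header per value of each
// configured key.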
func metadataToHeaders(ctx context.Context, keys []string) []sarama.RecordHeader {
if len(keys) == 0 {
return nil
}
info := client.FromContext(ctx)
headers := make([]sarama.RecordHeader, 0, len(keys))
for _, key := range keys {
valueSlice := info.Metadata.Get(key)
for _, v := range valueSlice {
headers = append(headers, sarama.RecordHeader{
Key: []byte(key),
Value: []byte(v),
})
}
}
return headers
}