exporter/kafkaexporter/config.go (62 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package kafkaexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter" import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/exporter/exporterhelper" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka" ) var _ component.Config = (*Config)(nil) // Config defines configuration for Kafka exporter. type Config struct { TimeoutSettings exporterhelper.TimeoutConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. QueueSettings exporterhelper.QueueBatchConfig `mapstructure:"sending_queue"` configretry.BackOffConfig `mapstructure:"retry_on_failure"` configkafka.ClientConfig `mapstructure:",squash"` Producer configkafka.ProducerConfig `mapstructure:"producer"` // Logs holds configuration about how logs should be sent to Kafka. Logs SignalConfig `mapstructure:"logs"` // Metrics holds configuration about how metrics should be sent to Kafka. Metrics SignalConfig `mapstructure:"metrics"` // Traces holds configuration about how traces should be sent to Kafka. Traces SignalConfig `mapstructure:"traces"` // Topic holds the name of the Kafka topic to which data should be exported. // // Topic has no default. If explicitly specified, it will take precedence over // the default values of logs::topic, metrics::topic, and traces::topic. // // Deprecated [v0.124.0]: use logs::topic, metrics::topic, and traces::topic instead. Topic string `mapstructure:"topic"` // IncludeMetadataKeys indicates the receiver's client metadata keys to propagate as Kafka message headers. IncludeMetadataKeys []string `mapstructure:"include_metadata_keys"` // TopicFromAttribute is the name of the attribute to use as the topic name. TopicFromAttribute string `mapstructure:"topic_from_attribute"` // Encoding holds the encoding of Kafka message values. // // Encoding has no default. If explicitly specified, it will take precedence over // the default values of logs::encoding, metrics::encoding, and traces::encoding. // // Deprecated [v0.124.0]: use logs::encoding, metrics::encoding, and traces::encoding instead. Encoding string `mapstructure:"encoding"` // PartitionTracesByID sets the message key of outgoing trace messages to the trace ID. // // NOTE: this does not have any effect for Jaeger encodings. Jaeger encodings always use // use the trace ID for the message key. PartitionTracesByID bool `mapstructure:"partition_traces_by_id"` // PartitionMetricsByResourceAttributes controls the partitioning of metrics messages by // resource. If this is true, then the message key will be set to a hash of the resource's // identifying attributes. PartitionMetricsByResourceAttributes bool `mapstructure:"partition_metrics_by_resource_attributes"` // PartitionLogsByResourceAttributes controls the partitioning of logs messages by resource. // If this is true, then the message key will be set to a hash of the resource's identifying // attributes. PartitionLogsByResourceAttributes bool `mapstructure:"partition_logs_by_resource_attributes"` } func (c *Config) Unmarshal(conf *confmap.Conf) error { if err := conf.Unmarshal(c); err != nil { return err } // Check if deprecated fields have been explicitly set, // in which case they should be used instead of signal- // specific defaults. var zeroConfig Config if err := conf.Unmarshal(&zeroConfig); err != nil { return err } if c.Topic != "" { if zeroConfig.Logs.Topic == "" { c.Logs.Topic = c.Topic } if zeroConfig.Metrics.Topic == "" { c.Metrics.Topic = c.Topic } if zeroConfig.Traces.Topic == "" { c.Traces.Topic = c.Topic } } if c.Encoding != "" { if zeroConfig.Logs.Encoding == "" { c.Logs.Encoding = c.Encoding } if zeroConfig.Metrics.Encoding == "" { c.Metrics.Encoding = c.Encoding } if zeroConfig.Traces.Encoding == "" { c.Traces.Encoding = c.Encoding } } return conf.Unmarshal(c) } // SignalConfig holds signal-specific configuration for the Kafka exporter. type SignalConfig struct { // Topic holds the name of the Kafka topic to which messages of the // signal type should be produced. // // The default depends on the signal type: // - "otlp_spans" for traces // - "otlp_metrics" for metrics // - "otlp_logs" for logs Topic string `mapstructure:"topic"` // Encoding holds the encoding of messages for the signal type. // // Defaults to "otlp_proto". Encoding string `mapstructure:"encoding"` }