// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package configkafka // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"

import (
"errors"
"fmt"
"time"
"github.com/IBM/sarama"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/confmap"
)

const (
	LatestOffset   = "latest"
	EarliestOffset = "earliest"
)

type ClientConfig struct {
// Brokers holds the list of Kafka bootstrap servers (default localhost:9092).
Brokers []string `mapstructure:"brokers"`
// ResolveCanonicalBootstrapServersOnly configures the Kafka client to perform
// a DNS lookup on each of the provided brokers, and then perform a reverse
// lookup on the resulting IPs to obtain the canonical hostnames to use as the
// bootstrap servers. This can be required in SASL environments.
ResolveCanonicalBootstrapServersOnly bool `mapstructure:"resolve_canonical_bootstrap_servers_only"`
// ProtocolVersion defines the Kafka protocol version that the client will
// assume it is running against.
ProtocolVersion string `mapstructure:"protocol_version"`
// ClientID holds the client ID advertised to Kafka, which can be used for
	// enforcing ACLs, throttling quotas, and more (default "otel-collector").
ClientID string `mapstructure:"client_id"`
// Authentication holds Kafka authentication details.
Authentication AuthenticationConfig `mapstructure:"auth"`
// TLS holds TLS-related configuration for connecting to Kafka brokers.
//
// By default the client will use an insecure connection unless
// SASL/AWS_MSK_IAM_OAUTHBEARER auth is configured.
TLS *configtls.ClientConfig `mapstructure:"tls"`
// Metadata holds metadata-related configuration for producers and consumers.
Metadata MetadataConfig `mapstructure:"metadata"`
}

func NewDefaultClientConfig() ClientConfig {
return ClientConfig{
Brokers: []string{"localhost:9092"},
ClientID: "otel-collector",
Metadata: NewDefaultMetadataConfig(),
}
}
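
// exampleClientConfig is an illustrative sketch, not part of the upstream
// API: it shows how a caller might start from the defaults, override the
// brokers, and validate the result. The broker addresses and protocol
// version below are hypothetical.
func exampleClientConfig() (ClientConfig, error) {
	cfg := NewDefaultClientConfig()
	cfg.Brokers = []string{"broker-1:9092", "broker-2:9092"} // hypothetical addresses
	cfg.ProtocolVersion = "3.0.0"                            // must parse as a sarama.KafkaVersion
	if err := cfg.Validate(); err != nil {
		return ClientConfig{}, err
	}
	return cfg, nil
}
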
func (c ClientConfig) Validate() error {
if len(c.Brokers) == 0 {
return errors.New("brokers must be specified")
}
if c.ProtocolVersion != "" {
if _, err := sarama.ParseKafkaVersion(c.ProtocolVersion); err != nil {
return fmt.Errorf("invalid protocol version: %w", err)
}
}
return nil
}

type ConsumerConfig struct {
// SessionTimeout controls the Kafka consumer group session timeout.
// The session timeout is used to detect the consumer's liveness.
SessionTimeout time.Duration `mapstructure:"session_timeout"`
// HeartbeatInterval controls the Kafka consumer group coordination
// heartbeat interval. Heartbeats ensure the consumer's session remains
// active.
HeartbeatInterval time.Duration `mapstructure:"heartbeat_interval"`
	// GroupID specifies the ID of the consumer group in which this consumer
	// participates when consuming messages (default "otel-collector").
GroupID string `mapstructure:"group_id"`
// InitialOffset specifies the initial offset to use if no offset was
// previously committed. Must be `latest` or `earliest` (default "latest").
InitialOffset string `mapstructure:"initial_offset"`
// AutoCommit controls the auto-commit functionality of the consumer.
AutoCommit AutoCommitConfig `mapstructure:"autocommit"`
	// The minimum bytes per fetch from Kafka (default 1).
	MinFetchSize int32 `mapstructure:"min_fetch_size"`
	// The default bytes per fetch from Kafka (default 1048576).
	DefaultFetchSize int32 `mapstructure:"default_fetch_size"`
	// The maximum bytes per fetch from Kafka (default 0, no limit).
	MaxFetchSize int32 `mapstructure:"max_fetch_size"`
}

func NewDefaultConsumerConfig() ConsumerConfig {
return ConsumerConfig{
SessionTimeout: 10 * time.Second,
HeartbeatInterval: 3 * time.Second,
GroupID: "otel-collector",
InitialOffset: LatestOffset,
AutoCommit: AutoCommitConfig{
Enable: true,
Interval: time.Second,
},
MinFetchSize: 1,
MaxFetchSize: 0,
DefaultFetchSize: 1048576,
}
}
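
// exampleConsumerConfig is an illustrative sketch, not part of the upstream
// API: it derives a consumer configuration that reads from the beginning of
// each partition when no committed offset exists. The group ID is a
// placeholder.
func exampleConsumerConfig() (ConsumerConfig, error) {
	cfg := NewDefaultConsumerConfig()
	cfg.GroupID = "my-pipeline" // placeholder group ID
	cfg.InitialOffset = EarliestOffset
	if err := cfg.Validate(); err != nil {
		return ConsumerConfig{}, err
	}
	return cfg, nil
}
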
func (c ConsumerConfig) Validate() error {
switch c.InitialOffset {
case LatestOffset, EarliestOffset:
// Valid
default:
return fmt.Errorf(
"initial_offset should be one of 'latest' or 'earliest'. configured value %v",
c.InitialOffset,
)
}
return nil
}

type AutoCommitConfig struct {
	// Whether or not to auto-commit updated offsets back to the broker
	// (default enabled).
	Enable bool `mapstructure:"enable"`
// How frequently to commit updated offsets. Ineffective unless
// auto-commit is enabled (default 1s)
Interval time.Duration `mapstructure:"interval"`
}

type ProducerConfig struct {
	// The maximum message size in bytes that the producer will accept (default 1000000).
	MaxMessageBytes int `mapstructure:"max_message_bytes"`
	// RequiredAcks holds the number of acknowledgements required before producing
// returns successfully. See:
// https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#acks
//
// Acceptable values are:
// 0 (NoResponse) Does not wait for any acknowledgements.
// 1 (WaitForLocal) Waits for only the leader to write the record to its local log,
// but does not wait for followers to acknowledge. (default)
// -1 (WaitForAll) Waits for all in-sync replicas to acknowledge.
// In YAML configuration, "all" is accepted as an alias for -1.
RequiredAcks RequiredAcks `mapstructure:"required_acks"`
	// Compression codec used to produce messages. See:
	// https://pkg.go.dev/github.com/IBM/sarama@v1.30.0#CompressionCodec
	// The options are: 'none' (default), 'gzip', 'snappy', 'lz4', and 'zstd'.
	Compression string `mapstructure:"compression"`
	// The maximum number of messages the producer will send in a single
	// broker request. Defaults to 0 for unlimited. Similar to
	// `queue.buffering.max.messages` in the librdkafka producer.
	FlushMaxMessages int `mapstructure:"flush_max_messages"`
}

func NewDefaultProducerConfig() ProducerConfig {
return ProducerConfig{
MaxMessageBytes: 1000000,
RequiredAcks: WaitForLocal,
Compression: "none",
FlushMaxMessages: 0,
}
}
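
// exampleProducerConfig is an illustrative sketch, not part of the upstream
// API: it enables zstd compression and waits for all in-sync replicas to
// acknowledge each record before a produce is considered successful.
func exampleProducerConfig() (ProducerConfig, error) {
	cfg := NewDefaultProducerConfig()
	cfg.Compression = "zstd"
	cfg.RequiredAcks = WaitForAll
	if err := cfg.Validate(); err != nil {
		return ProducerConfig{}, err
	}
	return cfg, nil
}
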
func (c ProducerConfig) Validate() error {
switch c.Compression {
case "none", "gzip", "snappy", "lz4", "zstd":
// Valid compression
default:
return fmt.Errorf(
"compression should be one of 'none', 'gzip', 'snappy', 'lz4', or 'zstd'. configured value is %q",
c.Compression,
)
}
return nil
}

// Unmarshal unmarshals into ProducerConfig, allowing the user to specify any of ["all", -1, 0, 1]
// for required_acks. This is in line with standard Kafka producer configuration as described at
// https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#acks
//
// Note that confmap.Unmarshaler may only be implemented by structs, so we cannot define this method
// on RequiredAcks itself.
func (c *ProducerConfig) Unmarshal(conf *confmap.Conf) error {
if conf.Get("required_acks") == "all" {
if err := conf.Merge(confmap.NewFromStringMap(
map[string]any{"required_acks": WaitForAll},
)); err != nil {
return err
}
}
return conf.Unmarshal(c)
}
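
// exampleUnmarshalAllAlias is an illustrative sketch, not part of the
// upstream API: it shows the "all" alias being normalized to WaitForAll (-1)
// by the Unmarshal hook above. The map literal stands in for YAML such as
// `required_acks: all`.
func exampleUnmarshalAllAlias() (ProducerConfig, error) {
	conf := confmap.NewFromStringMap(map[string]any{
		"required_acks": "all",
	})
	cfg := NewDefaultProducerConfig()
	if err := cfg.Unmarshal(conf); err != nil {
		return ProducerConfig{}, err
	}
	// cfg.RequiredAcks is now WaitForAll (-1).
	return cfg, nil
}
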
// RequiredAcks defines record acknowledgement behavior for producers.
type RequiredAcks int

const (
	// NoResponse doesn't send any response; the TCP ACK is all you get.
	NoResponse RequiredAcks = 0
// WaitForLocal waits for only the local commit to succeed before responding.
WaitForLocal RequiredAcks = 1
// WaitForAll waits for all in-sync replicas to commit before responding.
// The minimum number of in-sync replicas is configured on the broker via
// the `min.insync.replicas` configuration key.
WaitForAll RequiredAcks = -1
)

func (r RequiredAcks) Validate() error {
if r < -1 || r > 1 {
return fmt.Errorf("expected 'all' (-1), 0, or 1; configured value is %v", r)
}
return nil
}

type MetadataConfig struct {
// Whether to maintain a full set of metadata for all topics, or just
// the minimal set that has been necessary so far. The full set is simpler
// and usually more convenient, but can take up a substantial amount of
// memory if you have many topics and partitions. Defaults to true.
Full bool `mapstructure:"full"`
// RefreshInterval controls the frequency at which cluster metadata is
// refreshed. Defaults to 10 minutes.
RefreshInterval time.Duration `mapstructure:"refresh_interval"`
	// Retry holds retry configuration for metadata requests. This is
	// useful to avoid race conditions when the broker is starting up at
	// the same time as the collector.
	Retry MetadataRetryConfig `mapstructure:"retry"`
}

// MetadataRetryConfig defines retry configuration for Metadata.
type MetadataRetryConfig struct {
// The total number of times to retry a metadata request when the
// cluster is in the middle of a leader election or at startup (default 3).
Max int `mapstructure:"max"`
// How long to wait for leader election to occur before retrying
// (default 250ms). Similar to the JVM's `retry.backoff.ms`.
Backoff time.Duration `mapstructure:"backoff"`
}

func NewDefaultMetadataConfig() MetadataConfig {
return MetadataConfig{
Full: true,
RefreshInterval: 10 * time.Minute,
Retry: MetadataRetryConfig{
Max: 3,
Backoff: time.Millisecond * 250,
},
}
}
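
// exampleMetadataConfig is an illustrative sketch, not part of the upstream
// API: it keeps only the metadata that has been needed so far and retries
// metadata requests a little longer, e.g. when brokers start at the same
// time as the collector. The retry count is a hypothetical value.
func exampleMetadataConfig() MetadataConfig {
	cfg := NewDefaultMetadataConfig()
	cfg.Full = false
	cfg.Retry.Max = 10 // hypothetical value
	return cfg
}
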
// AuthenticationConfig defines authentication-related configuration.
type AuthenticationConfig struct {
// PlainText is an alias for SASL/PLAIN authentication.
//
// Deprecated [v0.123.0]: use SASL with Mechanism set to PLAIN instead.
PlainText *PlainTextConfig `mapstructure:"plain_text"`
// SASL holds SASL authentication configuration.
SASL *SASLConfig `mapstructure:"sasl"`
// Kerberos holds Kerberos authentication configuration.
Kerberos *KerberosConfig `mapstructure:"kerberos"`
// TLS holds TLS configuration for connecting to Kafka brokers.
//
// Deprecated [v0.124.0]: use ClientConfig.TLS instead. This will
// be used only if ClientConfig.TLS is not set.
TLS *configtls.ClientConfig `mapstructure:"tls"`
}

// PlainTextConfig defines plaintext authentication.
type PlainTextConfig struct {
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
}

// SASLConfig defines the configuration for the SASL authentication.
type SASLConfig struct {
	// Username to be used for authentication.
	Username string `mapstructure:"username"`
	// Password to be used for authentication.
	Password string `mapstructure:"password"`
	// SASL mechanism to be used. Possible values are PLAIN, AWS_MSK_IAM,
	// AWS_MSK_IAM_OAUTHBEARER, SCRAM-SHA-256, and SCRAM-SHA-512.
	Mechanism string `mapstructure:"mechanism"`
	// SASL protocol version to be used. Possible values are 0 and 1 (default 0).
	Version int `mapstructure:"version"`
	// AWSMSK holds configuration specific to AWS MSK.
	AWSMSK AWSMSKConfig `mapstructure:"aws_msk"`
}

func (c SASLConfig) Validate() error {
switch c.Mechanism {
case "AWS_MSK_IAM", "AWS_MSK_IAM_OAUTHBEARER":
// TODO validate c.AWSMSK
case "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512":
		// These mechanisms require a username and password.
		if c.Username == "" {
			return errors.New("username is required")
		}
		if c.Password == "" {
			return errors.New("password is required")
		}
default:
return fmt.Errorf(
"mechanism should be one of 'PLAIN', 'AWS_MSK_IAM', 'AWS_MSK_IAM_OAUTHBEARER', 'SCRAM-SHA-256' or 'SCRAM-SHA-512'. configured value %v",
c.Mechanism,
)
}
if c.Version < 0 || c.Version > 1 {
return fmt.Errorf("version has to be either 0 or 1. configured value %v", c.Version)
}
return nil
}
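
// exampleSASLConfig is an illustrative sketch, not part of the upstream
// API: it configures SCRAM-SHA-512 authentication, for which Validate
// requires both a username and a password. The credentials are placeholders.
func exampleSASLConfig() (*SASLConfig, error) {
	cfg := &SASLConfig{
		Username:  "user",     // placeholder
		Password:  "password", // placeholder
		Mechanism: "SCRAM-SHA-512",
	}
	if err := cfg.Validate(); err != nil {
		return nil, err
	}
	return cfg, nil
}
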
// AWSMSKConfig defines the additional SASL authentication
// measures needed to use the AWS_MSK_IAM and AWS_MSK_IAM_OAUTHBEARER mechanisms.
type AWSMSKConfig struct {
	// Region is the AWS region in which the MSK cluster is based.
	Region string `mapstructure:"region"`
	// BrokerAddr is the broker address that the client connects to in
	// order to perform the required authentication.
	BrokerAddr string `mapstructure:"broker_addr"`
}

// KerberosConfig defines Kerberos configuration.
type KerberosConfig struct {
ServiceName string `mapstructure:"service_name"`
Realm string `mapstructure:"realm"`
UseKeyTab bool `mapstructure:"use_keytab"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password" json:"-"`
ConfigPath string `mapstructure:"config_file"`
KeyTabPath string `mapstructure:"keytab_file"`
DisablePAFXFAST bool `mapstructure:"disable_fast_negotiation"`
}
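
// exampleKerberosConfig is an illustrative sketch, not part of the upstream
// API: it authenticates with a keytab rather than a password. The realm,
// principal, and file paths are placeholders.
func exampleKerberosConfig() KerberosConfig {
	return KerberosConfig{
		ServiceName: "kafka",
		Realm:       "EXAMPLE.COM", // placeholder realm
		UseKeyTab:   true,
		Username:    "otel",                       // placeholder principal
		ConfigPath:  "/etc/krb5.conf",             // placeholder path
		KeyTabPath:  "/etc/security/kafka.keytab", // placeholder path
	}
}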