// libbeat/outputs/kafka/config.go

// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package kafka

import (
"errors"
"fmt"
"math"
"math/rand/v2"
"strings"
"time"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/common/kafka"
"github.com/elastic/beats/v7/libbeat/common/transport/kerberos"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/outputs/codec"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/monitoring/adapter"
"github.com/elastic/elastic-agent-libs/transport/tlscommon"
"github.com/elastic/sarama"
)

type backoffConfig struct {
Init time.Duration `config:"init"`
Max time.Duration `config:"max"`
}

type header struct {
Key string `config:"key"`
Value string `config:"value"`
}

type kafkaConfig struct {
Hosts []string `config:"hosts" validate:"required"`
TLS *tlscommon.Config `config:"ssl"`
Kerberos *kerberos.Config `config:"kerberos"`
Timeout time.Duration `config:"timeout" validate:"min=1"`
Metadata metaConfig `config:"metadata"`
Key *fmtstr.EventFormatString `config:"key"`
Partition map[string]*config.C `config:"partition"`
KeepAlive time.Duration `config:"keep_alive" validate:"min=0"`
MaxMessageBytes *int `config:"max_message_bytes" validate:"min=1"`
RequiredACKs *int `config:"required_acks" validate:"min=-1"`
BrokerTimeout time.Duration `config:"broker_timeout" validate:"min=1"`
Compression string `config:"compression"`
CompressionLevel int `config:"compression_level"`
Version kafka.Version `config:"version"`
BulkMaxSize int `config:"bulk_max_size"`
BulkFlushFrequency time.Duration `config:"bulk_flush_frequency"`
MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"`
Headers []header `config:"headers"`
Backoff backoffConfig `config:"backoff"`
ClientID string `config:"client_id"`
ChanBufferSize int `config:"channel_buffer_size" validate:"min=1"`
Username string `config:"username"`
Password string `config:"password"`
Codec codec.Config `config:"codec"`
Sasl kafka.SaslConfig `config:"sasl"`
EnableFAST bool `config:"enable_krb5_fast"`
Queue config.Namespace `config:"queue"`
	// These fields are used here only for validation; their values are
	// unpacked into temporary structs wherever they are needed.
Topic string `config:"topic"`
Topics []any `config:"topics"`
}
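
// For reference, a minimal illustrative configuration exercising some of the
// fields above (hypothetical values):
//
//	output.kafka:
//	  hosts: ["kafka1:9092", "kafka2:9092"]
//	  topic: "beats"
//	  compression: gzip
//	  required_acks: 1
//	  max_retries: 3
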
type metaConfig struct {
Retry metaRetryConfig `config:"retry"`
RefreshFreq time.Duration `config:"refresh_frequency" validate:"min=0"`
Full bool `config:"full"`
}

type metaRetryConfig struct {
Max int `config:"max" validate:"min=0"`
Backoff time.Duration `config:"backoff" validate:"min=0"`
}

var compressionModes = map[string]sarama.CompressionCodec{
	// zstd support was broken as of sarama 1.24.1
	// (https://github.com/Shopify/sarama/issues/1252). The issue was fixed in
	// sarama 1.26.0 (https://github.com/IBM/sarama/pull/1574), and the Elastic
	// fork of sarama has merged that fix
	// (https://github.com/elastic/sarama/commit/37faed7ffc7d59e681d99cfebd1f3d453d6d607c).
"none": sarama.CompressionNone,
"no": sarama.CompressionNone,
"off": sarama.CompressionNone,
"gzip": sarama.CompressionGZIP,
"lz4": sarama.CompressionLZ4,
"snappy": sarama.CompressionSnappy,
"zstd": sarama.CompressionZSTD,
}

func defaultConfig() kafkaConfig {
return kafkaConfig{
Hosts: nil,
TLS: nil,
Kerberos: nil,
Timeout: 30 * time.Second,
BulkMaxSize: 2048,
BulkFlushFrequency: 0,
Metadata: metaConfig{
Retry: metaRetryConfig{
Max: 3,
Backoff: 250 * time.Millisecond,
},
RefreshFreq: 10 * time.Minute,
Full: false,
},
KeepAlive: 0,
MaxMessageBytes: nil, // use library default
RequiredACKs: nil, // use library default
BrokerTimeout: 10 * time.Second,
Compression: "gzip",
CompressionLevel: 4,
Version: kafka.Version("2.1.0"),
MaxRetries: 3,
Headers: nil,
Backoff: backoffConfig{
Init: 1 * time.Second,
Max: 60 * time.Second,
},
ClientID: "beats",
ChanBufferSize: 256,
Username: "",
Password: "",
}
}

func readConfig(cfg *config.C) (*kafkaConfig, error) {
c := defaultConfig()
if err := cfg.Unpack(&c); err != nil {
return nil, err
}
return &c, nil
}
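
// A minimal usage sketch (assuming a *config.C constructed via
// config.NewConfigFrom from elastic-agent-libs; illustrative values):
//
//	raw, _ := config.NewConfigFrom(map[string]interface{}{
//		"hosts": []string{"localhost:9092"},
//		"topic": "test",
//	})
//	kafkaCfg, err := readConfig(raw)
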
func (c *kafkaConfig) Validate() error {
if len(c.Hosts) == 0 {
return errors.New("no hosts configured")
}
if _, ok := compressionModes[strings.ToLower(c.Compression)]; !ok {
return fmt.Errorf("compression mode '%v' unknown", c.Compression)
}
if err := c.Version.Validate(); err != nil {
return err
}
	if c.Username != "" && c.Password == "" {
		return errors.New("password must be set when username is configured")
}
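
	// Allow sarama.CompressionLevelDefault (sarama's "use the codec default"
	// sentinel) in addition to explicit gzip levels 0-9.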
	if c.Compression == "gzip" {
		lvl := c.CompressionLevel
		if lvl != sarama.CompressionLevelDefault && !(0 <= lvl && lvl <= 9) {
			return errors.New("compression_level must be between 0 and 9")
		}
}
if c.Topic == "" && len(c.Topics) == 0 {
return errors.New("either 'topic' or 'topics' must be defined")
}
	// When running under Elastic-Agent we do not support dynamic topic
	// selection, so `topics` is not supported and `topic` is treated as a
	// plain string.
if management.UnderAgent() {
if len(c.Topics) != 0 {
return errors.New("'topics' is not supported when running under Elastic-Agent")
}
}
return nil
}

func newSaramaConfig(log *logp.Logger, config *kafkaConfig) (*sarama.Config, error) {
partitioner, err := makePartitioner(log, config.Partition)
if err != nil {
return nil, err
}
k := sarama.NewConfig()

	// configure network level properties
timeout := config.Timeout
k.Net.DialTimeout = timeout
k.Net.ReadTimeout = timeout
k.Net.WriteTimeout = timeout
k.Net.KeepAlive = config.KeepAlive
k.Producer.Timeout = config.BrokerTimeout
k.Producer.CompressionLevel = config.CompressionLevel
tls, err := tlscommon.LoadTLSConfig(config.TLS)
if err != nil {
return nil, err
}
if tls != nil {
k.Net.TLS.Enable = true
k.Net.TLS.Config = tls.BuildModuleClientConfig("")
}
switch {
case config.Kerberos.IsEnabled():
cfgwarn.Beta("Kerberos authentication for Kafka is beta.")
// Due to a regrettable past decision, the flag controlling Kerberos
// FAST authentication was initially added to the output configuration
// rather than the shared Kerberos configuration. To avoid a breaking
// change, we still check for the old flag, but it is deprecated and
// should be removed in a future version.
enableFAST := config.Kerberos.EnableFAST || config.EnableFAST
k.Net.SASL.Enable = true
k.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI
k.Net.SASL.GSSAPI = sarama.GSSAPIConfig{
AuthType: int(config.Kerberos.AuthType),
KeyTabPath: config.Kerberos.KeyTabPath,
KerberosConfigPath: config.Kerberos.ConfigPath,
ServiceName: config.Kerberos.ServiceName,
Username: config.Kerberos.Username,
Password: config.Kerberos.Password,
Realm: config.Kerberos.Realm,
DisablePAFXFAST: !enableFAST,
}
case config.Username != "":
k.Net.SASL.Enable = true
k.Net.SASL.User = config.Username
k.Net.SASL.Password = config.Password
config.Sasl.ConfigureSarama(k)
}
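
	// For reference, the username/password branch above corresponds to
	// configuration along these lines (illustrative values; the mechanism
	// defaults to PLAIN when unset):
	//
	//	username: "beats"
	//	password: "changeme"
	//	sasl.mechanism: SCRAM-SHA-512
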
// configure metadata update properties
k.Metadata.Retry.Max = config.Metadata.Retry.Max
k.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
k.Metadata.RefreshFrequency = config.Metadata.RefreshFreq
k.Metadata.Full = config.Metadata.Full

	// configure producer API properties
if config.MaxMessageBytes != nil {
k.Producer.MaxMessageBytes = *config.MaxMessageBytes
}
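
	// The configured integer maps directly onto sarama.RequiredAcks:
	// 0 = NoResponse, 1 = WaitForLocal, -1 = WaitForAll.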
if config.RequiredACKs != nil {
k.Producer.RequiredAcks = sarama.RequiredAcks(*config.RequiredACKs)
}

	compressionMode, ok := compressionModes[strings.ToLower(config.Compression)]
	if !ok {
		return nil, fmt.Errorf("unknown compression mode: '%v'", config.Compression)
	}
k.Producer.Compression = compressionMode
k.Producer.Return.Successes = true // enable return channel for signaling
k.Producer.Return.Errors = true

	// retries are handled by libbeat, so disable retries in the sarama library
retryMax := config.MaxRetries
if retryMax < 0 {
retryMax = 1000
}
k.Producer.Retry.Max = retryMax
k.Producer.Retry.BackoffFunc = makeBackoffFunc(config.Backoff)

	// configure per broker go channel buffering
	k.ChannelBufferSize = config.ChanBufferSize

	// configure bulk size
	k.Producer.Flush.MaxMessages = config.BulkMaxSize
if config.BulkFlushFrequency > 0 {
k.Producer.Flush.Frequency = config.BulkFlushFrequency
}

	// configure client ID
	k.ClientID = config.ClientID

version, ok := config.Version.Get()
if !ok {
return nil, fmt.Errorf("Unknown/unsupported kafka version: %v", config.Version)
}
k.Version = version
k.Producer.Partitioner = partitioner
k.MetricRegistry = adapter.GetGoMetrics(
monitoring.Default,
"libbeat.outputs",
adapter.Rename("incoming-byte-rate", "read.bytes"),
adapter.Rename("outgoing-byte-rate", "write.bytes"),
adapter.Rename("request-latency-in-ms", "write.latency"),
adapter.Rename("requests-in-flight", "kafka.requests-in-flight"),
adapter.GoMetricsNilify,
)
if err := k.Validate(); err != nil {
log.Errorf("Invalid kafka configuration: %+v", err)
return nil, err
}
return k, nil
}

// makeBackoffFunc returns a stateless implementation of exponential-backoff-with-jitter. It is conceptually
// equivalent to the stateful implementation used by other outputs, EqualJitterBackoff.
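// For example (illustrative numbers): with the defaults init=1s and max=60s,
// maxBackoffRetries = ceil(log2(60)) = 6, so retry 0 waits within [0.5s, 1s],
// retry 3 within [4s, 8s], and every retry from the 6th onwards waits within
// [30s, 60s].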
func makeBackoffFunc(cfg backoffConfig) func(retries, maxRetries int) time.Duration {
maxBackoffRetries := int(math.Ceil(math.Log2(float64(cfg.Max) / float64(cfg.Init))))
return func(retries, _ int) time.Duration {
// compute 'base' duration for exponential backoff
dur := cfg.Max
if retries < maxBackoffRetries {
dur = time.Duration(uint64(cfg.Init) * uint64(1<<retries))
}
		// apply roughly uniformly distributed jitter in the second half of the
		// interval, so that the wait time falls into the interval [dur/2, dur]
limit := int64(dur / 2)
jitter := rand.Int64N(limit + 1)
return time.Duration(limit + jitter)
}
}