plugins/flusher/kafkav2/flusher_kafka_v2.go (415 lines of code) (raw):
// Copyright 2021 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kafkav2
import (
cryptorand "crypto/rand"
"errors"
"fmt"
"math"
"math/big"
"strings"
"time"
"github.com/IBM/sarama"
"github.com/alibaba/ilogtail/pkg/fmtstr"
"github.com/alibaba/ilogtail/pkg/logger"
"github.com/alibaba/ilogtail/pkg/models"
"github.com/alibaba/ilogtail/pkg/pipeline"
"github.com/alibaba/ilogtail/pkg/protocol"
converter "github.com/alibaba/ilogtail/pkg/protocol/converter"
"github.com/alibaba/ilogtail/pkg/util"
)
const (
PartitionerTypeRandom = "random"
PartitionerTypeRoundRobin = "roundrobin"
PartitionerTypeRoundHash = "hash"
)
type FlusherKafka struct {
context pipeline.Context
converter *converter.Converter
// The list of kafka brokers
Brokers []string
// The name of the kafka topic
Topic string
// Kafka protocol version
Version Version
// The number of seconds to wait for responses from the Kafka brokers before timing out.
// The default is 30 (seconds).
Timeout time.Duration
// Authentication using SASL/PLAIN
Authentication Authentication
// Kafka output broker event partitioning strategy.
// Must be one of random, roundrobin, or hash. By default, the random partitioner is used
PartitionerType string
// Kafka metadata update settings.
Metadata metaConfig
// The keep-alive period for an active network connection.
// If 0s, keep-alives are disabled. The default is 0 seconds.
KeepAlive time.Duration
// The maximum number of messages the producer will send in a single
MaxMessageBytes *int
// RequiredAcks Number of acknowledgements required to assume that a message has been sent.
// 0 -> NoResponse. doesn't send any response
// 1 -> WaitForLocal. waits for only the local commit to succeed before responding ( default )
// -1 -> WaitForAll. waits for all in-sync replicas to commit before responding.
RequiredACKs *int
// The maximum duration a broker will wait for number of required ACKs.
// The default is 10s.
BrokerTimeout time.Duration
// Compression Codec used to produce messages
// The options are: 'none', 'gzip', 'snappy', 'lz4', and 'zstd'
Compression string
// Sets the compression level used by gzip. Setting this value to 0 disables compression.
// The compression level must be in the range of 1 (best speed) to 9 (best compression)
// The default value is 4.
CompressionLevel int
// How many outstanding requests a connection is allowed to have before
// sending on it blocks (default 5).
MaxOpenRequests int
// The maximum number of events to bulk in a single Kafka request. The default is 2048.
BulkMaxSize int
// Duration to wait before sending bulk Kafka request. 0 is no delay. The default is 0.
BulkFlushFrequency time.Duration
MaxRetries int
// A header is a key-value pair, and multiple headers can be included with the same key.
// Only string values are supported
Headers []header
Backoff backoffConfig
// Per Kafka broker number of messages buffered in output pipeline. The default is 256
ChanBufferSize int
// ilogtail data convert config
Convert convertConfig
HashKeys []string
HashOnce bool
ClientID string
// obtain from Topic
topicKeys []string
isTerminal chan bool
producer sarama.AsyncProducer
hashKeyMap map[string]struct{}
hashKey sarama.StringEncoder
defaultHashKey string
selectFields []string
recordHeaders []sarama.RecordHeader
}
type backoffConfig struct {
// The number of seconds to wait before trying to republish to Kafka after a network error.
// After a successful publishing, the backoff timer is reset. The default is 1s.
Init time.Duration
// The maximum number of seconds to wait before attempting to republish to Kafka after a network error.
// The default is 60s.
Max time.Duration
}
type header struct {
Key string
Value string
}
type metaConfig struct {
Retry metaRetryConfig
// Metadata refresh interval. Defaults to 10 minutes.
RefreshFrequency time.Duration
// Strategy to use when fetching metadata,
// when this option is true, the client will maintain a full set of metadata for all the available topics
// if this option is set to false it will only refresh the metadata for the configured topics. The default is false.
Full bool
}
type metaRetryConfig struct {
// The total number of times to retry a metadata request when the
// cluster is in the middle of a leader election or at startup (default 3).
Max int
// How long to wait for leader election to occur before retrying
// default 250ms
Backoff time.Duration
}
type convertConfig struct {
// Rename one or more fields from tags.
TagFieldsRename map[string]string
// Rename one or more fields, The protocol field options can only be: contents, tags, time
ProtocolFieldsRename map[string]string
// Convert protocol, default value: custom_single
Protocol string
// Convert encoding, default value:json
// The options are: 'json'
Encoding string
}
// NewFlusherKafka Kafka flusher default config
func NewFlusherKafka() *FlusherKafka {
return &FlusherKafka{
Brokers: nil,
ClientID: "LogtailPlugin",
PartitionerType: PartitionerTypeRandom,
Timeout: 30 * time.Second,
BulkMaxSize: 2048,
BulkFlushFrequency: 0,
Metadata: metaConfig{
Retry: metaRetryConfig{
Max: 3,
Backoff: 250 * time.Millisecond,
},
RefreshFrequency: 10 * time.Minute,
Full: false,
},
KeepAlive: 0,
MaxMessageBytes: nil, // use library default
RequiredACKs: nil, // use library default
MaxOpenRequests: 5,
BrokerTimeout: 10 * time.Second,
Compression: "none",
CompressionLevel: 4,
Version: "1.0.0",
MaxRetries: 3,
Headers: nil,
Backoff: backoffConfig{
Init: 1 * time.Second,
Max: 60 * time.Second,
},
ChanBufferSize: 256,
Authentication: Authentication{
PlainText: &PlainTextConfig{
Username: "",
Password: "",
},
},
Convert: convertConfig{
Protocol: converter.ProtocolCustomSingle,
Encoding: converter.EncodingJSON,
},
}
}
func (k *FlusherKafka) Init(context pipeline.Context) error {
k.context = context
if k.Brokers == nil || len(k.Brokers) == 0 {
var err = errors.New("brokers ip is nil")
logger.Error(k.context.GetRuntimeContext(), "FLUSHER_INIT_ALARM", "init kafka flusher fail, error", err)
return err
}
// Set default value while not set
if k.Convert.Encoding == "" {
k.converter.Encoding = converter.EncodingJSON
}
if k.Convert.Protocol == "" {
k.Convert.Protocol = converter.ProtocolCustomSingle
}
convert, err := k.getConverter()
if err != nil {
logger.Error(k.context.GetRuntimeContext(), "FLUSHER_INIT_ALARM", "init kafka flusher converter fail, error", err)
return err
}
k.converter = convert
// Obtain topic keys from dynamic topic expression
topicKeys, err := fmtstr.CompileKeys(k.Topic)
if err != nil {
logger.Error(k.context.GetRuntimeContext(), "FLUSHER_INIT_ALARM", "init kafka flusher fail, error", err)
return err
}
k.topicKeys = topicKeys
k.defaultHashKey = context.GetLogstore()
// Init headers
k.recordHeaders = k.makeHeaders()
saramaConfig, err := newSaramaConfig(k)
if err != nil {
logger.Error(k.context.GetRuntimeContext(), "FLUSHER_INIT_ALARM", "init kafka flusher fail, error", err)
return err
}
producer, err := sarama.NewAsyncProducer(k.Brokers, saramaConfig)
if err != nil {
logger.Error(k.context.GetRuntimeContext(), "FLUSHER_INIT_ALARM", "init kafka flusher fail, error", err)
return err
}
// Merge topicKeys and HashKeys,Only one convert after merge
k.selectFields = util.UniqueStrings(k.topicKeys, k.HashKeys)
SIGTERM := make(chan bool)
go func(p sarama.AsyncProducer, SIGTERM chan bool) {
errors := p.Errors()
success := p.Successes()
for {
select {
case err := <-errors:
if err != nil {
logger.Error(k.context.GetRuntimeContext(), "FLUSHER_FLUSH_ALARM", "flush kafka write data fail, error", err)
}
case <-success:
// Do Nothing
case <-SIGTERM:
return
}
}
}(producer, SIGTERM)
k.producer = producer
k.isTerminal = SIGTERM
return nil
}
func (k *FlusherKafka) Description() string {
return "Kafka flusher for logtail"
}
func (k *FlusherKafka) Flush(projectName string, logstoreName string, configName string, logGroupList []*protocol.LogGroup) error {
for _, logGroup := range logGroupList {
logger.Debug(k.context.GetRuntimeContext(), "[LogGroup] topic", logGroup.Topic, "logstore", logGroup.Category, "logcount", len(logGroup.Logs), "tags", logGroup.LogTags)
logs, values, err := k.converter.ToByteStreamWithSelectedFields(logGroup, k.topicKeys)
if err != nil {
logger.Error(k.context.GetRuntimeContext(), "FLUSHER_FLUSH_ALARM", "flush kafka convert log fail, error", err)
}
for index, log := range logs.([][]byte) {
k.flush(log, values[index])
}
}
return nil
}
func (k *FlusherKafka) Export(groupEventsArray []*models.PipelineGroupEvents, ctx pipeline.PipelineContext) error {
for _, groupEvents := range groupEventsArray {
logger.Debug(k.context.GetRuntimeContext(), "[GroupEvents] events count", len(groupEvents.Events),
"tags", groupEvents.Group.GetTags().Iterator())
logs, values, err := k.converter.ToByteStreamWithSelectedFieldsV2(groupEvents, k.topicKeys)
if err != nil {
logger.Error(k.context.GetRuntimeContext(), "FLUSHER_FLUSH_ALARM", "flush kafka convert log fail, error", err)
}
for index, log := range logs.([][]byte) {
k.flush(log, values[index])
}
}
return nil
}
func (k *FlusherKafka) flush(log []byte, valueMap map[string]string) {
topic := k.Topic
if len(k.topicKeys) > 0 {
formattedTopic, err := fmtstr.FormatTopic(valueMap, k.Topic)
if err != nil {
logger.Error(k.context.GetRuntimeContext(), "FLUSHER_FLUSH_ALARM", "flush kafka format topic fail, error", err)
} else {
topic = *formattedTopic
}
}
m := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(log),
Headers: k.recordHeaders,
}
// set key when partition type is hash
if k.PartitionerType == PartitionerTypeRoundHash {
if k.HashOnce {
if len(k.hashKey) == 0 {
k.hashKey = k.hashPartitionKey(valueMap, k.defaultHashKey)
}
m.Key = k.hashKey
} else {
m.Key = k.hashPartitionKey(valueMap, k.defaultHashKey)
}
}
k.producer.Input() <- m
}
func (k *FlusherKafka) hashPartitionKey(valueMap map[string]string, defaultKey string) sarama.StringEncoder {
var hashData []string
for key, value := range valueMap {
if _, ok := k.hashKeyMap[key]; ok {
hashData = append(hashData, value)
}
}
if len(hashData) == 0 {
hashData = append(hashData, defaultKey)
}
logger.Debug(k.context.GetRuntimeContext(), "partition key", hashData, " hashKeyMap", k.hashKeyMap)
return sarama.StringEncoder(strings.Join(hashData, "###"))
}
func (*FlusherKafka) SetUrgent(flag bool) {
}
// IsReady is ready to flush
func (k *FlusherKafka) IsReady(projectName string, logstoreName string, logstoreKey int64) bool {
return k.producer != nil
}
// Stop ...
func (k *FlusherKafka) Stop() error {
err := k.producer.Close()
close(k.isTerminal)
return err
}
func newSaramaConfig(config *FlusherKafka) (*sarama.Config, error) {
partitioner, err := makePartitioner(config)
if err != nil {
return nil, err
}
k := sarama.NewConfig()
// configure network level properties
k.Net.MaxOpenRequests = config.MaxOpenRequests
timeout := config.Timeout
k.Net.DialTimeout = timeout
k.Net.ReadTimeout = timeout
k.Net.WriteTimeout = timeout
k.Net.KeepAlive = config.KeepAlive
k.Producer.Timeout = config.BrokerTimeout
k.Producer.CompressionLevel = config.CompressionLevel
// configure Authentication
err = config.Authentication.ConfigureAuthentication(k)
if err != nil {
return nil, err
}
// configure kafka version
version, ok := config.Version.Get()
if !ok {
return nil, fmt.Errorf("unknown/unsupported kafka version: %v", config.Version)
}
k.Version = version
// configure metadata update properties
k.Metadata.Retry.Max = config.Metadata.Retry.Max
k.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
k.Metadata.RefreshFrequency = config.Metadata.RefreshFrequency
k.Metadata.Full = config.Metadata.Full
// configure producer API properties
if config.MaxMessageBytes != nil {
k.Producer.MaxMessageBytes = *config.MaxMessageBytes
}
if config.RequiredACKs != nil {
k.Producer.RequiredAcks = sarama.RequiredAcks(*config.RequiredACKs)
}
compressionMode, err := saramaProducerCompressionCodec(strings.ToLower(config.Compression))
if err != nil {
return nil, err
}
k.Producer.Compression = compressionMode
k.Producer.Return.Successes = true // enable return channel for signaling
k.Producer.Return.Errors = true
retryMax := config.MaxRetries
if retryMax < 0 {
retryMax = 1000
}
k.Producer.Retry.Max = retryMax
k.Producer.Retry.BackoffFunc = makeBackoffFunc(config.Backoff)
// configure per broker go channel buffering
k.ChannelBufferSize = config.ChanBufferSize
// configure bulk size
k.Producer.Flush.MaxMessages = config.BulkMaxSize
if config.BulkFlushFrequency > 0 {
k.Producer.Flush.Frequency = config.BulkFlushFrequency
}
// configure client ID
k.ClientID = config.ClientID
k.Producer.Partitioner = partitioner
if err := k.Validate(); err != nil {
logger.Error(config.context.GetRuntimeContext(), "FLUSHER_INIT_ALARM", "Invalid kafka configuration, error", err)
return nil, err
}
return k, nil
}
// makeBackoffFunc returns a stateless implementation of exponential-backoff-with-jitter. It is conceptually
// equivalent to the stateful implementation used by other outputs, EqualJitterBackoff.
func makeBackoffFunc(cfg backoffConfig) func(retries, maxRetries int) time.Duration {
maxBackoffRetries := int(math.Ceil(math.Log2(float64(cfg.Max) / float64(cfg.Init))))
return func(retries, _ int) time.Duration {
// compute 'base' duration for exponential backoff
dur := cfg.Max
if retries < maxBackoffRetries {
dur = time.Duration(uint64(cfg.Init) * uint64(1<<retries))
}
// apply about equaly distributed jitter in second half of the interval, such that the wait
// time falls into the interval [dur/2, dur]
limit := int64(dur / 2)
jitter, _ := cryptorand.Int(cryptorand.Reader, big.NewInt(limit+1))
return time.Duration(limit + jitter.Int64())
}
}
func (k *FlusherKafka) Validate() error {
if len(k.Brokers) == 0 {
return errors.New("no hosts configured")
}
// check topic
if k.Topic == "" {
return errors.New("topic can't be empty")
}
if _, err := saramaProducerCompressionCodec(strings.ToLower(k.Compression)); err != nil {
return err
}
if err := k.Version.Validate(); err != nil {
return err
}
if k.Compression == "gzip" {
lvl := k.CompressionLevel
if lvl != sarama.CompressionLevelDefault && !(0 <= lvl && lvl <= 9) {
return fmt.Errorf("compression_level must be between 0 and 9")
}
}
return nil
}
func makePartitioner(k *FlusherKafka) (partitioner sarama.PartitionerConstructor, err error) {
switch k.PartitionerType {
case PartitionerTypeRoundRobin:
partitioner = sarama.NewRoundRobinPartitioner
case PartitionerTypeRoundHash:
partitioner = sarama.NewHashPartitioner
k.hashKeyMap = make(map[string]struct{})
k.hashKey = ""
for _, key := range k.HashKeys {
k.hashKeyMap[key] = struct{}{}
}
case PartitionerTypeRandom:
partitioner = sarama.NewRandomPartitioner
default:
return nil, fmt.Errorf("invalid PartitionerType,configured value %v", k.PartitionerType)
}
return partitioner, nil
}
func saramaProducerCompressionCodec(compression string) (sarama.CompressionCodec, error) {
switch compression {
case "none":
return sarama.CompressionNone, nil
case "gzip":
return sarama.CompressionGZIP, nil
case "snappy":
return sarama.CompressionSnappy, nil
case "lz4":
return sarama.CompressionLZ4, nil
case "zstd":
return sarama.CompressionZSTD, nil
default:
return sarama.CompressionNone, fmt.Errorf("producer.compression should be one of 'none', 'gzip', 'snappy', 'lz4', or 'zstd'. configured value %v", compression)
}
}
func (k *FlusherKafka) makeHeaders() []sarama.RecordHeader {
if len(k.Headers) != 0 {
recordHeaders := make([]sarama.RecordHeader, 0, len(k.Headers))
for _, h := range k.Headers {
if h.Key == "" {
continue
}
recordHeader := sarama.RecordHeader{
Key: []byte(h.Key),
Value: []byte(h.Value),
}
recordHeaders = append(recordHeaders, recordHeader)
}
return recordHeaders
}
return nil
}
func (k *FlusherKafka) getConverter() (*converter.Converter, error) {
logger.Debug(k.context.GetRuntimeContext(), "[ilogtail data convert config] Protocol", k.Convert.Protocol, "Encoding", k.Convert.Encoding, "TagFieldsRename", k.Convert.TagFieldsRename, "ProtocolFieldsRename", k.Convert.ProtocolFieldsRename)
return converter.NewConverter(k.Convert.Protocol, k.Convert.Encoding, k.Convert.TagFieldsRename, k.Convert.ProtocolFieldsRename, k.context.GetPipelineScopeConfig())
}
func init() {
pipeline.Flushers["flusher_kafka_v2"] = func() pipeline.Flusher {
return NewFlusherKafka()
}
}