plugins/input/kafka/input_kafka.go

// Copyright 2023 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafka

import (
	ctx "context"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/IBM/sarama"

	"github.com/alibaba/ilogtail/pkg/logger"
	"github.com/alibaba/ilogtail/pkg/models"
	"github.com/alibaba/ilogtail/pkg/pipeline"
	"github.com/alibaba/ilogtail/pkg/pipeline/extensions"
	"github.com/alibaba/ilogtail/pkg/protocol/decoder"
	"github.com/alibaba/ilogtail/pkg/protocol/decoder/common"
)

const (
	v1 = iota
	v2
)

type InputKafka struct {
	ConsumerGroup string
	ClientID      string
	Topics        []string
	Brokers       []string
	MaxMessageLen int
	Version       string
	Offset        string
	SASLUsername  string
	SASLPassword  string
	// Assignor Consumer group partition assignment strategy (range, roundrobin, sticky)
	Assignor string
	// Decoder the decoder to use, default is "ext_default_decoder"
	Decoder           string
	Format            string
	FieldsExtend      bool
	DisableUncompress bool

	ready               chan bool
	readyCloser         sync.Once
	consumerGroupClient sarama.ConsumerGroup
	wg                  *sync.WaitGroup
	messages            chan *sarama.ConsumerMessage
	context             pipeline.Context
	cancelConsumer      ctx.CancelFunc
	collectorV2         pipeline.PipelineCollector
	decoder             extensions.Decoder
	collectorV1         pipeline.Collector
	version             int8
}

const (
	pluginType = "service_kafka"
)

func (k *InputKafka) Init(context pipeline.Context) (int, error) {
	k.context = context

	if len(k.Brokers) == 0 {
		return 0, fmt.Errorf("must specify Brokers for plugin %v", pluginType)
	}
	if len(k.Topics) == 0 {
		return 0, fmt.Errorf("must specify Topics for plugin %v", pluginType)
	}
	if k.ConsumerGroup == "" {
		return 0, fmt.Errorf("must specify ConsumerGroup for plugin %v", pluginType)
	}
	if k.ClientID == "" {
		return 0, fmt.Errorf("must specify ClientID for plugin %v", pluginType)
	}

	// init decoder
	if k.Format == "" {
		k.Format = common.ProtocolRaw
	}
	var err error
	if k.decoder, err = decoder.GetDecoderWithOptions(k.Format, decoder.Option{
		FieldsExtend:      k.FieldsExtend,
		DisableUncompress: k.DisableUncompress,
	}); err != nil {
		return 0, err
	}

	config := sarama.NewConfig()
	if k.Version != "" {
		if config.Version, err = sarama.ParseKafkaVersion(k.Version); err != nil {
			return 0, err
		}
	}
	config.Consumer.Return.Errors = true

	if k.SASLUsername != "" && k.SASLPassword != "" {
		logger.Infof(k.context.GetRuntimeContext(), "Using SASL auth with username '%s'", k.SASLUsername)
		config.Net.SASL.User = k.SASLUsername
		config.Net.SASL.Password = k.SASLPassword
		config.Net.SASL.Enable = true
	}

	switch strings.ToLower(k.Offset) {
	case "oldest", "":
		config.Consumer.Offsets.Initial = sarama.OffsetOldest
	case "newest":
		config.Consumer.Offsets.Initial = sarama.OffsetNewest
	default:
		logger.Warningf(k.context.GetRuntimeContext(), "INPUT_KAFKA_ALARM", "Kafka consumer invalid offset '%s', using 'oldest'", k.Offset)
		config.Consumer.Offsets.Initial = sarama.OffsetOldest
	}

	switch strings.ToLower(k.Assignor) {
	case "sticky":
		config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategySticky}
	case "roundrobin":
		config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategyRoundRobin}
	case "range":
		config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategyRange}
	default:
		logger.Warningf(k.context.GetRuntimeContext(), "INPUT_KAFKA_ALARM", "Unrecognized consumer group partition assignor '%s', using 'range'", k.Assignor)
		config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategyRange}
	}

	newClient, err := sarama.NewClient(k.Brokers, config)
	if err != nil {
		logger.Warningf(k.context.GetRuntimeContext(), "INPUT_KAFKA_ALARM", "failed to create kafka client, [brokers]%v", k.Brokers)
		return 0, err
	}
	consumerGroup, err := sarama.NewConsumerGroupFromClient(k.ConsumerGroup, newClient)
	if err != nil {
		logger.Warningf(k.context.GetRuntimeContext(), "INPUT_KAFKA_ALARM", "failed to create consumer group client, [group]%s", k.ConsumerGroup)
		return 0, err
	}
	k.consumerGroupClient = consumerGroup

	cancelCtx, cancel := ctx.WithCancel(k.context.GetRuntimeContext())
	k.cancelConsumer = cancel
	k.wg = &sync.WaitGroup{}
	k.wg.Add(1)
	go func() {
		defer k.wg.Done()
		for {
			if err := k.consumerGroupClient.Consume(cancelCtx, k.Topics, k); err != nil {
				// Keep retrying Consume when an error occurs.
				// This loop only exits when the context is canceled (i.e., when the loongcollector process is stopping).
				logger.Error(k.context.GetRuntimeContext(), "INPUT_KAFKA_ALARM", "Error from kafka consumer", err)
				// Add a retry delay to avoid a busy loop.
				select {
				case <-time.After(time.Second * 5):
				case <-cancelCtx.Done():
					logger.Info(k.context.GetRuntimeContext(), "Consumer was canceled. Leaving consumer group")
					return
				}
			}
			// Check if the context was canceled, signaling that the consumer should stop.
			if cancelCtx.Err() != nil {
				logger.Info(k.context.GetRuntimeContext(), "Consumer was canceled. Leaving consumer group")
				return
			}
			k.ready = make(chan bool)
		}
	}()
	<-k.ready

	return 0, nil
}

func (k *InputKafka) Description() string {
	return "Kafka input for logtail"
}

func (k *InputKafka) Start(collector pipeline.Collector) error {
	k.collectorV1 = collector
	k.version = v1
	go func() {
		k.receiver()
	}()
	return nil
}

func (k *InputKafka) StartService(context pipeline.PipelineContext) error {
	k.collectorV2 = context.Collector()
	k.version = v2
	go func() {
		k.receiver()
	}()
	return nil
}

func (k *InputKafka) receiver() {
	for msg := range k.messages {
		k.onMessage(msg)
	}
}

// Setup implements ConsumerGroupHandler
func (k *InputKafka) Setup(session sarama.ConsumerGroupSession) error {
	k.readyCloser.Do(func() {
		close(k.ready)
	})
	return nil
}

// Cleanup implements ConsumerGroupHandler
func (k *InputKafka) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim implements ConsumerGroupHandler, must start a consumer loop of ConsumerGroupClaim's Messages().
func (k *InputKafka) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	logger.Debug(k.context.GetRuntimeContext(), "Consuming messages [partition]", claim.Partition(), "[topic]", claim.Topic(), "init [offset]", claim.InitialOffset())
	for {
		select {
		case msg, ok := <-claim.Messages():
			if !ok {
				return nil
			}
			k.messages <- msg
			session.MarkMessage(msg, "")
		case <-session.Context().Done():
			logger.Debug(k.context.GetRuntimeContext(), "Ctx was canceled, stopping consumerGroup")
			return nil
		}
	}
}

func (k *InputKafka) onMessage(msg *sarama.ConsumerMessage) {
	if msg != nil {
		switch k.version {
		case v1:
			fields := make(map[string]string)
			if len(msg.Key) == 0 {
				fields[models.ContentKey] = string(msg.Value)
			} else {
				fields[string(msg.Key)] = string(msg.Value)
			}
			k.collectorV1.AddData(nil, fields)
		case v2:
			data, err := k.decoder.DecodeV2(msg.Value, nil)
			if err != nil {
				logger.Warning(k.context.GetRuntimeContext(), "DECODE_MESSAGE_FAIL_ALARM", "decode message failed", err)
				return
			}
			k.collectorV2.CollectList(data...)
		}
	}
}

func (k *InputKafka) Stop() error {
	k.readyCloser.Do(func() {
		close(k.ready)
	})
	k.cancelConsumer()
	k.wg.Wait()
	err := k.consumerGroupClient.Close()
	if err != nil {
		e := fmt.Errorf("[inputs.kafka_consumer] Error closing consumer: %v", err)
		logger.Errorf(k.context.GetRuntimeContext(), "INPUT_KAFKA_ALARM", "%v", e)
		return e
	}
	return nil
}

func (k *InputKafka) Collect(collector pipeline.Collector) error {
	return nil
}

func init() {
	pipeline.ServiceInputs[pluginType] = func() pipeline.ServiceInput {
		return &InputKafka{
			ConsumerGroup: "",
			ClientID:      "",
			Topics:        nil,
			Brokers:       nil,
			Version:       "",
			Offset:        "oldest",
			SASLUsername:  "",
			SASLPassword:  "",
			Assignor:      "range",
			messages:      make(chan *sarama.ConsumerMessage, 256),
			ready:         make(chan bool),
		}
	}
}
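
The plugin above is essentially a thin wrapper around sarama's consumer-group API: Setup/Cleanup/ConsumeClaim implement sarama.ConsumerGroupHandler, and Init drives Consume in a retry loop. For reference, a minimal standalone sketch of that same pattern is shown below. The broker address, group name, and topic are hypothetical placeholders; this is an illustration of the sarama API, not part of this plugin.

package main

import (
	"context"
	"log"

	"github.com/IBM/sarama"
)

// handler implements sarama.ConsumerGroupHandler, mirroring the
// Setup/Cleanup/ConsumeClaim trio used by InputKafka above.
type handler struct{}

func (h *handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (h *handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (h *handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// Drain the claim's message channel and mark each message as consumed,
	// the same way InputKafka.ConsumeClaim forwards messages and calls MarkMessage.
	for msg := range claim.Messages() {
		log.Printf("topic=%s partition=%d offset=%d value=%s", msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
		session.MarkMessage(msg, "")
	}
	return nil
}

func main() {
	config := sarama.NewConfig()
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	// Hypothetical broker and group names, used only for illustration.
	group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "example-group", config)
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()

	ctx := context.Background()
	for {
		// Consume blocks until a rebalance or an error; the plugin wraps this
		// same call in a retry loop tied to its pipeline context.
		if err := group.Consume(ctx, []string{"example-topic"}, &handler{}); err != nil {
			log.Println("consume error:", err)
		}
		if ctx.Err() != nil {
			return
		}
	}
}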