filebeat/input/kafka/input.go
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package kafka
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/elastic/sarama"

	input "github.com/elastic/beats/v7/filebeat/input/v2"
	"github.com/elastic/beats/v7/libbeat/beat"
	"github.com/elastic/beats/v7/libbeat/common/acker"
	"github.com/elastic/beats/v7/libbeat/common/backoff"
	"github.com/elastic/beats/v7/libbeat/common/kafka"
	"github.com/elastic/beats/v7/libbeat/feature"
	"github.com/elastic/beats/v7/libbeat/reader"
	"github.com/elastic/beats/v7/libbeat/reader/parser"

	conf "github.com/elastic/elastic-agent-libs/config"
	"github.com/elastic/elastic-agent-libs/logp"
	"github.com/elastic/elastic-agent-libs/mapstr"
)
const pluginName = "kafka"
// Plugin returns the input.Plugin definition for the kafka input.
func Plugin() input.Plugin {
return input.Plugin{
Name: pluginName,
Stability: feature.Stable,
Deprecated: false,
Info: "Kafka input",
Doc: "The Kafka input consumes events from topics by connecting to the configured kafka brokers",
Manager: input.ConfigureWith(configure),
}
}
func configure(cfg *conf.C) (input.Input, error) {
config := defaultConfig()
if err := cfg.Unpack(&config); err != nil {
return nil, err
}
saramaConfig, err := newSaramaConfig(config)
if err != nil {
return nil, fmt.Errorf("initializing Sarama config: %w", err)
}
return NewInput(config, saramaConfig)
}
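// A minimal usage sketch for configure (broker address, topic and group id are
// hypothetical; the key names follow the kafka input options hosts, topics and
// group_id):
//
//	cfg := conf.MustNewConfigFrom(map[string]interface{}{
//		"hosts":    []string{"localhost:9092"},
//		"topics":   []string{"example-topic"},
//		"group_id": "filebeat",
//	})
//	in, err := configure(cfg)
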
func NewInput(config kafkaInputConfig, saramaConfig *sarama.Config) (*kafkaInput, error) {
return &kafkaInput{config: config, saramaConfig: saramaConfig}, nil
}
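// NewInput can also be used directly, e.g. from tests; a sketch reusing the
// package helpers above (configure is the normal entry point in production):
//
//	config := defaultConfig()
//	saramaConfig, err := newSaramaConfig(config)
//	if err != nil {
//		// handle the error
//	}
//	in, _ := NewInput(config, saramaConfig)
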
type kafkaInput struct {
config kafkaInputConfig
saramaConfig *sarama.Config
saramaWaitGroup sync.WaitGroup // indicates a sarama consumer group is active
}
func (input *kafkaInput) Name() string { return pluginName }
func (input *kafkaInput) Test(ctx input.TestContext) error {
client, err := sarama.NewClient(input.config.Hosts, input.saramaConfig)
	if err != nil {
		ctx.Logger.Error(err)
		return err
	}
	defer client.Close()
	topics, err := client.Topics()
	if err != nil {
		ctx.Logger.Error(err)
		return err
	}
var missingTopics []string
for _, neededTopic := range input.config.Topics {
if !contains(topics, neededTopic) {
missingTopics = append(missingTopics, neededTopic)
}
}
if len(missingTopics) > 0 {
return fmt.Errorf("Of configured topics %v, topics: %v are not in available topics %v", input.config.Topics, missingTopics, topics)
}
return nil
}
func (input *kafkaInput) Run(ctx input.Context, pipeline beat.Pipeline) error {
log := ctx.Logger.Named("kafka input").With("hosts", input.config.Hosts)
client, err := pipeline.ConnectWith(beat.ClientConfig{
EventListener: acker.ConnectionOnly(
acker.EventPrivateReporter(func(_ int, events []interface{}) {
for _, event := range events {
if meta, ok := event.(eventMeta); ok {
meta.ackHandler()
}
}
}),
),
WaitClose: input.config.WaitClose,
})
if err != nil {
return err
}
defer client.Close()
log.Info("Starting Kafka input")
defer log.Info("Kafka input stopped")
// Sarama uses standard go contexts to control cancellation, so we need
// to wrap our input context channel in that interface.
goContext := doneChannelContext(ctx)
// If the consumer fails to connect, we use exponential backoff with
// jitter up to 8 * the initial backoff interval.
connectDelay := backoff.NewEqualJitterBackoff(
ctx.Cancelation.Done(),
input.config.ConnectBackoff,
8*input.config.ConnectBackoff,
)
for goContext.Err() == nil {
// Connect to Kafka with a new consumer group.
consumerGroup, err := sarama.NewConsumerGroup(
input.config.Hosts,
input.config.GroupID,
input.saramaConfig,
)
if err != nil {
log.Errorw("Error initializing kafka consumer group", "error", err)
connectDelay.Wait()
continue
}
// We've successfully connected, reset the backoff timer.
connectDelay.Reset()
// We have a connected consumer group now, try to start the main event
// loop by calling Consume (which starts an asynchronous consumer).
// In an ideal run, this function never returns until shutdown; if it
// does, it means the errors have been logged and the consumer group
// has been closed, so we try creating a new one in the next iteration.
input.runConsumerGroup(log, client, goContext, consumerGroup)
}
if errors.Is(ctx.Cancelation.Err(), context.Canceled) {
return nil
} else {
return ctx.Cancelation.Err()
}
}
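// A worked example of the reconnect policy in Run (the connect_backoff value is
// chosen for illustration only): with connect_backoff: 30s, failed connection
// attempts are retried with an equal-jitter wait that grows from the initial
// 30s interval up to a cap of 8 * 30s = 240s, and the backoff resets after the
// next successful connection.
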
// Stop doesn't need to do anything: the sarama consumer group and the beat
// client used by Run are both shut down once the input's cancellation channel
// (ctx.Cancelation) is closed as part of the normal shutdown process.
func (input *kafkaInput) Stop() {
}
// Wait should shut down the input and wait for it to complete; however (see
// Stop above) no extra shutdown action is needed once the cancellation channel
// is closed, so we just make a (currently no-op) call to Stop() and then wait
// for sarama to signal completion.
func (input *kafkaInput) Wait() {
input.Stop()
// Wait for sarama to shut down
input.saramaWaitGroup.Wait()
}
func (input *kafkaInput) runConsumerGroup(log *logp.Logger, client beat.Client, context context.Context, consumerGroup sarama.ConsumerGroup) {
handler := &groupHandler{
version: input.config.Version,
client: client,
parsers: input.config.Parsers,
// expandEventListFromField will be assigned the configuration option expand_event_list_from_field
expandEventListFromField: input.config.ExpandEventListFromField,
log: log,
}
input.saramaWaitGroup.Add(1)
defer func() {
consumerGroup.Close()
input.saramaWaitGroup.Done()
}()
// Listen asynchronously to any errors during the consume process
go func() {
for err := range consumerGroup.Errors() {
log.Errorw("Error reading from kafka", "error", err)
}
}()
err := consumerGroup.Consume(context, input.config.Topics, handler)
if err != nil {
log.Errorw("Kafka consume error", "error", err)
}
}
// The metadata attached to incoming events, so they can be ACKed once they've
// been successfully sent.
type eventMeta struct {
ackHandler func()
}
func arrayForKafkaHeaders(headers []*sarama.RecordHeader) []string {
array := []string{}
for _, header := range headers {
// Rather than indexing headers in the same object structure Kafka does
// (which would give maximal fidelity, but would be effectively unsearchable
// in elasticsearch and kibana) we compromise by serializing them all as
// strings in the form "<key>: <value>". For this we need to mask
// occurrences of ":" in the original key, which we expect to be uncommon.
// We may consider another approach in the future when it's more clear what
// the most common use cases are.
key := strings.ReplaceAll(string(header.Key), ":", "_")
value := string(header.Value)
array = append(array, fmt.Sprintf("%s: %s", key, value))
}
return array
}
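// Illustrative example (header keys and values are hypothetical): the record
// headers
//
//	{Key: "trace:id", Value: "abc"}, {Key: "source", Value: "svc-1"}
//
// are serialized as
//
//	["trace_id: abc", "source: svc-1"]
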
// A barebones implementation of context.Context wrapped around the done
// channels that are more common in the beats codebase.
// TODO(faec): Generalize this to a common utility in a shared library
// (https://github.com/elastic/beats/issues/13125).
type channelCtx struct {
ctx input.Context
}
func doneChannelContext(ctx input.Context) context.Context {
return channelCtx{ctx}
}
func (c channelCtx) Deadline() (deadline time.Time, ok bool) {
//nolint:nakedret // omitting the return gives a build error
return
}
func (c channelCtx) Done() <-chan struct{} {
return c.ctx.Cancelation.Done()
}
func (c channelCtx) Err() error {
return c.ctx.Cancelation.Err()
}
func (c channelCtx) Value(_ interface{}) interface{} { return nil }
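// A minimal compile-time check (an illustrative sketch, not needed for the
// wrapper to function) that channelCtx satisfies the standard interface:
//
//	var _ context.Context = channelCtx{}
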
// The group handler for the sarama consumer group interface. In addition to
// providing the basic consumption callbacks needed by sarama, groupHandler is
// also currently responsible for marshalling kafka messages into beat.Event,
// and passing ACKs from the output channel back to the kafka cluster.
type groupHandler struct {
sync.Mutex
version kafka.Version
session sarama.ConsumerGroupSession
client beat.Client
parsers parser.Config
	// expandEventListFromField is set when the fileset using this input expects
	// to receive multiple messages bundled under a specific field, e.g. the
	// azure filesets, where the events are found under the JSON object "records".
	expandEventListFromField string
log *logp.Logger
}
func (h *groupHandler) Setup(session sarama.ConsumerGroupSession) error {
h.Lock()
h.session = session
h.Unlock()
return nil
}
func (h *groupHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
h.Lock()
h.session = nil
h.Unlock()
return nil
}
// ack informs the kafka cluster that this message has been consumed. Called
// from the input's ACKEvents handler.
func (h *groupHandler) ack(message *sarama.ConsumerMessage) {
h.Lock()
defer h.Unlock()
if h.session != nil {
h.session.MarkMessage(message, "")
}
}
func (h *groupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
reader := h.createReader(claim)
parser := h.parsers.Create(reader)
for h.session.Context().Err() == nil {
message, err := parser.Next()
if errors.Is(err, io.EOF) {
return nil
}
if err != nil {
return err
}
h.client.Publish(beat.Event{
Timestamp: message.Ts,
Meta: message.Meta,
Fields: message.Fields,
Private: message.Private,
})
}
return nil
}
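// The ACK round trip in ConsumeClaim, sketched: every published event carries
// an eventMeta as its Private data (set in composeMessage below). Once the
// output acknowledges the event, the EventPrivateReporter installed in Run
// receives that same eventMeta and invokes its ackHandler, which ends up in
// groupHandler.ack and marks the Kafka offset as consumed.
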
func (h *groupHandler) createReader(claim sarama.ConsumerGroupClaim) reader.Reader {
if h.expandEventListFromField != "" {
return &listFromFieldReader{
claim: claim,
groupHandler: h,
field: h.expandEventListFromField,
log: h.log,
}
}
return &recordReader{
claim: claim,
groupHandler: h,
log: h.log,
}
}
type recordReader struct {
claim sarama.ConsumerGroupClaim
groupHandler *groupHandler
log *logp.Logger
}
func (m *recordReader) Close() error {
return nil
}
func (m *recordReader) Next() (reader.Message, error) {
msg, ok := <-m.claim.Messages()
if !ok {
return reader.Message{}, io.EOF
}
timestamp, kafkaFields := composeEventMetadata(m.claim, m.groupHandler, msg)
ackHandler := func() {
m.groupHandler.ack(msg)
}
return composeMessage(timestamp, msg.Value, kafkaFields, ackHandler), nil
}
type listFromFieldReader struct {
claim sarama.ConsumerGroupClaim
groupHandler *groupHandler
buffer []reader.Message
field string
log *logp.Logger
}
func (l *listFromFieldReader) Close() error {
return nil
}
func (l *listFromFieldReader) Next() (reader.Message, error) {
if len(l.buffer) != 0 {
return l.returnFromBuffer()
}
msg, ok := <-l.claim.Messages()
if !ok {
return reader.Message{}, io.EOF
}
timestamp, kafkaFields := composeEventMetadata(l.claim, l.groupHandler, msg)
messages := l.parseMultipleMessages(msg.Value)
neededAcks := atomic.Int64{}
neededAcks.Add(int64(len(messages)))
ackHandler := func() {
if neededAcks.Add(-1) == 0 {
l.groupHandler.ack(msg)
}
}
	for _, message := range messages {
		l.buffer = append(l.buffer, composeMessage(timestamp, []byte(message), kafkaFields, ackHandler))
	}
return l.returnFromBuffer()
}
func (l *listFromFieldReader) returnFromBuffer() (reader.Message, error) {
	next := l.buffer[0]
	l.buffer = l.buffer[1:]
return next, nil
}
// parseMultipleMessages will try to split the message into multiple ones based on the group field provided by the configuration
func (l *listFromFieldReader) parseMultipleMessages(bMessage []byte) []string {
var obj map[string][]interface{}
err := json.Unmarshal(bMessage, &obj)
if err != nil {
l.log.Errorw(fmt.Sprintf("Kafka desirializing multiple messages using the group object %s", l.field), "error", err)
return []string{}
}
var messages []string
for _, ms := range obj[l.field] {
js, err := json.Marshal(ms)
if err == nil {
messages = append(messages, string(js))
} else {
l.log.Errorw(fmt.Sprintf("Kafka serializing message %s", ms), "error", err)
}
}
return messages
}
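// Illustrative example (the payload is hypothetical): with field set to
// "records", the message
//
//	{"records": [{"a": 1}, {"b": 2}]}
//
// is split into the two messages {"a":1} and {"b":2}. Each becomes its own
// event, and the original Kafka message is only ACKed once both of them have
// been acknowledged (see the neededAcks counter in Next above).
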
func composeEventMetadata(claim sarama.ConsumerGroupClaim, handler *groupHandler, msg *sarama.ConsumerMessage) (time.Time, mapstr.M) {
timestamp := time.Now()
kafkaFields := mapstr.M{
"topic": claim.Topic(),
"partition": claim.Partition(),
"offset": msg.Offset,
"key": string(msg.Key),
}
version, versionOk := handler.version.Get()
if versionOk && version.IsAtLeast(sarama.V0_10_0_0) {
timestamp = msg.Timestamp
if !msg.BlockTimestamp.IsZero() {
kafkaFields["block_timestamp"] = msg.BlockTimestamp
}
}
if versionOk && version.IsAtLeast(sarama.V0_11_0_0) {
kafkaFields["headers"] = arrayForKafkaHeaders(msg.Headers)
}
return timestamp, kafkaFields
}
func composeMessage(timestamp time.Time, content []byte, kafkaFields mapstr.M, ackHandler func()) reader.Message {
return reader.Message{
Ts: timestamp,
Content: content,
Fields: mapstr.M{
"kafka": kafkaFields,
"message": string(content),
},
Private: eventMeta{
ackHandler: ackHandler,
},
}
}
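// Illustrative result (values are hypothetical): a record with key "k1" and
// payload {"hello":"world"} read from topic "logs", partition 0, offset 42
// yields a reader.Message whose Fields are roughly
//
//	{
//	  "kafka":   {"topic": "logs", "partition": 0, "offset": 42, "key": "k1"},
//	  "message": "{\"hello\":\"world\"}",
//	}
//
// plus "block_timestamp" and "headers" under "kafka" when the configured
// version enables them in composeEventMetadata above.
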
func contains(elements []string, element string) bool {
for _, e := range elements {
if e == element {
return true
}
}
return false
}