libbeat/outputs/kafka/client.go
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package kafka
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/eapache/go-resiliency/breaker"
"github.com/elastic/sarama"

"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/outputs/codec"
"github.com/elastic/beats/v7/libbeat/outputs/outil"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/testing"
"github.com/elastic/elastic-agent-libs/transport"
)
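
// client is the Kafka output client. It wraps a sarama.AsyncProducer and
// publishes batches of events to the configured brokers, tracking
// acknowledgements and failures through the success and error workers.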
type client struct {
log *logp.Logger
observer outputs.Observer
hosts []string
topic outil.Selector
key *fmtstr.EventFormatString
index string
codec codec.Codec
config sarama.Config
mux sync.Mutex
done chan struct{}
producer sarama.AsyncProducer
recordHeaders []sarama.RecordHeader
wg sync.WaitGroup
}
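
// msgRef tracks the acknowledgement state of a single published batch.
// count is decremented once per event (via done or fail); when it reaches
// zero the batch is either ACKed or its failed events are sent back for retry.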
type msgRef struct {
client *client
count int32
total int
failed []publisher.Event
batch publisher.Batch
err error
}
var (
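// errNoTopicsSelected is returned when the topic selector yields an empty
// topic for an event.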
errNoTopicsSelected = errors.New("no topic could be selected")
// authErrors are authentication/authorisation errors that will cause
// the event to be dropped
authErrors = []error{
sarama.ErrTopicAuthorizationFailed,
sarama.ErrGroupAuthorizationFailed,
sarama.ErrClusterAuthorizationFailed,
// I believe these are handled before the connection is
// established, however we also handle them here just in
// case.
sarama.ErrUnsupportedSASLMechanism,
sarama.ErrIllegalSASLState,
sarama.ErrSASLAuthenticationFailed,
}
)
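
// newKafkaClient creates a Kafka output client for the given hosts, topic
// selector, optional key format string, record headers and codec. It does
// not open any connection; Connect must be called before Publish.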
func newKafkaClient(
observer outputs.Observer,
hosts []string,
index string,
key *fmtstr.EventFormatString,
topic outil.Selector,
headers []header,
writer codec.Codec,
cfg *sarama.Config,
logger *logp.Logger,
) (*client, error) {
c := &client{
log: logger.Named(logSelector),
observer: observer,
hosts: hosts,
topic: topic,
key: key,
index: strings.ToLower(index),
codec: writer,
config: *cfg,
done: make(chan struct{}),
}
if len(headers) != 0 {
recordHeaders := make([]sarama.RecordHeader, 0, len(headers))
for _, h := range headers {
if h.Key == "" {
continue
}
recordHeader := sarama.RecordHeader{
Key: []byte(h.Key),
Value: []byte(h.Value),
}
recordHeaders = append(recordHeaders, recordHeader)
}
c.recordHeaders = recordHeaders
}
return c, nil
}
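
// Connect creates the async producer and starts the success and error
// worker goroutines.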
func (c *client) Connect(_ context.Context) error {
c.mux.Lock()
defer c.mux.Unlock()
c.log.Debugf("connect: %v", c.hosts)
// try to connect
producer, err := sarama.NewAsyncProducer(c.hosts, &c.config)
if err != nil {
c.log.Errorf("Kafka connect fails with: %+v", err)
return err
}
c.producer = producer
c.wg.Add(2)
go c.successWorker(producer.Successes())
go c.errorWorker(producer.Errors())
return nil
}
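
// Close stops the producer, waits for both worker goroutines to exit and
// releases the producer. It returns immediately if Connect was never called.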
func (c *client) Close() error {
c.mux.Lock()
defer c.mux.Unlock()
c.log.Debug("closed kafka client")
// The producer was not created before Close() was called.
if c.producer == nil {
return nil
}
close(c.done)
c.producer.AsyncClose()
c.wg.Wait()
c.producer = nil
return nil
}
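
// Publish encodes every event of the batch into a producer message and
// forwards it to the producer's input channel. Events that cannot be encoded
// are dropped and counted as permanent errors; acknowledgement and retry
// handling happen asynchronously in the success and error workers.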
func (c *client) Publish(_ context.Context, batch publisher.Batch) error {
events := batch.Events()
c.observer.NewBatch(len(events))
ref := &msgRef{
client: c,
count: int32(len(events)), //nolint:gosec //keep old behavior
total: len(events),
failed: nil,
batch: batch,
}
ch := c.producer.Input()
for i := range events {
d := &events[i]
msg, err := c.getEventMessage(d)
if err != nil {
c.log.Errorf("Dropping event: %+v", err)
ref.done()
c.observer.PermanentErrors(1)
continue
}
msg.ref = ref
msg.initProducerMessage()
ch <- &msg.msg
}
return nil
}
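
// String returns a descriptive name for the output, including the
// configured hosts.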
func (c *client) String() string {
return "kafka(" + strings.Join(c.hosts, ",") + ")"
}
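
// getEventMessage builds the Kafka message for a single event: it resolves
// the partition and topic (from the event cache or the topic selector),
// encodes the event with the configured codec and, if configured, sets the
// message key and timestamp.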
func (c *client) getEventMessage(data *publisher.Event) (*message, error) {
event := &data.Content
msg := &message{partition: -1, data: *data}
value, err := data.Cache.GetValue("partition")
if err == nil {
if c.log.IsDebug() {
c.log.Debugf("got event.Meta[\"partition\"] = %v", value)
}
if partition, ok := value.(int32); ok {
msg.partition = partition
}
}
value, err = data.Cache.GetValue("topic")
if err == nil {
if c.log.IsDebug() {
c.log.Debugf("got event.Meta[\"topic\"] = %v", value)
}
if topic, ok := value.(string); ok {
msg.topic = topic
}
}
if msg.topic == "" {
topic, err := c.topic.Select(event)
if err != nil {
return nil, fmt.Errorf("setting kafka topic failed with %w", err)
}
if topic == "" {
return nil, errNoTopicsSelected
}
msg.topic = topic
if _, err := data.Cache.Put("topic", topic); err != nil {
return nil, fmt.Errorf("setting kafka topic in publisher event failed: %w", err)
}
}
serializedEvent, err := c.codec.Encode(c.index, event)
if err != nil {
if c.log.IsDebug() {
c.log.Debug("failed event logged to event log file")
c.log.Debugw(fmt.Sprintf("failed event: %v", event), logp.TypeKey, logp.EventType)
}
return nil, err
}
buf := make([]byte, len(serializedEvent))
copy(buf, serializedEvent)
msg.value = buf
// Message timestamps were added to Kafka in version 0.10.0.0.
if c.config.Version.IsAtLeast(sarama.V0_10_0_0) {
msg.ts = event.Timestamp
}
if c.key != nil {
if key, err := c.key.RunBytes(event); err == nil {
msg.key = key
}
}
return msg, nil
}
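
// successWorker drains the producer's Successes channel and marks the
// corresponding events as done on their batch.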
func (c *client) successWorker(ch <-chan *sarama.ProducerMessage) {
defer c.wg.Done()
defer c.log.Debug("Stop kafka ack worker")
for libMsg := range ch {
msg, ok := libMsg.Metadata.(*message)
if !ok {
c.log.Debug("Failed to assert libMsg.Metadata to *message")
return
}
msg.ref.done()
}
}
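
// errorWorker drains the producer's Errors channel, records each failure on
// its batch and applies back-pressure while sarama's circuit breaker is open
// (see the comment inside the loop for details).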
func (c *client) errorWorker(ch <-chan *sarama.ProducerError) {
breakerOpen := false
defer c.wg.Done()
defer c.log.Debug("Stop kafka error handler")
for errMsg := range ch {
msg, ok := errMsg.Msg.Metadata.(*message)
if !ok {
c.log.Debug("Failed to assert libMsg.Metadata to *message")
return
}
msg.ref.fail(msg, errMsg.Err)
if errors.Is(errMsg.Err, breaker.ErrBreakerOpen) {
// ErrBreakerOpen is a very special case in Sarama. It happens only when
// there have been repeated critical (broker / topic-level) errors, and it
// puts Sarama into a state where it immediately rejects all input
// for 10 seconds, ignoring retry / backoff settings.
// With this output's current design (in which Publish passes through to
// Sarama's input channel with no further synchronization), retrying
// these failed values causes an infinite retry loop that degrades
// the entire system.
// "Nice" approaches and why we haven't used them:
// - Use exposed API to navigate this state and its effect on retries.
// * Unfortunately, Sarama's circuit breaker and its errors are
// hard-coded and undocumented. We'd like to address this in the
// future.
// - If a batch fails with a circuit breaker error, delay before
// retrying it.
// * This would fix the most urgent performance issues, but requires
// extra bookkeeping because the Kafka output handles each batch
// independently. It results in potentially many batches / 10s of
// thousands of events being loaded and attempted, even though we
// know there's a fatal error early in the first batch. It also
// makes it hard to know when each batch should be retried.
// - In the Kafka Publish method, add a blocking first-pass intake step
// that can gate on error conditions, rather than handing off data
// to Sarama immediately.
// * This would fix the issue but would require a lot of work and
// testing, and we need a fix for the release now. It's also a
// fairly elaborate workaround for something that might be
// easier to fix in the library itself.
//
// Instead, we have applied the following fix, which is not very "nice"
// but satisfies all other important constraints:
// - When we receive a circuit breaker error, sleep for 10 seconds
// (Sarama's hard-coded timeout) on the _error worker thread_.
//
// This works because connection-level errors that can trigger the
// circuit breaker are on the critical path for input processing, and
// thus blocking on the error channel applies back-pressure to the
// input channel. This means that if there are any more errors while the
// error worker is asleep, any call to Publish will block until we
// start reading again.
//
// Reasons this solution is preferred:
// - It responds immediately to Sarama's global error state, rather than
// trying to detect it independently in each batch or adding more
// cumbersome synchronization to the output
// - It gives the minimal delay that is consistent with Sarama's
// internal behavior
// - It requires only a few lines of code and no design changes
//
// That said, this is still relying on undocumented library internals
// for correct behavior, which isn't ideal, but the error itself is an
// undocumented library internal, so this is de facto necessary for now.
// We'd like to have a more official / permanent fix merged into Sarama
// itself in the future.
// The "breakerOpen" flag keeps us from sleeping the first time we see
// a circuit breaker error, because it might be an old error still
// sitting in the channel from 10 seconds ago. So we only end up
// sleeping every _other_ reported breaker error.
if breakerOpen {
// Immediately log the error that presumably caused this state,
// since the error reporting on this batch will be delayed.
if msg.ref.err != nil {
c.log.Errorf("Kafka (topic=%v): %v", msg.topic, msg.ref.err)
}
select {
case <-time.After(10 * time.Second):
// Sarama's circuit breaker is hard-coded to reject all inputs
// for 10sec.
case <-msg.ref.client.done:
// Allow early bailout if the output itself is closing.
}
breakerOpen = false
} else {
breakerOpen = true
}
}
}
}
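
// done marks one message of the batch as successfully published.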
func (r *msgRef) done() {
r.dec()
}
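
// fail records a failed message. Invalid, oversized and unauthorised
// messages are dropped and counted as permanent errors; all other failures
// are collected for retry.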
func (r *msgRef) fail(msg *message, err error) {
switch {
case errors.Is(err, sarama.ErrInvalidMessage):
r.client.log.Errorf("Kafka (topic=%v): dropping invalid message", msg.topic)
r.client.observer.PermanentErrors(1)
case errors.Is(err, sarama.ErrMessageSizeTooLarge) || errors.Is(err, sarama.ErrInvalidMessageSize):
r.client.log.Errorf("Kafka (topic=%v): dropping too large message of size %v.",
msg.topic,
len(msg.key)+len(msg.value))
r.client.observer.PermanentErrors(1)
case isAuthError(err):
r.client.log.Errorf("Kafka (topic=%v): authorisation error: %s", msg.topic, err)
r.client.observer.PermanentErrors(1)
case errors.Is(err, breaker.ErrBreakerOpen):
// Add this message to the failed list, but don't overwrite r.err since
// all the breaker error means is "there were a lot of other errors".
r.failed = append(r.failed, msg.data)
default:
r.failed = append(r.failed, msg.data)
if r.err == nil {
// Don't overwrite an existing error. This way at the end of the batch
// we report the first error that we saw, rather than the last one.
r.err = err
}
}
r.dec()
}
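
// dec decrements the number of outstanding messages. When it reaches zero
// the batch is ACKed, or its failed events are handed back for retry, and
// the observer stats are updated.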
func (r *msgRef) dec() {
i := atomic.AddInt32(&r.count, -1)
if i > 0 {
return
}
r.client.log.Debug("finished kafka batch")
stats := r.client.observer
err := r.err
if err != nil {
failed := len(r.failed)
success := r.total - failed
r.batch.RetryEvents(r.failed)
stats.RetryableErrors(failed)
if success > 0 {
stats.AckedEvents(success)
}
r.client.log.Debugf("Kafka publish failed with: %+v", err)
} else {
r.batch.ACK()
stats.AckedEvents(r.total)
}
}
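
// Test implements the connection tester for the Kafka output by dialing
// each configured host.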
func (c *client) Test(d testing.Driver) {
if c.config.Net.TLS.Enable {
d.Warn("TLS", "Kafka output doesn't support TLS testing")
}
for _, host := range c.hosts {
d.Run("Kafka: "+host, func(d testing.Driver) {
netDialer := transport.TestNetDialer(d, c.config.Net.DialTimeout)
_, err := netDialer.Dial("tcp", host)
d.Error("dial up", err)
})
}
}
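
// isAuthError reports whether err matches one of the known
// authentication/authorisation errors in authErrors.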
func isAuthError(err error) bool {
for _, e := range authErrors {
if errors.Is(err, e) {
return true
}
}
return false
}