in internal/output/kafka/kafka.go [61:72]
// Write publishes b as the value of a single message on the topic configured
// in o.opts.KafkaOptions.Topic by handing it to o.client.SendMessage.
//
// On success it returns len(b) and a nil error. If SendMessage reports an
// error, Write returns 0 together with that error wrapped with context, so
// callers can still inspect the underlying cause via errors.Is / errors.As.
func (o *Output) Write(b []byte) (int, error) {
	// NOTE(review): b looks to be wrapped rather than copied here; that is
	// presumably fine if SendMessage blocks until the send completes — confirm
	// against the client type used by Output.
	payload := sarama.ByteEncoder(b)
	record := &sarama.ProducerMessage{
		Topic: o.opts.KafkaOptions.Topic,
		Value: payload,
	}

	// The first two results of SendMessage are not needed by this writer;
	// only the error matters.
	if _, _, sendErr := o.client.SendMessage(record); sendErr != nil {
		return 0, fmt.Errorf("failed to create data in kafka topic: %w", sendErr)
	}

	return len(b), nil
}