in internal/output/kafka/kafka.go [26:46]
func New(opts *output.Options) (output.Output, error) {
if opts.Addr == "" {
return nil, errors.New("kafka address is required")
}
config := sarama.NewConfig()
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Return.Successes = true
saramaClient, err := sarama.NewClient([]string{opts.Addr}, config)
if err != nil {
return nil, fmt.Errorf("failed to create sarama client: %w", err)
}
producer, err := sarama.NewSyncProducerFromClient(saramaClient)
if err != nil {
return nil, fmt.Errorf("failed to create producer client: %w", err)
}
return &Output{opts: opts, client: producer, config: config}, nil
}