func New()

in internal/output/kafka/kafka.go [26:46]


// New builds a Kafka-backed output from opts.
//
// opts.Addr is used as the single seed broker address and must be non-empty.
// The producer is configured with a random partitioner, waits for all
// in-sync replicas to acknowledge each message, and reports successes
// (sarama requires Producer.Return.Successes to be true for a SyncProducer).
//
// It returns an error if opts is nil, if opts.Addr is empty, or if either the
// sarama client or the sync producer cannot be created.
//
// NOTE(review): the sarama client is not stored on Output. sarama documents
// that a client handed to NewSyncProducerFromClient must still be closed by
// the caller on shutdown — confirm the shutdown path accounts for this.
func New(opts *output.Options) (output.Output, error) {
	if opts == nil {
		return nil, errors.New("kafka options are required")
	}
	if opts.Addr == "" {
		return nil, errors.New("kafka address is required")
	}

	config := sarama.NewConfig()
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Return.Successes = true

	saramaClient, err := sarama.NewClient([]string{opts.Addr}, config)
	if err != nil {
		return nil, fmt.Errorf("failed to create sarama client: %w", err)
	}

	producer, err := sarama.NewSyncProducerFromClient(saramaClient)
	if err != nil {
		// The client was created successfully above; release it so a failed
		// producer setup does not leak it. The close error is deliberately
		// dropped: the producer error is the one the caller needs to see.
		_ = saramaClient.Close()
		return nil, fmt.Errorf("failed to create producer client: %w", err)
	}

	return &Output{opts: opts, client: producer, config: config}, nil
}