in internal/output/kafka/kafka.go [74:89]
func (o *Output) createTopic() error {
admin, err := sarama.NewClusterAdmin([]string{o.opts.Addr}, o.config)
if err != nil {
return fmt.Errorf("failed to create cluster admin client: %w", err)
}
err = admin.CreateTopic(o.opts.KafkaOptions.Topic, &sarama.TopicDetail{
NumPartitions: 1,
ReplicationFactor: 1,
}, false)
if err != nil {
return fmt.Errorf("failed to create topic: %w", err)
}
return nil
}