func()

in internal/output/kafka/kafka.go [74:89]


func (o *Output) createTopic() error {
	admin, err := sarama.NewClusterAdmin([]string{o.opts.Addr}, o.config)
	if err != nil {
		return fmt.Errorf("failed to create cluster admin client: %w", err)
	}

	err = admin.CreateTopic(o.opts.KafkaOptions.Topic, &sarama.TopicDetail{
		NumPartitions:     1,
		ReplicationFactor: 1,
	}, false)

	if err != nil {
		return fmt.Errorf("failed to create topic: %w", err)
	}
	return nil
}