internal/output/kafka/kafka.go (69 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more agreements. // Elasticsearch B.V. licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information. package kafka import ( "context" "errors" "fmt" "github.com/IBM/sarama" "github.com/elastic/stream/internal/output" ) func init() { output.Register("kafka", New) } type Output struct { opts *output.Options client sarama.SyncProducer config *sarama.Config } func New(opts *output.Options) (output.Output, error) { if opts.Addr == "" { return nil, errors.New("kafka address is required") } config := sarama.NewConfig() config.Producer.Partitioner = sarama.NewRandomPartitioner config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Return.Successes = true saramaClient, err := sarama.NewClient([]string{opts.Addr}, config) if err != nil { return nil, fmt.Errorf("failed to create sarama client: %w", err) } producer, err := sarama.NewSyncProducerFromClient(saramaClient) if err != nil { return nil, fmt.Errorf("failed to create producer client: %w", err) } return &Output{opts: opts, client: producer, config: config}, nil } func (o *Output) DialContext(_ context.Context) error { if err := o.createTopic(); err != nil { return err } return nil } func (o *Output) Close() error { o.client.Close() return nil } func (o *Output) Write(b []byte) (int, error) { msg := &sarama.ProducerMessage{ Topic: o.opts.KafkaOptions.Topic, Value: sarama.ByteEncoder(b), } _, _, err := o.client.SendMessage(msg) if err != nil { return 0, fmt.Errorf("failed to create data in kafka topic: %w", err) } return len(b), nil } func (o *Output) createTopic() error { admin, err := sarama.NewClusterAdmin([]string{o.opts.Addr}, o.config) if err != nil { return fmt.Errorf("failed to create cluster admin client: %w", err) } err = admin.CreateTopic(o.opts.KafkaOptions.Topic, &sarama.TopicDetail{ NumPartitions: 1, ReplicationFactor: 1, }, false) if err != nil { return fmt.Errorf("failed to create topic: %w", err) } return nil }