func NewProducer()

in kafka/producer.go [165:199]


// NewProducer validates cfg and builds a Producer around a newly created
// client.
//
// The configuration is finalized first; any error from that step is wrapped
// and returned. Producer-related fields that are set on cfg (compression
// codecs, buffered-record limit, batch size limit, manual flushing, batch
// listener hooks, record partitioner, automatic topic creation) are then
// translated into kgo options, in that order, and handed to cfg.newClient
// together with cfg.TopicAttributeFunc. Fields left at their zero value
// contribute no option.
//
// On success the returned Producer holds the finalized cfg and the client.
func NewProducer(cfg ProducerConfig) (*Producer, error) {
	if err := cfg.finalize(); err != nil {
		return nil, fmt.Errorf("kafka: invalid producer config: %w", err)
	}

	var opts []kgo.Opt
	// add appends a single option, preserving the order in which the
	// settings are inspected below.
	add := func(o kgo.Opt) { opts = append(opts, o) }

	if codecs := cfg.CompressionCodec; len(codecs) != 0 {
		add(kgo.ProducerBatchCompression(codecs...))
	}
	if limit := cfg.MaxBufferedRecords; limit != 0 {
		add(kgo.MaxBufferedRecords(limit))
	}
	if size := cfg.ProducerBatchMaxBytes; size != 0 {
		add(kgo.ProducerBatchMaxBytes(size))
	}
	if cfg.ManualFlushing {
		add(kgo.ManualFlushing())
	}
	if listener := cfg.BatchListener; listener != nil {
		add(kgo.WithHooks(listener))
	}
	if partitioner := cfg.RecordPartitioner; partitioner != nil {
		add(kgo.RecordPartitioner(partitioner))
	}
	if cfg.AllowAutoTopicCreation {
		add(kgo.AllowAutoTopicCreation())
	}

	client, err := cfg.newClient(cfg.TopicAttributeFunc, opts...)
	if err != nil {
		return nil, fmt.Errorf("kafka: failed creating producer: %w", err)
	}

	producer := &Producer{
		cfg:    cfg,
		client: client,
	}
	return producer, nil
}