in kafka/producer.go [165:199]
// NewProducer builds a Producer from cfg.
//
// cfg is checked with cfg.finalize first; a failure there is returned wrapped
// as an invalid producer config error. Every optional producer setting that is
// set on cfg is then translated into the matching kgo option and handed,
// together with cfg.TopicAttributeFunc, to cfg.newClient. A failure to create
// the client is returned wrapped as well.
func NewProducer(cfg ProducerConfig) (*Producer, error) {
	if err := cfg.finalize(); err != nil {
		return nil, fmt.Errorf("kafka: invalid producer config: %w", err)
	}

	// Settings left at their zero value contribute no option at all.
	var clientOpts []kgo.Opt
	add := func(opt kgo.Opt) { clientOpts = append(clientOpts, opt) }

	if codecs := cfg.CompressionCodec; len(codecs) > 0 {
		add(kgo.ProducerBatchCompression(codecs...))
	}
	if limit := cfg.MaxBufferedRecords; limit != 0 {
		add(kgo.MaxBufferedRecords(limit))
	}
	if maxBytes := cfg.ProducerBatchMaxBytes; maxBytes != 0 {
		add(kgo.ProducerBatchMaxBytes(maxBytes))
	}
	if cfg.ManualFlushing {
		add(kgo.ManualFlushing())
	}
	if cfg.BatchListener != nil {
		add(kgo.WithHooks(cfg.BatchListener))
	}
	if cfg.RecordPartitioner != nil {
		add(kgo.RecordPartitioner(cfg.RecordPartitioner))
	}
	if cfg.AllowAutoTopicCreation {
		add(kgo.AllowAutoTopicCreation())
	}

	kafkaClient, err := cfg.newClient(cfg.TopicAttributeFunc, clientOpts...)
	if err != nil {
		return nil, fmt.Errorf("kafka: failed creating producer: %w", err)
	}

	producer := &Producer{
		cfg:    cfg,
		client: kafkaClient,
	}
	return producer, nil
}