in kafka/producer.go [117:152]
func (cfg *ProducerConfig) finalize() error {
var errs []error
if err := cfg.CommonConfig.finalize(); err != nil {
errs = append(errs, err)
}
if cfg.MaxBufferedRecords < 0 {
errs = append(errs, fmt.Errorf("kafka: max buffered records cannot be negative: %d", cfg.MaxBufferedRecords))
}
if cfg.ProducerBatchMaxBytes < 0 {
errs = append(errs, fmt.Errorf("kafka: producer batch max bytes cannot be negative: %d", cfg.ProducerBatchMaxBytes))
}
if len(cfg.CompressionCodec) == 0 {
if v := os.Getenv("KAFKA_PRODUCER_COMPRESSION_CODEC"); v != "" {
names := strings.Split(v, ",")
codecs := make([]CompressionCodec, 0, len(names))
for _, name := range names {
switch name {
case "none":
codecs = append(codecs, NoCompression())
case "gzip":
codecs = append(codecs, GzipCompression())
case "snappy":
codecs = append(codecs, SnappyCompression())
case "lz4":
codecs = append(codecs, Lz4Compression())
case "zstd":
codecs = append(codecs, ZstdCompression())
default:
errs = append(errs, fmt.Errorf("kafka: unknown codec %q", name))
}
}
cfg.CompressionCodec = codecs
}
}
return errors.Join(errs...)
}