func()

in kafka/producer.go [117:152]


func (cfg *ProducerConfig) finalize() error {
	var errs []error
	if err := cfg.CommonConfig.finalize(); err != nil {
		errs = append(errs, err)
	}
	if cfg.MaxBufferedRecords < 0 {
		errs = append(errs, fmt.Errorf("kafka: max buffered records cannot be negative: %d", cfg.MaxBufferedRecords))
	}
	if cfg.ProducerBatchMaxBytes < 0 {
		errs = append(errs, fmt.Errorf("kafka: producer batch max bytes cannot be negative: %d", cfg.ProducerBatchMaxBytes))
	}
	if len(cfg.CompressionCodec) == 0 {
		if v := os.Getenv("KAFKA_PRODUCER_COMPRESSION_CODEC"); v != "" {
			names := strings.Split(v, ",")
			codecs := make([]CompressionCodec, 0, len(names))
			for _, name := range names {
				switch name {
				case "none":
					codecs = append(codecs, NoCompression())
				case "gzip":
					codecs = append(codecs, GzipCompression())
				case "snappy":
					codecs = append(codecs, SnappyCompression())
				case "lz4":
					codecs = append(codecs, Lz4Compression())
				case "zstd":
					codecs = append(codecs, ZstdCompression())
				default:
					errs = append(errs, fmt.Errorf("kafka: unknown codec %q", name))
				}
			}
			cfg.CompressionCodec = codecs
		}
	}
	return errors.Join(errs...)
}