func()

in kafka/consumer.go [131:183]


func (cfg *ConsumerConfig) finalize() error {
	var errs []error
	if err := cfg.CommonConfig.finalize(); err != nil {
		errs = append(errs, err)
	}
	if len(cfg.Topics) == 0 {
		errs = append(errs, errors.New("kafka: at least one topic must be set"))
	}
	if cfg.GroupID == "" {
		errs = append(errs, errors.New("kafka: consumer GroupID must be set"))
	}
	if cfg.Processor == nil {
		errs = append(errs, errors.New("kafka: processor must be set"))
	}
	if cfg.MaxPollBytes < 0 {
		errs = append(errs, errors.New("kafka: max poll bytes cannot be negative"))
	}
	if cfg.MaxPollPartitionBytes < 0 {
		errs = append(errs, errors.New("kafka: max poll partition bytes cannot be negative"))
	}
	if cfg.FetchMinBytes < 0 {
		errs = append(errs, errors.New("kafka: fetch min bytes cannot be negative"))
	}
	if cfg.BrokerMaxReadBytes < 0 {
		errs = append(errs, errors.New("kafka: broker max read bytes cannot be negative"))
	}
	if cfg.MaxPollPartitionBytes > 1<<30 {
		cfg.Logger.Info("kafka: MaxPollPartitionBytes exceeds 1GiB, setting to 1GiB")
		cfg.MaxPollPartitionBytes = 1 << 30
	}
	if cfg.BrokerMaxReadBytes > 1<<30 {
		cfg.Logger.Info("kafka: BrokerMaxReadBytes exceeds 1GiB, setting to 1GiB")
		cfg.BrokerMaxReadBytes = 1 << 30
	}
	if cfg.MaxPollBytes > 0 {
		// math.MaxInt32 is 1<<31-1.
		if cfg.MaxPollBytes > 1<<30 {
			cfg.Logger.Info("kafka: MaxPollBytes exceeds 1GiB, setting to 1GiB")
			cfg.MaxPollBytes = 1 << 30
		}
		if cfg.BrokerMaxReadBytes == 0 {
			cfg.Logger.Info("kafka: BrokerMaxReadBytes unset, setting to MaxPollBytes * 2 or 1GiB, whichever is smallest")
			cfg.BrokerMaxReadBytes = int32(math.Min(float64(cfg.MaxPollBytes)*2, 1<<30))
		}
		if cfg.BrokerMaxReadBytes > 0 && cfg.BrokerMaxReadBytes < cfg.MaxPollBytes {
			errs = append(errs, fmt.Errorf(
				"kafka: BrokerMaxReadBytes (%d) cannot be less than MaxPollBytes (%d)",
				cfg.BrokerMaxReadBytes, cfg.MaxPollBytes,
			))
		}
	}
	return errors.Join(errs...)
}