func()

in kafka/common.go [176:289]


// finalize validates cfg and fills unset fields from the optional config file
// and from environment variables. It does not stop at the first problem: every
// error found is collected and the result of errors.Join is returned, which is
// nil when nothing went wrong.
//
// Environment variables consulted: KAFKA_CONFIG_FILE, KAFKA_BROKERS,
// KAFKA_PLAINTEXT, KAFKA_TLS_INSECURE, KAFKA_TLS_CA_CERT_PATH,
// KAFKA_TLS_CERT_PATH, KAFKA_TLS_KEY_PATH, KAFKA_TLS_SERVER_NAME,
// KAFKA_SASL_MECHANISM, KAFKA_USERNAME and KAFKA_PASSWORD.
//
// finalize mutates cfg (Logger, ConfigFile, hooks, Brokers, TLS, Dialer, SASL,
// TopicLogFieldFunc and TopicAttributeFunc), including on paths that return an
// error.
//
// NOTE(review): each call renames the logger, appends another config file hook
// and wraps TopicLogFieldFunc again, so this looks intended to run once per
// config value — confirm against callers.
func (cfg *CommonConfig) finalize() error {
	var errs []error
	if cfg.Logger == nil {
		cfg.Logger = zap.NewNop() // cfg.Logger may be used below
		errs = append(errs, errors.New("kafka: logger must be set"))
	} else {
		cfg.Logger = cfg.Logger.Named("kafka")
	}
	if cfg.Namespace != "" {
		cfg.Logger = cfg.Logger.With(zap.String("namespace", cfg.Namespace))
	}
	if cfg.ConfigFile == "" {
		cfg.ConfigFile = os.Getenv("KAFKA_CONFIG_FILE")
	}
	// The client certificate and key paths are read up front because their
	// presence signals an intention to use mTLS, which suppresses SASL both
	// when loading the config file and when reading the SASL environment.
	certPath := os.Getenv("KAFKA_TLS_CERT_PATH")
	keyPath := os.Getenv("KAFKA_TLS_KEY_PATH")
	if cfg.ConfigFile != "" {
		configFileHook, brokers, saslMechanism, err := newConfigFileHook(cfg.ConfigFile, cfg.Logger)
		if err != nil {
			errs = append(errs, fmt.Errorf("kafka: error loading config file: %w", err))
		} else {
			cfg.hooks = append(cfg.hooks, configFileHook)
			// Brokers from the config file replace any brokers already set.
			if len(brokers) != 0 {
				cfg.Brokers = brokers
			}
			if saslMechanism != nil && certPath == "" && keyPath == "" {
				// Only set SASL when there is no intention to configure mTLS.
				cfg.SASL = saslMechanism
			}
		}
	}
	if len(cfg.Brokers) == 0 {
		// KAFKA_BROKERS is a comma-separated list. Trim whitespace around each
		// entry and drop empty ones so that values such as "a:9092, b:9092" or
		// "a:9092," do not yield unusable broker addresses, and so that a value
		// consisting only of separators is reported as "no brokers".
		var brokers []string
		for _, broker := range strings.Split(os.Getenv("KAFKA_BROKERS"), ",") {
			if broker = strings.TrimSpace(broker); broker != "" {
				brokers = append(brokers, broker)
			}
		}
		if len(brokers) == 0 {
			errs = append(errs, errors.New("kafka: at least one broker must be set"))
		} else {
			cfg.Brokers = brokers
		}
	}
	switch {
	case cfg.TLS != nil && cfg.Dialer != nil:
		errs = append(errs, errors.New("kafka: only one of TLS or Dialer can be set"))
	case cfg.TLS == nil && cfg.Dialer == nil && os.Getenv("KAFKA_PLAINTEXT") != "true":
		// Auto-configure TLS from environment variables.
		cfg.TLS = &tls.Config{}
		tlsInsecure := os.Getenv("KAFKA_TLS_INSECURE") == "true"
		caCertPath := os.Getenv("KAFKA_TLS_CA_CERT_PATH")
		// LookupEnv rather than Getenv: a variable that is set but empty still
		// overrides the server name.
		if overriddenServerName, exists := os.LookupEnv("KAFKA_TLS_SERVER_NAME"); exists {
			cfg.Logger.Debug("overriding TLS server name", zap.String("server_name", overriddenServerName))
			cfg.TLS.ServerName = overriddenServerName
		}
		if tlsInsecure && (caCertPath != "" || certPath != "" || keyPath != "") {
			errs = append(errs, errors.New(
				"kafka: cannot set KAFKA_TLS_INSECURE when either of KAFKA_TLS_CA_CERT_PATH, KAFKA_TLS_CERT_PATH, or KAFKA_TLS_KEY_PATH are set",
			))
			// Leave the switch; SASL and topic function setup below still run.
			break
		}
		if tlsInsecure {
			cfg.TLS.InsecureSkipVerify = true
		}
		if caCertPath != "" || certPath != "" || keyPath != "" {
			// Set a dialer that reloads the CA cert when the file changes.
			// NOTE(review): 30*time.Second is presumably the reload interval —
			// confirm against newCertReloadingDialer.
			dialFn, err := newCertReloadingDialer(
				caCertPath, certPath, keyPath, 30*time.Second, cfg.TLS,
			)
			if err != nil {
				errs = append(errs, fmt.Errorf("kafka: error creating dialer with CA cert: %w", err))
				break
			}
			// The TLS config was handed to the dialer above; clear cfg.TLS so
			// that only one of TLS or Dialer remains set.
			cfg.Dialer = dialFn
			cfg.TLS = nil
		}
	}
	// Only configure SASL if it is not already set and when there is no
	// intention to configure mTLS.
	if cfg.SASL == nil && certPath == "" && keyPath == "" {
		saslConfig := saslConfigProperties{
			Mechanism: os.Getenv("KAFKA_SASL_MECHANISM"),
			Username:  os.Getenv("KAFKA_USERNAME"),
			Password:  os.Getenv("KAFKA_PASSWORD"),
		}
		if err := saslConfig.finalize(); err != nil {
			errs = append(errs, fmt.Errorf("kafka: error configuring SASL: %w", err))
		} else {
			// Any mechanism other than the two handled here leaves cfg.SASL
			// unset.
			switch saslConfig.Mechanism {
			case "PLAIN":
				plainAuth := plain.Auth{
					User: saslConfig.Username,
					Pass: saslConfig.Password,
				}
				// Skip SASL/PLAIN when both username and password are empty.
				if plainAuth != (plain.Auth{}) {
					cfg.SASL = plainAuth.AsMechanism()
				}
			case "AWS_MSK_IAM":
				var err error
				cfg.SASL, err = newAWSMSKIAMSASL()
				if err != nil {
					errs = append(errs, fmt.Errorf("kafka: error configuring SASL/AWS_MSK_IAM: %w", err))
				}
			}
		}
	}
	// Wrap the cfg.TopicLogFieldFunc to ensure it never returns a field with
	// an unknown type (like `zap.Field{}`).
	if cfg.TopicLogFieldFunc != nil {
		cfg.TopicLogFieldFunc = topicFieldFunc(cfg.TopicLogFieldFunc)
	}
	// Default to a function returning an empty attribute so that
	// cfg.TopicAttributeFunc is never nil after finalize.
	if cfg.TopicAttributeFunc == nil {
		cfg.TopicAttributeFunc = func(topic string) attribute.KeyValue {
			return attribute.KeyValue{}
		}
	}

	return errors.Join(errs...)
}