in kafka/common.go [176:289]
func (cfg *CommonConfig) finalize() error {
var errs []error
if cfg.Logger == nil {
cfg.Logger = zap.NewNop() // cfg.Logger may be used below
errs = append(errs, errors.New("kafka: logger must be set"))
} else {
cfg.Logger = cfg.Logger.Named("kafka")
}
if cfg.Namespace != "" {
cfg.Logger = cfg.Logger.With(zap.String("namespace", cfg.Namespace))
}
if cfg.ConfigFile == "" {
cfg.ConfigFile = os.Getenv("KAFKA_CONFIG_FILE")
}
certPath := os.Getenv("KAFKA_TLS_CERT_PATH")
keyPath := os.Getenv("KAFKA_TLS_KEY_PATH")
if cfg.ConfigFile != "" {
configFileHook, brokers, saslMechanism, err := newConfigFileHook(cfg.ConfigFile, cfg.Logger)
if err != nil {
errs = append(errs, fmt.Errorf("kafka: error loading config file: %w", err))
} else {
cfg.hooks = append(cfg.hooks, configFileHook)
if len(brokers) != 0 {
cfg.Brokers = brokers
}
if saslMechanism != nil && certPath == "" && keyPath == "" {
// Only set SASL when there is no intention to configure mTLS.
cfg.SASL = saslMechanism
}
}
}
if len(cfg.Brokers) == 0 {
if v := os.Getenv("KAFKA_BROKERS"); v != "" {
cfg.Brokers = strings.Split(v, ",")
} else {
errs = append(errs, errors.New("kafka: at least one broker must be set"))
}
}
switch {
case cfg.TLS != nil && cfg.Dialer != nil:
errs = append(errs, errors.New("kafka: only one of TLS or Dialer can be set"))
case cfg.TLS == nil && cfg.Dialer == nil && os.Getenv("KAFKA_PLAINTEXT") != "true":
// Auto-configure TLS from environment variables.
cfg.TLS = &tls.Config{}
tlsInsecure := os.Getenv("KAFKA_TLS_INSECURE") == "true"
caCertPath := os.Getenv("KAFKA_TLS_CA_CERT_PATH")
if overriddenServerName, exists := os.LookupEnv("KAFKA_TLS_SERVER_NAME"); exists {
cfg.Logger.Debug("overriding TLS server name", zap.String("server_name", overriddenServerName))
cfg.TLS.ServerName = overriddenServerName
}
if tlsInsecure && (caCertPath != "" || certPath != "" || keyPath != "") {
errs = append(errs, errors.New(
"kafka: cannot set KAFKA_TLS_INSECURE when either of KAFKA_TLS_CA_CERT_PATH, KAFKA_TLS_CERT_PATH, or KAFKA_TLS_KEY_PATH are set",
))
break
}
if tlsInsecure {
cfg.TLS.InsecureSkipVerify = true
}
if caCertPath != "" || certPath != "" || keyPath != "" {
// Set a dialer that reloads the CA cert when the file changes.
dialFn, err := newCertReloadingDialer(
caCertPath, certPath, keyPath, 30*time.Second, cfg.TLS,
)
if err != nil {
errs = append(errs, fmt.Errorf("kafka: error creating dialer with CA cert: %w", err))
break
}
cfg.Dialer = dialFn
cfg.TLS = nil
}
}
// Only configure SASL if it is not already set and when there is no
// intention to configure mTLS.
if cfg.SASL == nil && certPath == "" && keyPath == "" {
saslConfig := saslConfigProperties{
Mechanism: os.Getenv("KAFKA_SASL_MECHANISM"),
Username: os.Getenv("KAFKA_USERNAME"),
Password: os.Getenv("KAFKA_PASSWORD"),
}
if err := saslConfig.finalize(); err != nil {
errs = append(errs, fmt.Errorf("kafka: error configuring SASL: %w", err))
} else {
switch saslConfig.Mechanism {
case "PLAIN":
plainAuth := plain.Auth{
User: saslConfig.Username,
Pass: saslConfig.Password,
}
if plainAuth != (plain.Auth{}) {
cfg.SASL = plainAuth.AsMechanism()
}
case "AWS_MSK_IAM":
var err error
cfg.SASL, err = newAWSMSKIAMSASL()
if err != nil {
errs = append(errs, fmt.Errorf("kafka: error configuring SASL/AWS_MSK_IAM: %w", err))
}
}
}
}
// Wrap the cfg.TopicLogFieldFunc to ensure it never returns a field with
// an unknown type (like `zap.Field{}`).
if cfg.TopicLogFieldFunc != nil {
cfg.TopicLogFieldFunc = topicFieldFunc(cfg.TopicLogFieldFunc)
}
if cfg.TopicAttributeFunc == nil {
cfg.TopicAttributeFunc = func(topic string) attribute.KeyValue {
return attribute.KeyValue{}
}
}
return errors.Join(errs...)
}