in kafka/configfile.go [47:92]
// newConfigFileHook loads the Kafka config file at filepath and derives the
// initial client settings from it.
//
// It returns:
//   - a configFileHook holding filepath, logger and the raw bootstrap servers
//     string as read at construction time;
//   - the brokers parsed from the comma-separated bootstrap servers value,
//     with surrounding whitespace trimmed and empty entries dropped (nil when
//     no usable entry is present);
//   - the SASL mechanism selected by the file, or nil when the configured
//     mechanism is neither "PLAIN" nor "AWS_MSK_IAM";
//   - an error if the file cannot be loaded or the AWS_MSK_IAM mechanism
//     cannot be configured.
func newConfigFileHook(filepath string, logger *zap.Logger) (_ *configFileHook, brokers []string, _ sasl.Mechanism, _ error) {
	config, err := loadConfigFile(filepath)
	if err != nil {
		return nil, nil, nil, err
	}

	// Tolerate "host1:9092, host2:9092" and stray commas: a broker address
	// with leading/trailing whitespace, or an empty one, is never dialable.
	for _, broker := range strings.Split(config.Bootstrap.Servers, ",") {
		if broker = strings.TrimSpace(broker); broker != "" {
			brokers = append(brokers, broker)
		}
	}

	var saslMechanism sasl.Mechanism
	switch config.SASL.Mechanism {
	case "PLAIN":
		// lastPlainAuth remembers the credentials handed out by the previous
		// callback invocation so that a change can be logged exactly once.
		// NOTE(review): the mutex presumably exists because the callback may
		// run concurrently for several connections; confirm against the
		// SASL client's calling convention.
		var lastPlainAuthMu sync.Mutex
		var lastPlainAuth plain.Auth
		saslMechanism = plain.Plain(func(context.Context) (plain.Auth, error) {
			// Re-read the file on every invocation so that rotated
			// credentials are picked up without restarting the client.
			reloaded, err := loadConfigFile(filepath)
			if err != nil {
				return plain.Auth{}, fmt.Errorf("failed to reload kafka config: %w", err)
			}
			lastPlainAuthMu.Lock()
			defer lastPlainAuthMu.Unlock()
			plainAuth := plain.Auth{
				User: reloaded.SASL.Username,
				Pass: reloaded.SASL.Password,
			}
			// lastPlainAuth starts as the zero value, so the first invocation
			// with non-empty credentials is also reported as an update.
			if plainAuth != lastPlainAuth {
				lastPlainAuth = plainAuth
				// Only the username is logged; the password never is.
				logger.Info(
					"updated SASL/PLAIN credentials from kafka config file",
					zap.String("username", plainAuth.User),
				)
			}
			return plainAuth, nil
		})
	case "AWS_MSK_IAM":
		saslMechanism, err = newAWSMSKIAMSASL()
		if err != nil {
			return nil, nil, nil, fmt.Errorf("kafka: error configuring SASL/AWS_MSK_IAM: %w", err)
		}
	}

	h := &configFileHook{
		filepath: filepath,
		logger:   logger,
		// Keep the raw, unparsed value as read from the file.
		lastBootstrapServers: config.Bootstrap.Servers,
	}
	return h, brokers, saslMechanism, nil
}