func newConfigFileHook()

in kafka/configfile.go [47:92]


func newConfigFileHook(filepath string, logger *zap.Logger) (_ *configFileHook, brokers []string, _ sasl.Mechanism, _ error) {
	config, err := loadConfigFile(filepath)
	if err != nil {
		return nil, nil, nil, err
	}
	if config.Bootstrap.Servers != "" {
		brokers = strings.Split(config.Bootstrap.Servers, ",")
	}
	var saslMechanism sasl.Mechanism
	switch config.SASL.Mechanism {
	case "PLAIN":
		var lastPlainAuthMu sync.Mutex
		var lastPlainAuth plain.Auth
		saslMechanism = plain.Plain(func(context.Context) (plain.Auth, error) {
			config, err := loadConfigFile(filepath)
			if err != nil {
				return plain.Auth{}, fmt.Errorf("failed to reload kafka config: %w", err)
			}
			lastPlainAuthMu.Lock()
			defer lastPlainAuthMu.Unlock()
			plainAuth := plain.Auth{
				User: config.SASL.Username,
				Pass: config.SASL.Password,
			}
			if plainAuth != lastPlainAuth {
				lastPlainAuth = plainAuth
				logger.Info(
					"updated SASL/PLAIN credentials from kafka config file",
					zap.String("username", plainAuth.User),
				)
			}
			return plainAuth, nil
		})
	case "AWS_MSK_IAM":
		saslMechanism, err = newAWSMSKIAMSASL()
		if err != nil {
			return nil, nil, nil, fmt.Errorf("kafka: error configuring SASL/AWS_MSK_IAM: %w", err)
		}
	}
	h := &configFileHook{
		filepath:             filepath,
		logger:               logger,
		lastBootstrapServers: config.Bootstrap.Servers,
	}
	return h, brokers, saslMechanism, nil
}