kafka/configfile.go (136 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package kafka import ( "context" "fmt" "net" "os" "strings" "sync" "time" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/sasl" "github.com/twmb/franz-go/pkg/sasl/plain" "go.uber.org/zap" "gopkg.in/yaml.v3" ) type configFileHook struct { filepath string logger *zap.Logger client *kgo.Client mu sync.RWMutex lastBootstrapServers string } // newConfigFileHook returns a configFileHook, along with a list of seed brokers and // possibly a sasl.Mechanism if `sasl.mechanism` is defined in the file. func newConfigFileHook(filepath string, logger *zap.Logger) (_ *configFileHook, brokers []string, _ sasl.Mechanism, _ error) { config, err := loadConfigFile(filepath) if err != nil { return nil, nil, nil, err } if config.Bootstrap.Servers != "" { brokers = strings.Split(config.Bootstrap.Servers, ",") } var saslMechanism sasl.Mechanism switch config.SASL.Mechanism { case "PLAIN": var lastPlainAuthMu sync.Mutex var lastPlainAuth plain.Auth saslMechanism = plain.Plain(func(context.Context) (plain.Auth, error) { config, err := loadConfigFile(filepath) if err != nil { return plain.Auth{}, fmt.Errorf("failed to reload kafka config: %w", err) } lastPlainAuthMu.Lock() defer lastPlainAuthMu.Unlock() plainAuth := plain.Auth{ User: config.SASL.Username, Pass: config.SASL.Password, } if plainAuth != lastPlainAuth { lastPlainAuth = plainAuth logger.Info( "updated SASL/PLAIN credentials from kafka config file", zap.String("username", plainAuth.User), ) } return plainAuth, nil }) case "AWS_MSK_IAM": saslMechanism, err = newAWSMSKIAMSASL() if err != nil { return nil, nil, nil, fmt.Errorf("kafka: error configuring SASL/AWS_MSK_IAM: %w", err) } } h := &configFileHook{ filepath: filepath, logger: logger, lastBootstrapServers: config.Bootstrap.Servers, } return h, brokers, saslMechanism, nil } func (h *configFileHook) OnNewClient(client *kgo.Client) { h.client = client } func (h *configFileHook) OnBrokerConnect(_ kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) { if err == nil { return } // Failed to connect, reload config in case the bootstrap servers have changed. h.logger.Debug("kafka broker connection failed, reloading kafka config") newConfig, err := loadConfigFile(h.filepath) if err != nil { h.logger.Warn("failed to reload kafka config", zap.Error(err)) return } h.mu.Lock() defer h.mu.Unlock() if newConfig.Bootstrap.Servers == h.lastBootstrapServers { return } bootstrapServers := strings.Split(newConfig.Bootstrap.Servers, ",") if err := h.client.UpdateSeedBrokers(bootstrapServers...); err != nil { h.logger.Warn("error updating kafka seed brokers", zap.Strings("addresses", bootstrapServers), zap.Error(err), ) return } h.logger.Info("updated kafka seed brokers", zap.Strings("addresses", bootstrapServers), ) h.lastBootstrapServers = newConfig.Bootstrap.Servers } type configProperties struct { Bootstrap struct { Servers string `yaml:"servers"` } `yaml:"bootstrap"` SASL saslConfigProperties `yaml:"sasl"` } type saslConfigProperties struct { Mechanism string `yaml:"mechanism"` Username string `yaml:"username"` Password string `yaml:"password"` } func (s *saslConfigProperties) finalize() error { switch s.Mechanism { case "": if s.Username != "" { s.Mechanism = "PLAIN" } case "PLAIN", "AWS_MSK_IAM": default: return fmt.Errorf("kafka: unsupported SASL mechanism %q", s.Mechanism) } return nil } func loadConfigFile(filepath string) (configProperties, error) { var config configProperties data, err := os.ReadFile(filepath) if err != nil { return config, fmt.Errorf("error reading kafka config file: %w", err) } if err := yaml.Unmarshal(data, &config); err != nil { return config, fmt.Errorf("error parsing kafka config file %q: %w", filepath, err) } if err := config.SASL.finalize(); err != nil { return configProperties{}, err } return config, nil }