in kafka/configfile.go [98:130]
func (h *configFileHook) OnBrokerConnect(_ kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) {
if err == nil {
return
}
// Failed to connect, reload config in case the bootstrap servers have changed.
h.logger.Debug("kafka broker connection failed, reloading kafka config")
newConfig, err := loadConfigFile(h.filepath)
if err != nil {
h.logger.Warn("failed to reload kafka config", zap.Error(err))
return
}
h.mu.Lock()
defer h.mu.Unlock()
if newConfig.Bootstrap.Servers == h.lastBootstrapServers {
return
}
bootstrapServers := strings.Split(newConfig.Bootstrap.Servers, ",")
if err := h.client.UpdateSeedBrokers(bootstrapServers...); err != nil {
h.logger.Warn("error updating kafka seed brokers",
zap.Strings("addresses", bootstrapServers),
zap.Error(err),
)
return
}
h.logger.Info("updated kafka seed brokers",
zap.Strings("addresses", bootstrapServers),
)
h.lastBootstrapServers = newConfig.Bootstrap.Servers
}