func()

in kafka/configfile.go [98:130]


func (h *configFileHook) OnBrokerConnect(_ kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) {
	if err == nil {
		return
	}

	// Failed to connect, reload config in case the bootstrap servers have changed.
	h.logger.Debug("kafka broker connection failed, reloading kafka config")

	newConfig, err := loadConfigFile(h.filepath)
	if err != nil {
		h.logger.Warn("failed to reload kafka config", zap.Error(err))
		return
	}

	h.mu.Lock()
	defer h.mu.Unlock()

	if newConfig.Bootstrap.Servers == h.lastBootstrapServers {
		return
	}
	bootstrapServers := strings.Split(newConfig.Bootstrap.Servers, ",")
	if err := h.client.UpdateSeedBrokers(bootstrapServers...); err != nil {
		h.logger.Warn("error updating kafka seed brokers",
			zap.Strings("addresses", bootstrapServers),
			zap.Error(err),
		)
		return
	}
	h.logger.Info("updated kafka seed brokers",
		zap.Strings("addresses", bootstrapServers),
	)
	h.lastBootstrapServers = newConfig.Bootstrap.Servers
}