in kafka/common.go [392:430]
// newCertReloadingDialer builds a TLS dial function whose key material is
// refreshed from disk while the process is running.
//
// caPath, certPath and keyPath are stored in a keyPair together with a clone of
// tlsCfg, so the caller's config is never mutated by this function. The key
// pair is loaded once before returning, which surfaces an unreadable or
// invalid file here instead of on the first dial.
//
// poll is the minimum interval between reload attempts and must be positive;
// a non-positive value is rejected with an error (time.NewTicker would
// otherwise panic). Reloads are lazy: they are only attempted from inside a
// dial, never from a background goroutine.
//
// The returned function dials address over network using the current TLS
// configuration. It returns an error if a due reload fails, if address cannot
// be split into host and port when a ServerName has to be derived, or if the
// underlying TLS dial fails.
//
// NOTE(review): the ticker is never stopped; this presumably relies on the
// dialer living as long as the Kafka client — confirm against callers.
func newCertReloadingDialer(caPath, certPath, keyPath string,
	poll time.Duration,
	tlsCfg *tls.Config,
) (func(ctx context.Context, network, address string) (net.Conn, error), error) {
	// Validate before touching the filesystem: time.NewTicker panics on a
	// non-positive duration, and this function already has an error return.
	if poll <= 0 {
		return nil, fmt.Errorf("dialer: cert reload poll interval must be positive, got %s", poll)
	}

	dialer := &net.Dialer{Timeout: 10 * time.Second} // default dialer timeout in kgo.
	kp := &keyPair{
		caPath:   caPath,
		certPath: certPath,
		keyPath:  keyPath,
		config:   tlsCfg.Clone(),
	}
	// Initial load so that the first dial already has usable key material.
	if err := kp.reloadIfChanged(); err != nil {
		return nil, err
	}

	ticker := time.NewTicker(poll)
	return func(ctx context.Context, network, address string) (net.Conn, error) {
		// Non-blocking check: a reload is attempted only when a tick is
		// pending, so at most one dial per poll interval pays for it.
		select {
		case <-ticker.C:
			if err := kp.reloadIfChanged(); err != nil {
				return nil, err
			}
		default:
		}

		c := kp.clone()
		// Copied this pattern from franz-go client.go.
		// https://github.com/twmb/franz-go/blob/f30c518d6b727b9169a90b8c10e2127301822a3a/pkg/kgo/client.go#L440-L453
		//
		// When no ServerName is configured, use the host part of the address
		// being dialed for this connection.
		if c.ServerName == "" {
			server, _, err := net.SplitHostPort(address)
			if err != nil {
				return nil, fmt.Errorf("dialer: unable to split host:port for dialing: %w", err)
			}
			c.ServerName = server
		}

		return (&tls.Dialer{
			NetDialer: dialer,
			Config:    c,
		}).DialContext(ctx, network, address)
	}, nil
}