func newCertReloadingDialer()

in kafka/common.go [392:430]


func newCertReloadingDialer(caPath, certPath, keyPath string,
	poll time.Duration,
	tlsCfg *tls.Config,
) (func(ctx context.Context, network, address string) (net.Conn, error), error) {
	dialer := &net.Dialer{Timeout: 10 * time.Second} // default dialer timeout in kgo.
	kp := &keyPair{
		caPath:   caPath,
		certPath: certPath,
		keyPath:  keyPath,
		config:   tlsCfg.Clone(),
	}
	if err := kp.reloadIfChanged(); err != nil {
		return nil, err
	}
	ticker := time.NewTicker(poll)
	return func(ctx context.Context, network, host string) (net.Conn, error) {
		select {
		case <-ticker.C:
			if err := kp.reloadIfChanged(); err != nil {
				return nil, err
			}
		default:
		}
		c := kp.clone()
		// Copied this pattern from franz-go client.go.
		// https://github.com/twmb/franz-go/blob/f30c518d6b727b9169a90b8c10e2127301822a3a/pkg/kgo/client.go#L440-L453
		if c.ServerName == "" {
			server, _, err := net.SplitHostPort(host)
			if err != nil {
				return nil, fmt.Errorf("dialer: unable to split host:port for dialing: %w", err)
			}
			c.ServerName = server
		}
		return (&tls.Dialer{
			NetDialer: dialer,
			Config:    c,
		}).DialContext(ctx, network, host)
	}, nil
}