func()

in kafka/common.go [312:358]


func (cfg *CommonConfig) newClient(topicAttributeFunc TopicAttributeFunc, additionalOpts ...kgo.Opt) (*kgo.Client, error) {
	opts := []kgo.Opt{
		kgo.WithLogger(kzap.New(cfg.Logger.Named("kafka"))),
		kgo.SeedBrokers(cfg.Brokers...),
	}
	if cfg.ClientID != "" {
		opts = append(opts, kgo.ClientID(cfg.ClientID))
		if cfg.Version != "" {
			opts = append(opts, kgo.SoftwareNameAndVersion(
				cfg.ClientID, cfg.Version,
			))
		}
	}
	if cfg.Dialer != nil {
		opts = append(opts, kgo.Dialer(cfg.Dialer))
	} else if cfg.TLS != nil {
		opts = append(opts, kgo.DialTLSConfig(cfg.TLS.Clone()))
	}
	if cfg.SASL != nil {
		opts = append(opts, kgo.SASL(cfg.SASL))
	}
	opts = append(opts, additionalOpts...)
	if !cfg.DisableTelemetry {
		metricHooks, err := newKgoHooks(cfg.meterProvider(),
			cfg.Namespace, cfg.namespacePrefix(), topicAttributeFunc,
		)
		if err != nil {
			return nil, fmt.Errorf("kafka: failed creating kgo metrics hooks: %w", err)
		}
		opts = append(opts,
			kgo.WithHooks(metricHooks, &loggerHook{logger: cfg.Logger}),
		)
	}
	if cfg.MetadataMaxAge > 0 {
		opts = append(opts, kgo.MetadataMaxAge(cfg.MetadataMaxAge))
	}
	if len(cfg.hooks) != 0 {
		opts = append(opts, kgo.WithHooks(cfg.hooks...))
	}
	client, err := kgo.NewClient(opts...)
	if err != nil {
		return nil, fmt.Errorf("kafka: failed creating kafka client: %w", err)
	}
	// Issue a metadata refresh request on construction, so the broker list is populated.
	client.ForceMetadataRefresh()
	return client, nil
}