in kafka/common.go [312:358]
// newClient assembles the kgo options derived from cfg and returns a new
// *kgo.Client built from them.
//
// Options are appended in this order: logger and seed brokers, client
// identity, dialer or TLS, SASL, the caller-supplied additionalOpts, the
// telemetry hooks (unless cfg.DisableTelemetry is set), the metadata max age
// and finally any hooks stored in cfg.hooks.
//
// topicAttributeFunc is handed to newKgoHooks when telemetry is enabled.
// An error is returned if the metrics hooks or the client cannot be created.
func (cfg *CommonConfig) newClient(topicAttributeFunc TopicAttributeFunc, additionalOpts ...kgo.Opt) (*kgo.Client, error) {
	kafkaLogger := kzap.New(cfg.Logger.Named("kafka"))
	clientOpts := []kgo.Opt{
		kgo.WithLogger(kafkaLogger),
		kgo.SeedBrokers(cfg.Brokers...),
	}

	// Client identity: the software name/version pair is only sent when a
	// client ID is configured as well.
	if id := cfg.ClientID; id != "" {
		clientOpts = append(clientOpts, kgo.ClientID(id))
		if version := cfg.Version; version != "" {
			clientOpts = append(clientOpts, kgo.SoftwareNameAndVersion(id, version))
		}
	}

	// Transport: an explicit dialer takes precedence over the TLS config.
	switch {
	case cfg.Dialer != nil:
		clientOpts = append(clientOpts, kgo.Dialer(cfg.Dialer))
	case cfg.TLS != nil:
		clientOpts = append(clientOpts, kgo.DialTLSConfig(cfg.TLS.Clone()))
	}

	if cfg.SASL != nil {
		clientOpts = append(clientOpts, kgo.SASL(cfg.SASL))
	}

	// Caller-supplied options go after the connection options and before the
	// hooks and metadata settings appended below.
	clientOpts = append(clientOpts, additionalOpts...)

	if !cfg.DisableTelemetry {
		telemetryHooks, hooksErr := newKgoHooks(
			cfg.meterProvider(),
			cfg.Namespace,
			cfg.namespacePrefix(),
			topicAttributeFunc,
		)
		if hooksErr != nil {
			return nil, fmt.Errorf("kafka: failed creating kgo metrics hooks: %w", hooksErr)
		}
		clientOpts = append(clientOpts, kgo.WithHooks(
			telemetryHooks,
			&loggerHook{logger: cfg.Logger},
		))
	}

	if maxAge := cfg.MetadataMaxAge; maxAge > 0 {
		clientOpts = append(clientOpts, kgo.MetadataMaxAge(maxAge))
	}

	if len(cfg.hooks) > 0 {
		clientOpts = append(clientOpts, kgo.WithHooks(cfg.hooks...))
	}

	kafkaClient, clientErr := kgo.NewClient(clientOpts...)
	if clientErr != nil {
		return nil, fmt.Errorf("kafka: failed creating kafka client: %w", clientErr)
	}

	// Request a metadata refresh right after construction so that the broker
	// list gets populated.
	kafkaClient.ForceMetadataRefresh()
	return kafkaClient, nil
}