in plugins/client/kafka/client_config.go [35:76]
func (c *Client) loadConfig() (*sarama.Config, error) {
cfg := sarama.NewConfig()
cfg.Producer.Return.Successes = true
cfg.Producer.Return.Errors = true
cfg.Producer.Idempotent = c.IdempotentWrites
cfg.Producer.RequiredAcks = sarama.RequiredAcks(c.RequiredAcks)
cfg.Producer.Compression = sarama.CompressionCodec(intToInt8(c.CompressionCodec))
if c.ProducerMaxRetry > 0 {
cfg.Producer.Retry.Max = c.ProducerMaxRetry
}
if c.MetaMaxRetry > 0 {
cfg.Metadata.Retry.Max = c.MetaMaxRetry
}
if c.RetryBackoff > 0 {
cfg.Producer.Retry.Backoff = time.Millisecond * time.Duration(c.RetryBackoff)
}
if c.RefreshPeriod > 0 {
cfg.Metadata.RefreshFrequency = time.Duration(c.RefreshPeriod) * time.Minute
}
if c.MaxMessageBytes > 0 {
cfg.Producer.MaxMessageBytes = c.MaxMessageBytes
}
if c.ClientID != "" {
cfg.ClientID = c.ClientID
}
if c.Version != "" {
if version, err := sarama.ParseKafkaVersion(c.Version); err != nil {
log.Logger.Errorf("error in parsing the kafka version, the kafka version would be set as default value: %v", err)
} else {
cfg.Version = version
}
}
cfg.Net.TLS.Enable = c.EnableTLS
if c.EnableTLS {
configTLS, err := c.configTLS()
if err != nil {
return nil, err
}
cfg.Net.TLS.Config = configTLS
}
return cfg, nil
}