func()

in plugins/client/kafka/client_config.go [35:76]


// loadConfig builds the sarama configuration for this client from its fields.
//
// Producer success and error reporting are always enabled. Retry limits, retry
// backoff (milliseconds), metadata refresh period (minutes), maximum message
// size, client ID and Kafka version are applied only when the corresponding
// field holds a positive / non-empty value; otherwise the value supplied by
// sarama.NewConfig is kept. A Version string that cannot be parsed is logged
// and ignored rather than treated as a failure.
//
// It returns an error only when TLS is enabled and configTLS fails.
func (c *Client) loadConfig() (*sarama.Config, error) {
	cfg := sarama.NewConfig()
	cfg.Producer.Return.Successes = true
	cfg.Producer.Return.Errors = true
	cfg.Producer.Idempotent = c.IdempotentWrites
	if c.IdempotentWrites {
		// sarama's config validation rejects an idempotent producer unless at
		// most one request is in flight per connection. This function exposes
		// no other way to set the limit, so pin it here; without it, enabling
		// idempotent writes yields a config that fails validation.
		cfg.Net.MaxOpenRequests = 1
	}
	cfg.Producer.RequiredAcks = sarama.RequiredAcks(c.RequiredAcks)
	cfg.Producer.Compression = sarama.CompressionCodec(intToInt8(c.CompressionCodec))
	if c.ProducerMaxRetry > 0 {
		cfg.Producer.Retry.Max = c.ProducerMaxRetry
	}
	if c.MetaMaxRetry > 0 {
		cfg.Metadata.Retry.Max = c.MetaMaxRetry
	}
	if c.RetryBackoff > 0 {
		// RetryBackoff is interpreted as milliseconds.
		cfg.Producer.Retry.Backoff = time.Millisecond * time.Duration(c.RetryBackoff)
	}
	if c.RefreshPeriod > 0 {
		// RefreshPeriod is interpreted as minutes.
		cfg.Metadata.RefreshFrequency = time.Duration(c.RefreshPeriod) * time.Minute
	}
	if c.MaxMessageBytes > 0 {
		cfg.Producer.MaxMessageBytes = c.MaxMessageBytes
	}
	if c.ClientID != "" {
		cfg.ClientID = c.ClientID
	}
	if c.Version != "" {
		// Best effort: a bad version string must not prevent the client from
		// starting, so fall back to the version already present in cfg.
		if version, err := sarama.ParseKafkaVersion(c.Version); err != nil {
			log.Logger.Errorf("error in parsing the kafka version, the kafka version would be set as default value: %v", err)
		} else {
			cfg.Version = version
		}
	}
	cfg.Net.TLS.Enable = c.EnableTLS
	if c.EnableTLS {
		configTLS, err := c.configTLS()
		if err != nil {
			return nil, err
		}
		cfg.Net.TLS.Config = configTLS
	}
	return cfg, nil
}