common/asyncworkflow/queue/kafka/config.go (56 lines of code) (raw):
// The MIT License (MIT)
// Copyright (c) 2017-2020 Uber Technologies Inc.
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
package kafka
import (
"fmt"
"strings"
"github.com/Shopify/sarama"
"github.com/uber/cadence/common/authorization"
"github.com/uber/cadence/common/config"
)
type (
queueConfig struct {
Connection connectionConfig `yaml:"connection"`
Topic string `yaml:"topic"`
}
connectionConfig struct {
Brokers []string `yaml:"brokers"`
TLS config.TLS `yaml:"tls"`
SASL config.SASL `yaml:"sasl"`
}
)
func (c *queueConfig) ID() string {
return fmt.Sprintf("kafka::%s/%s", c.Topic, strings.Join(c.Connection.Brokers, ","))
}
func newSaramaConfigWithAuth(tls *config.TLS, sasl *config.SASL) (*sarama.Config, error) {
saramaConfig := sarama.NewConfig()
// TLS support
tlsConfig, err := tls.ToTLSConfig()
if err != nil {
return nil, fmt.Errorf("Error creating Kafka TLS config %w", err)
}
if tlsConfig != nil {
saramaConfig.Net.TLS.Enable = true
saramaConfig.Net.TLS.Config = tlsConfig
}
// SASL support
if sasl.Enabled {
saramaConfig.Net.SASL.Enable = true
saramaConfig.Net.SASL.Handshake = true
saramaConfig.Net.SASL.User = sasl.User
saramaConfig.Net.SASL.Password = sasl.Password
switch sasl.Algorithm {
case "sha512":
saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &authorization.XDGSCRAMClient{HashGeneratorFcn: authorization.SHA512}
}
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
case "sha256":
saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &authorization.XDGSCRAMClient{HashGeneratorFcn: authorization.SHA256}
}
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
case "plain":
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypePlaintext
default:
return nil, fmt.Errorf("unknown SASL algorithm %v", sasl.Algorithm)
}
}
return saramaConfig, nil
}