kafka/common.go (334 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package kafka
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"net"
"os"
"strings"
"sync"
"time"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sasl"
"github.com/twmb/franz-go/pkg/sasl/aws"
"github.com/twmb/franz-go/pkg/sasl/plain"
"github.com/twmb/franz-go/plugin/kzap"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// SASLMechanism is a type alias to sasl.Mechanism, re-exported so callers
// can configure SASL without importing franz-go's sasl package directly.
type SASLMechanism = sasl.Mechanism

// TopicLogFieldFunc is a function that returns a zap.Field for a given topic.
type TopicLogFieldFunc func(topic string) zap.Field
// CommonConfig defines common configuration for Kafka consumers, producers,
// and managers.
type CommonConfig struct {
	// ConfigFile holds the path to an optional YAML configuration file,
	// which configures Brokers and SASL.
	//
	// If ConfigFile is unspecified, but $KAFKA_CONFIG_FILE is specified,
	// it will be used to populate ConfigFile. Either way if a file is
	// specified, it must exist when a client is initially created.
	//
	// The following properties from
	// https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
	// are honoured:
	//
	// - bootstrap.servers ($KAFKA_BROKERS)
	// - sasl.mechanism ($KAFKA_SASL_MECHANISM)
	// - sasl.username ($KAFKA_USERNAME)
	// - sasl.password ($KAFKA_PASSWORD)
	//
	// If bootstrap.servers is defined, then it takes precedence over
	// CommonConfig.Brokers. When a connection to a broker fails, the
	// config file will be reloaded, and the seed brokers will be
	// updated if bootstrap.servers has changed.
	//
	// If sasl.mechanism is set to PLAIN, or if sasl.username is defined,
	// then SASL/PLAIN will be configured. Whenever a new connection is
	// created, the config will be reloaded in case the username or
	// password has been updated. If sasl.mechanism is set to AWS_MSK_IAM,
	// then SASL/AWS_MSK_IAM is configured using the AWS SDK. Dynamic
	// changes to the sasl.mechanism value are not supported.
	ConfigFile string

	// Namespace holds a namespace for Kafka topics.
	//
	// This is added as a prefix for topics names, and acts as a filter
	// on topics monitored or described by the manager.
	//
	// Namespace is always removed from topic names before they are
	// returned to callers. The only way Namespace will surface is in
	// telemetry (e.g. metrics), as an independent dimension. This
	// enables users to filter metrics by namespace, while maintaining
	// stable topic names.
	Namespace string

	// Brokers is the list of kafka brokers used to seed the Kafka client.
	//
	// If Brokers is unspecified, but $KAFKA_BROKERS is specified, it will
	// be parsed as a comma-separated list of broker addresses and used.
	Brokers []string

	// ClientID to use when connecting to Kafka. This is used for logging
	// and client identification purposes.
	ClientID string

	// Version is the software version to use in the Kafka client. This is
	// useful since it shows up in Kafka metrics and logs.
	Version string

	// SASL configures the kgo.Client to use SASL authorization.
	//
	// If SASL is unspecified, then it may be derived from environment
	// variables as follows:
	//
	// - if $KAFKA_SASL_MECHANISM is set to PLAIN, or if $KAFKA_USERNAME
	//   and $KAFKA_PASSWORD are both specified, then SASL/PLAIN will be
	//   configured
	// - if $KAFKA_SASL_MECHANISM is set to AWS_MSK_IAM, then
	//   SASL/AWS_MSK_IAM will be configured using the AWS SDK
	SASL SASLMechanism

	// TLS configures the kgo.Client to use TLS for authentication.
	// This option conflicts with Dialer. Only one can be used.
	//
	// If neither TLS nor Dialer are specified, then TLS will be configured
	// by default unless the environment variable $KAFKA_PLAINTEXT is set to
	// "true". In case TLS is auto-configured, $KAFKA_TLS_INSECURE may be
	// set to "true" to disable server certificate and hostname verification.
	TLS *tls.Config

	// Dialer uses fn to dial addresses, overriding the default dialer that uses a
	// 10s dial timeout and no TLS (unless TLS option is set).
	//
	// The context passed to the dial function is the context used in the request
	// that caused the dial. If the request is a client-internal request, the
	// context is the context on the client itself (which is canceled when the
	// client is closed).
	// This option conflicts with TLS. Only one can be used.
	Dialer func(ctx context.Context, network, address string) (net.Conn, error)

	// Logger to use for any errors.
	Logger *zap.Logger

	// DisableTelemetry disables the OpenTelemetry hook.
	DisableTelemetry bool

	// TracerProvider allows specifying a custom otel tracer provider.
	// Defaults to the global one.
	TracerProvider trace.TracerProvider

	// MeterProvider allows specifying a custom otel meter provider.
	// Defaults to the global one.
	MeterProvider metric.MeterProvider

	// TopicAttributeFunc can be used to create custom dimensions from a Kafka
	// topic for these metrics:
	// - producer.messages.count
	// - consumer.messages.fetched
	TopicAttributeFunc TopicAttributeFunc

	// TopicLogFieldFunc can be used to create a custom zap.Field from a
	// Kafka topic, which is attached to log messages.
	TopicLogFieldFunc TopicLogFieldFunc

	// MetadataMaxAge is the maximum age of metadata before it is refreshed.
	// The lower the value the more frequently new topics will be discovered.
	// If zero, the default value of 5 minutes is used.
	MetadataMaxAge time.Duration

	// hooks holds internal kgo hooks (e.g. the config-file reload hook)
	// installed by finalize.
	hooks []kgo.Hook
}
// finalize ensures the configuration is valid, setting default values from
// environment variables as described in doc comments, returning an error if
// any configuration is invalid.
//
// All problems found are accumulated and returned as a single joined error,
// so callers see every misconfiguration at once.
func (cfg *CommonConfig) finalize() error {
	var errs []error
	if cfg.Logger == nil {
		cfg.Logger = zap.NewNop() // cfg.Logger may be used below
		errs = append(errs, errors.New("kafka: logger must be set"))
	} else {
		cfg.Logger = cfg.Logger.Named("kafka")
	}
	if cfg.Namespace != "" {
		cfg.Logger = cfg.Logger.With(zap.String("namespace", cfg.Namespace))
	}
	if cfg.ConfigFile == "" {
		cfg.ConfigFile = os.Getenv("KAFKA_CONFIG_FILE")
	}
	// A client certificate/key pair signals the intent to authenticate
	// with mTLS, in which case SASL is not configured below.
	certPath := os.Getenv("KAFKA_TLS_CERT_PATH")
	keyPath := os.Getenv("KAFKA_TLS_KEY_PATH")
	if cfg.ConfigFile != "" {
		configFileHook, brokers, saslMechanism, err := newConfigFileHook(cfg.ConfigFile, cfg.Logger)
		if err != nil {
			errs = append(errs, fmt.Errorf("kafka: error loading config file: %w", err))
		} else {
			cfg.hooks = append(cfg.hooks, configFileHook)
			// bootstrap.servers from the config file takes precedence
			// over any brokers set on the config.
			if len(brokers) != 0 {
				cfg.Brokers = brokers
			}
			if saslMechanism != nil && certPath == "" && keyPath == "" {
				// Only set SASL when there is no intention to configure mTLS.
				cfg.SASL = saslMechanism
			}
		}
	}
	if len(cfg.Brokers) == 0 {
		if v := os.Getenv("KAFKA_BROKERS"); v != "" {
			cfg.Brokers = strings.Split(v, ",")
		} else {
			errs = append(errs, errors.New("kafka: at least one broker must be set"))
		}
	}
	switch {
	case cfg.TLS != nil && cfg.Dialer != nil:
		errs = append(errs, errors.New("kafka: only one of TLS or Dialer can be set"))
	case cfg.TLS == nil && cfg.Dialer == nil && os.Getenv("KAFKA_PLAINTEXT") != "true":
		// Auto-configure TLS from environment variables.
		cfg.TLS = &tls.Config{}
		tlsInsecure := os.Getenv("KAFKA_TLS_INSECURE") == "true"
		caCertPath := os.Getenv("KAFKA_TLS_CA_CERT_PATH")
		if overriddenServerName, exists := os.LookupEnv("KAFKA_TLS_SERVER_NAME"); exists {
			cfg.Logger.Debug("overriding TLS server name", zap.String("server_name", overriddenServerName))
			cfg.TLS.ServerName = overriddenServerName
		}
		// Insecure verification conflicts with custom certificates; the
		// break exits the switch, skipping the dialer setup below.
		if tlsInsecure && (caCertPath != "" || certPath != "" || keyPath != "") {
			errs = append(errs, errors.New(
				"kafka: cannot set KAFKA_TLS_INSECURE when either of KAFKA_TLS_CA_CERT_PATH, KAFKA_TLS_CERT_PATH, or KAFKA_TLS_KEY_PATH are set",
			))
			break
		}
		if tlsInsecure {
			cfg.TLS.InsecureSkipVerify = true
		}
		if caCertPath != "" || certPath != "" || keyPath != "" {
			// Set a dialer that reloads the CA cert when the file changes.
			// The dialer supersedes cfg.TLS since the two are mutually
			// exclusive (see the switch's first case).
			dialFn, err := newCertReloadingDialer(
				caCertPath, certPath, keyPath, 30*time.Second, cfg.TLS,
			)
			if err != nil {
				errs = append(errs, fmt.Errorf("kafka: error creating dialer with CA cert: %w", err))
				break
			}
			cfg.Dialer = dialFn
			cfg.TLS = nil
		}
	}
	// Only configure SASL if it is not already set and when there is no
	// intention to configure mTLS.
	if cfg.SASL == nil && certPath == "" && keyPath == "" {
		saslConfig := saslConfigProperties{
			Mechanism: os.Getenv("KAFKA_SASL_MECHANISM"),
			Username:  os.Getenv("KAFKA_USERNAME"),
			Password:  os.Getenv("KAFKA_PASSWORD"),
		}
		if err := saslConfig.finalize(); err != nil {
			errs = append(errs, fmt.Errorf("kafka: error configuring SASL: %w", err))
		} else {
			switch saslConfig.Mechanism {
			case "PLAIN":
				plainAuth := plain.Auth{
					User: saslConfig.Username,
					Pass: saslConfig.Password,
				}
				// Only configure SASL/PLAIN when credentials are present.
				if plainAuth != (plain.Auth{}) {
					cfg.SASL = plainAuth.AsMechanism()
				}
			case "AWS_MSK_IAM":
				var err error
				cfg.SASL, err = newAWSMSKIAMSASL()
				if err != nil {
					errs = append(errs, fmt.Errorf("kafka: error configuring SASL/AWS_MSK_IAM: %w", err))
				}
			}
		}
	}
	// Wrap the cfg.TopicLogFieldFunc to ensure it never returns a field with
	// an unknown type (like `zap.Field{}`).
	if cfg.TopicLogFieldFunc != nil {
		cfg.TopicLogFieldFunc = topicFieldFunc(cfg.TopicLogFieldFunc)
	}
	if cfg.TopicAttributeFunc == nil {
		// Default to a no-op: an empty KeyValue adds no metric dimension.
		cfg.TopicAttributeFunc = func(topic string) attribute.KeyValue {
			return attribute.KeyValue{}
		}
	}
	return errors.Join(errs...)
}
// namespacePrefix returns the prefix prepended to topic names, derived
// from cfg.Namespace. An empty namespace yields an empty prefix.
func (cfg *CommonConfig) namespacePrefix() string {
	if ns := cfg.Namespace; ns != "" {
		return ns + "-"
	}
	return ""
}
// tracerProvider returns the configured TracerProvider, falling back to
// the global OpenTelemetry tracer provider when none is set.
func (cfg *CommonConfig) tracerProvider() trace.TracerProvider {
	if tp := cfg.TracerProvider; tp != nil {
		return tp
	}
	return otel.GetTracerProvider()
}
// meterProvider returns the configured MeterProvider, falling back to
// the global OpenTelemetry meter provider when none is set.
func (cfg *CommonConfig) meterProvider() metric.MeterProvider {
	if mp := cfg.MeterProvider; mp != nil {
		return mp
	}
	return otel.GetMeterProvider()
}
// newClient constructs a kgo.Client from the common configuration plus any
// additional kgo options. topicAttributeFunc is passed to the telemetry
// hooks to derive custom metric dimensions from topic names.
//
// The caller is responsible for closing the returned client.
func (cfg *CommonConfig) newClient(topicAttributeFunc TopicAttributeFunc, additionalOpts ...kgo.Opt) (*kgo.Client, error) {
	opts := []kgo.Opt{
		// NOTE(review): finalize() already names the logger "kafka", so this
		// yields a "kafka.kafka"-named logger — confirm whether intended.
		kgo.WithLogger(kzap.New(cfg.Logger.Named("kafka"))),
		kgo.SeedBrokers(cfg.Brokers...),
	}
	if cfg.ClientID != "" {
		opts = append(opts, kgo.ClientID(cfg.ClientID))
		// Software name/version is only reported when a ClientID is set.
		if cfg.Version != "" {
			opts = append(opts, kgo.SoftwareNameAndVersion(
				cfg.ClientID, cfg.Version,
			))
		}
	}
	// Dialer and TLS are mutually exclusive (validated in finalize);
	// Dialer wins when both are somehow present.
	if cfg.Dialer != nil {
		opts = append(opts, kgo.Dialer(cfg.Dialer))
	} else if cfg.TLS != nil {
		opts = append(opts, kgo.DialTLSConfig(cfg.TLS.Clone()))
	}
	if cfg.SASL != nil {
		opts = append(opts, kgo.SASL(cfg.SASL))
	}
	// Caller-supplied options come after the common ones, allowing
	// callers to override the defaults above.
	opts = append(opts, additionalOpts...)
	if !cfg.DisableTelemetry {
		metricHooks, err := newKgoHooks(cfg.meterProvider(),
			cfg.Namespace, cfg.namespacePrefix(), topicAttributeFunc,
		)
		if err != nil {
			return nil, fmt.Errorf("kafka: failed creating kgo metrics hooks: %w", err)
		}
		opts = append(opts,
			kgo.WithHooks(metricHooks, &loggerHook{logger: cfg.Logger}),
		)
	}
	if cfg.MetadataMaxAge > 0 {
		opts = append(opts, kgo.MetadataMaxAge(cfg.MetadataMaxAge))
	}
	// Internal hooks (e.g. the config-file reload hook) set by finalize.
	if len(cfg.hooks) != 0 {
		opts = append(opts, kgo.WithHooks(cfg.hooks...))
	}
	client, err := kgo.NewClient(opts...)
	if err != nil {
		return nil, fmt.Errorf("kafka: failed creating kafka client: %w", err)
	}
	// Issue a metadata refresh request on construction, so the broker list is populated.
	client.ForceMetadataRefresh()
	return client, nil
}
// newAWSMSKIAMSASL returns a SASL/AWS_MSK_IAM mechanism which resolves
// credentials through the default AWS SDK credential chain on each
// authentication attempt. The error return is always nil; it exists so
// the signature matches other mechanism constructors.
func newAWSMSKIAMSASL() (sasl.Mechanism, error) {
	authFn := func(ctx context.Context) (aws.Auth, error) {
		awscfg, err := awsconfig.LoadDefaultConfig(ctx)
		if err != nil {
			return aws.Auth{}, fmt.Errorf("kafka: error loading AWS config: %w", err)
		}
		creds, err := awscfg.Credentials.Retrieve(ctx)
		if err != nil {
			return aws.Auth{}, err
		}
		auth := aws.Auth{
			AccessKey:    creds.AccessKeyID,
			SecretKey:    creds.SecretAccessKey,
			SessionToken: creds.SessionToken,
		}
		return auth, nil
	}
	return aws.ManagedStreamingIAM(authFn), nil
}
// topicFieldFunc wraps f so that the returned TopicLogFieldFunc never
// yields a field of unknown type: a nil f, or a field whose Type is not
// beyond zapcore.UnknownType, is replaced by zap.Skip() (a no-op field).
func topicFieldFunc(f TopicLogFieldFunc) TopicLogFieldFunc {
	return func(topic string) zap.Field {
		if f != nil {
			if field := f(topic); field.Type > zapcore.UnknownType {
				return field
			}
		}
		return zap.Skip()
	}
}
// newCertReloadingDialer returns a dialer that reloads the CA cert when the
// file mod time changes.
//
// The returned function dials over TLS with a 10s timeout (kgo's default).
// Reloading is lazy: files are re-checked at most once per poll interval,
// and only when a dial occurs after a tick has fired.
// NOTE(review): the ticker is never stopped; it lives as long as the
// returned dialer is referenced (and before Go 1.23 would not be GC'd).
func newCertReloadingDialer(caPath, certPath, keyPath string,
	poll time.Duration,
	tlsCfg *tls.Config,
) (func(ctx context.Context, network, address string) (net.Conn, error), error) {
	dialer := &net.Dialer{Timeout: 10 * time.Second} // default dialer timeout in kgo.
	kp := &keyPair{
		caPath:   caPath,
		certPath: certPath,
		keyPath:  keyPath,
		config:   tlsCfg.Clone(),
	}
	// Fail fast if the initial certificate load fails.
	if err := kp.reloadIfChanged(); err != nil {
		return nil, err
	}
	ticker := time.NewTicker(poll)
	return func(ctx context.Context, network, host string) (net.Conn, error) {
		// Non-blocking receive: reload only when a poll tick has elapsed.
		select {
		case <-ticker.C:
			if err := kp.reloadIfChanged(); err != nil {
				return nil, err
			}
		default:
		}
		c := kp.clone()
		// Copied this pattern from franz-go client.go.
		// https://github.com/twmb/franz-go/blob/f30c518d6b727b9169a90b8c10e2127301822a3a/pkg/kgo/client.go#L440-L453
		if c.ServerName == "" {
			server, _, err := net.SplitHostPort(host)
			if err != nil {
				return nil, fmt.Errorf("dialer: unable to split host:port for dialing: %w", err)
			}
			c.ServerName = server
		}
		return (&tls.Dialer{
			NetDialer: dialer,
			Config:    c,
		}).DialContext(ctx, network, host)
	}, nil
}
// keyPair is a struct that holds a certificate and private key.
type keyPair struct {
caPath string
certPath string
keyPath string
config *tls.Config // Local copy.
certModTS time.Time
caModTS time.Time
mu sync.RWMutex
}
// loadIfModified loads the certificate and private key if the mod time has
// changed. Use locking
func (kp *keyPair) reloadIfChanged() error {
kp.mu.Lock()
defer kp.mu.Unlock()
if kp.certPath != "" && kp.keyPath != "" {
certInfo, err := os.Stat(kp.certPath)
if err != nil {
return fmt.Errorf("unable to stat certificate file: %w", err)
}
if kp.certModTS.Before(certInfo.ModTime()) {
if certInfo.Size() < 1 {
return nil
}
cert, err := tls.LoadX509KeyPair(kp.certPath, kp.keyPath)
if err != nil {
return fmt.Errorf("failed to load new certificate: %w", err)
}
kp.config.Certificates = []tls.Certificate{cert}
kp.certModTS = certInfo.ModTime()
}
}
caInfo, err := os.Stat(kp.caPath)
if err != nil {
return fmt.Errorf("unable to stat CA certificate file: %w", err)
}
if kp.caModTS.Before(caInfo.ModTime()) {
caCert, err := os.ReadFile(kp.caPath)
if err != nil {
return fmt.Errorf("failed to read CA certificate: %w", err)
}
if len(caCert) == 0 {
return nil
}
kp.config.RootCAs = x509.NewCertPool()
if !kp.config.RootCAs.AppendCertsFromPEM(caCert) {
return fmt.Errorf("failed to append CA cert: %w", err)
}
kp.caModTS = caInfo.ModTime()
}
return nil
}
func (kp *keyPair) clone() *tls.Config {
kp.mu.RLock()
defer kp.mu.RUnlock()
return kp.config.Clone()
}