common/asyncworkflow/queue/kafka/queue.go (73 lines of code) (raw):
// The MIT License (MIT)
// Copyright (c) 2017-2020 Uber Technologies Inc.
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
package kafka
import (
"fmt"
"sort"
"time"
"github.com/Shopify/sarama"
"github.com/uber/cadence/common/asyncworkflow/queue/consumer"
"github.com/uber/cadence/common/asyncworkflow/queue/provider"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/messaging"
"github.com/uber/cadence/common/messaging/kafka"
"github.com/uber/cadence/common/metrics"
)
// queueImpl is the Kafka-backed queue returned by newQueue. It carries the
// decoded queue configuration that its methods read to build consumers and
// producers.
type queueImpl struct {
	// config is the decoded Kafka queue configuration (topic and connection
	// settings) that this queue was constructed from.
	config *queueConfig
}
// newQueue decodes a Kafka queue configuration from decoder and returns a
// provider.Queue backed by it.
//
// The decoded broker list is sorted in place before the queue is built.
// NOTE(review): presumably this keeps anything derived from the config (such
// as its ID) independent of the order brokers were listed in — confirm
// against queueConfig.ID.
//
// A decode failure is returned wrapped with the "bad config" prefix.
func newQueue(decoder provider.Decoder) (provider.Queue, error) {
	cfg := new(queueConfig)
	if err := decoder.Decode(cfg); err != nil {
		return nil, fmt.Errorf("bad config: %w", err)
	}

	brokers := cfg.Connection.Brokers
	sort.Strings(brokers)

	return &queueImpl{config: cfg}, nil
}
// ID returns the identifier reported by the queue's configuration.
func (q *queueImpl) ID() string {
	cfg := q.config
	return cfg.ID()
}
// CreateConsumer builds the async-workflow consumer for this queue's topic.
//
// The consumer group is named "<topic>-asyncwf-consumer" and the dead-letter
// topic "<topic>-dlq". A sync producer for the DLQ topic is created and handed
// to kafka.NewKafkaConsumer together with a consumer configuration that
// disables auto-commit and starts from the oldest offset. The resulting Kafka
// consumer is wrapped by consumer.New, using p's logger, metrics client and
// frontend client.
//
// Both sarama configs are built before the DLQ producer is created, and the
// producer is closed (when it exposes a Close method) if consumer creation
// fails, so an error return does not leave a live producer behind.
//
// Every error is returned wrapped with a message naming the step that failed.
func (q *queueImpl) CreateConsumer(p *provider.Params) (provider.Consumer, error) {
	consumerGroup := fmt.Sprintf("%s-asyncwf-consumer", q.config.Topic)
	dlqTopic := fmt.Sprintf("%s-dlq", q.config.Topic)

	// Build both configurations up front: creating them acquires nothing that
	// needs releasing, so failing here cannot leak a producer.
	dlqConfig, err := newSaramaConfigWithAuth(&q.config.Connection.TLS, &q.config.Connection.SASL)
	if err != nil {
		return nil, fmt.Errorf("failed to create kafka sarama config: %w", err)
	}
	// sarama's SyncProducer requires Return.Successes to be enabled.
	dlqConfig.Producer.Return.Successes = true

	consumerConfig, err := newSaramaConfigWithAuth(&q.config.Connection.TLS, &q.config.Connection.SASL)
	if err != nil {
		return nil, fmt.Errorf("failed to create kafka sarama config: %w", err)
	}
	consumerConfig.Consumer.Fetch.Default = 30 * 1024 * 1024 // 30MB.
	consumerConfig.Consumer.Return.Errors = true
	consumerConfig.Consumer.Offsets.AutoCommit.Enable = false // Use manual commit
	consumerConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
	consumerConfig.Consumer.MaxProcessingTime = 250 * time.Millisecond

	dlqProducer, err := newProducer(dlqTopic, q.config.Connection.Brokers, dlqConfig, p.MetricsClient, p.Logger)
	if err != nil {
		return nil, fmt.Errorf("failed to create kafka producer for dlq: %w", err)
	}

	kafkaConsumer, err := kafka.NewKafkaConsumer(dlqProducer, q.config.Connection.Brokers, q.config.Topic, consumerGroup, consumerConfig, p.MetricsClient, p.Logger)
	if err != nil {
		// The DLQ producer is no longer reachable by anyone once we return, so
		// release it here. messaging.Producer is not known to declare Close, so
		// probe for it; the close error is intentionally dropped because the
		// consumer error is the one the caller needs to see.
		if closer, ok := dlqProducer.(interface{ Close() error }); ok {
			_ = closer.Close()
		}
		return nil, fmt.Errorf("failed to create kafka consumer: %w", err)
	}

	return consumer.New(q.ID(), kafkaConsumer, p.Logger, p.MetricsClient, p.FrontendClient), nil
}
// CreateProducer returns a producer that publishes to this queue's topic on
// the configured brokers, using p's metrics client and logger.
//
// Errors are wrapped with the step that failed, matching CreateConsumer; the
// underlying error remains reachable through errors.Is / errors.As.
func (q *queueImpl) CreateProducer(p *provider.Params) (messaging.Producer, error) {
	config, err := newSaramaConfigWithAuth(&q.config.Connection.TLS, &q.config.Connection.SASL)
	if err != nil {
		return nil, fmt.Errorf("failed to create kafka sarama config: %w", err)
	}
	// sarama's SyncProducer requires Return.Successes to be enabled.
	config.Producer.Return.Successes = true

	producer, err := newProducer(q.config.Topic, q.config.Connection.Brokers, config, p.MetricsClient, p.Logger)
	if err != nil {
		return nil, fmt.Errorf("failed to create kafka producer: %w", err)
	}
	return producer, nil
}
// newProducer creates a sarama sync producer for brokers using saramaConfig,
// wraps it in a Kafka producer bound to topic, and decorates that with the
// metrics producer built from metricsClient.
//
// An error from sarama.NewSyncProducer is returned unchanged.
func newProducer(topic string, brokers []string, saramaConfig *sarama.Config, metricsClient metrics.Client, logger log.Logger) (messaging.Producer, error) {
	syncProducer, err := sarama.NewSyncProducer(brokers, saramaConfig)
	if err != nil {
		return nil, err
	}

	kafkaProducer := kafka.NewKafkaProducer(topic, syncProducer, logger)
	instrumented := messaging.NewMetricProducer(kafkaProducer, metricsClient)
	return instrumented, nil
}