// pulsar/consumer_impl.go
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package pulsar
import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
	"github.com/apache/pulsar-client-go/pulsar/crypto"
	"github.com/apache/pulsar-client-go/pulsar/internal"
	pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
	"github.com/apache/pulsar-client-go/pulsar/log"
	pkgerrors "github.com/pkg/errors"
)
const defaultNackRedeliveryDelay = 1 * time.Minute
type acker interface {
	// AckID does not wait for the broker's response, so errors returned by the
	// broker side are not surfaced to the caller and there is no doneCh to wait on.
AckID(id MessageID) error
AckIDList(msgIDs []MessageID) error
AckIDWithResponse(id MessageID) error
AckIDWithTxn(msgID MessageID, txn Transaction) error
AckIDCumulative(msgID MessageID) error
AckIDWithResponseCumulative(msgID MessageID) error
NackID(id MessageID)
NackMsg(msg Message)
}
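
// partitionConsumer is the expected implementer of acker in this file; the
// topic-level consumer fans acknowledgments out to the per-partition
// consumers through this interface.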
type consumer struct {
sync.Mutex
topic string
client *client
options ConsumerOptions
consumers []*partitionConsumer
consumerName string
disableForceTopicCreation bool
	// channel used to deliver messages to clients
messageCh chan ConsumerMessage
dlq *dlqRouter
rlq *retryRouter
closeOnce sync.Once
closeCh chan struct{}
errorCh chan error
stopDiscovery func()
log log.Logger
metrics *internal.LeveledMetrics
}
func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
if options.Topic == "" && options.Topics == nil && options.TopicsPattern == "" {
return nil, newError(TopicNotFound, "topic is required")
}
if options.SubscriptionName == "" {
return nil, newError(SubscriptionNotFound, "subscription name is required for consumer")
}
if options.ReceiverQueueSize <= 0 {
options.ReceiverQueueSize = defaultReceiverQueueSize
}
if options.EnableZeroQueueConsumer {
options.ReceiverQueueSize = 0
}
if options.Interceptors == nil {
options.Interceptors = defaultConsumerInterceptors
}
if options.Name == "" {
options.Name = generateRandomName()
}
if options.Schema != nil && options.Schema.GetSchemaInfo() != nil {
if options.Schema.GetSchemaInfo().Type == NONE {
options.Schema = NewBytesSchema(nil)
}
}
if options.MaxPendingChunkedMessage == 0 {
options.MaxPendingChunkedMessage = 100
}
if options.ExpireTimeOfIncompleteChunk == 0 {
options.ExpireTimeOfIncompleteChunk = time.Minute
}
if options.NackBackoffPolicy == nil && options.EnableDefaultNackBackoffPolicy {
options.NackBackoffPolicy = new(defaultNackBackoffPolicy)
}
// did the user pass in a message channel?
messageCh := options.MessageChannel
if options.MessageChannel == nil {
messageCh = make(chan ConsumerMessage, 10)
}
if options.RetryEnable {
usingTopic := ""
if options.Topic != "" {
usingTopic = options.Topic
} else if len(options.Topics) > 0 {
usingTopic = options.Topics[0]
}
tn, err := internal.ParseTopicName(usingTopic)
if err != nil {
return nil, err
}
topicName := internal.TopicNameWithoutPartitionPart(tn)
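		// Illustrative naming (assuming the default suffix constants): topic
		// "persistent://public/default/orders" with subscription "sub" yields the
		// retry topic "persistent://public/default/orders-sub-RETRY" and the DLQ
		// topic "persistent://public/default/orders-sub-DLQ".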
retryTopic := topicName + "-" + options.SubscriptionName + RetryTopicSuffix
dlqTopic := topicName + "-" + options.SubscriptionName + DlqTopicSuffix
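		// Older client versions derived the retry/DLQ topic names from the
		// namespace rather than the topic; if such a partitioned topic already
		// exists, keep using the legacy name.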
oldRetryTopic := tn.Domain + "://" + tn.Namespace + "/" + options.SubscriptionName + RetryTopicSuffix
oldDlqTopic := tn.Domain + "://" + tn.Namespace + "/" + options.SubscriptionName + DlqTopicSuffix
if r, err := client.lookupService.GetPartitionedTopicMetadata(oldRetryTopic); err == nil &&
r != nil &&
r.Partitions > 0 {
retryTopic = oldRetryTopic
}
if r, err := client.lookupService.GetPartitionedTopicMetadata(oldDlqTopic); err == nil &&
r != nil &&
r.Partitions > 0 {
dlqTopic = oldDlqTopic
}
if options.DLQ == nil {
options.DLQ = &DLQPolicy{
MaxDeliveries: MaxReconsumeTimes,
DeadLetterTopic: dlqTopic,
RetryLetterTopic: retryTopic,
}
} else {
if options.DLQ.DeadLetterTopic == "" {
options.DLQ.DeadLetterTopic = dlqTopic
}
if options.DLQ.RetryLetterTopic == "" {
options.DLQ.RetryLetterTopic = retryTopic
}
}
if options.Topic != "" && len(options.Topics) == 0 {
options.Topics = []string{options.Topic, options.DLQ.RetryLetterTopic}
options.Topic = ""
} else if options.Topic == "" && len(options.Topics) > 0 {
options.Topics = append(options.Topics, options.DLQ.RetryLetterTopic)
}
}
dlq, err := newDlqRouter(client, options.DLQ, options.Topic, options.SubscriptionName, options.Name,
options.BackOffPolicyFunc, client.log)
if err != nil {
return nil, err
}
rlq, err := newRetryRouter(client, options.DLQ, options.RetryEnable, options.BackOffPolicyFunc, client.log)
if err != nil {
return nil, err
}
	// normalize to fully-qualified topic names
var tns []*internal.TopicName
// single topic consumer
if options.Topic != "" || len(options.Topics) == 1 {
topic := options.Topic
if topic == "" {
topic = options.Topics[0]
}
if tns, err = validateTopicNames(topic); err != nil {
return nil, err
}
topic = tns[0].Name
err = addMessageCryptoIfMissing(client, &options, topic)
if err != nil {
return nil, err
}
return newInternalConsumer(client, options, topic, messageCh, dlq, rlq, false)
}
if len(options.Topics) > 1 {
if tns, err = validateTopicNames(options.Topics...); err != nil {
return nil, err
}
for i := range options.Topics {
options.Topics[i] = tns[i].Name
}
options.Topics = distinct(options.Topics)
err = addMessageCryptoIfMissing(client, &options, options.Topics)
if err != nil {
return nil, err
}
return newMultiTopicConsumer(client, options, options.Topics, messageCh, dlq, rlq)
}
if options.TopicsPattern != "" {
tn, err := internal.ParseTopicName(options.TopicsPattern)
if err != nil {
return nil, err
}
pattern, err := extractTopicPattern(tn)
if err != nil {
return nil, err
}
err = addMessageCryptoIfMissing(client, &options, tn.Name)
if err != nil {
return nil, err
}
return newRegexConsumer(client, options, tn, pattern, messageCh, dlq, rlq)
}
return nil, newError(InvalidTopicName, "topic name is required for consumer")
}
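
// Illustrative usage (a sketch, not part of this file): consumers are created
// via Client.Subscribe, which funnels into newConsumer above. The URL, topic,
// and subscription names below are assumptions.
//
//	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer client.Close()
//	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
//		Topic:            "persistent://public/default/my-topic",
//		SubscriptionName: "my-sub",
//		Type:             pulsar.Shared,
//	})
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer consumer.Close()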
func newInternalConsumer(client *client, options ConsumerOptions, topic string,
messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter, disableForceTopicCreation bool) (Consumer, error) {
partitions, err := client.TopicPartitions(topic)
if err != nil {
return nil, err
}
if len(partitions) > 1 && options.EnableZeroQueueConsumer {
return nil, pkgerrors.New("ZeroQueueConsumer is not supported for partitioned topics")
}
if len(partitions) == 1 && options.EnableZeroQueueConsumer &&
strings.Contains(partitions[0], utils.PARTITIONEDTOPICSUFFIX) {
return nil, pkgerrors.New("ZeroQueueConsumer is not supported for partitioned topics")
}
if len(partitions) == 1 && options.EnableZeroQueueConsumer {
return newZeroConsumer(client, options, topic, messageCh, dlq, rlq, disableForceTopicCreation)
}
consumer := &consumer{
topic: topic,
client: client,
options: options,
disableForceTopicCreation: disableForceTopicCreation,
messageCh: messageCh,
closeCh: make(chan struct{}),
errorCh: make(chan error),
dlq: dlq,
rlq: rlq,
log: client.log.SubLogger(log.Fields{"topic": topic}),
consumerName: options.Name,
metrics: client.metrics.GetLeveledMetrics(topic),
}
err = consumer.internalTopicSubscribeToPartitions()
if err != nil {
return nil, err
}
// set up timer to monitor for new partitions being added
duration := options.AutoDiscoveryPeriod
if duration <= 0 {
duration = defaultAutoDiscoveryDuration
}
consumer.stopDiscovery = consumer.runBackgroundPartitionDiscovery(duration)
consumer.metrics.ConsumersOpened.Inc()
return consumer, nil
}
// Name returns the name of the consumer.
func (c *consumer) Name() string {
return c.consumerName
}
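
// runBackgroundPartitionDiscovery polls topic metadata every period and
// subscribes to any newly added partitions. The returned cancel function
// stops the ticker and blocks until the discovery goroutine exits.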
func (c *consumer) runBackgroundPartitionDiscovery(period time.Duration) (cancel func()) {
var wg sync.WaitGroup
stopDiscoveryCh := make(chan struct{})
ticker := time.NewTicker(period)
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-stopDiscoveryCh:
return
case <-ticker.C:
c.log.Debug("Auto discovering new partitions")
c.internalTopicSubscribeToPartitions()
}
}
}()
return func() {
ticker.Stop()
close(stopDiscoveryCh)
wg.Wait()
}
}
func (c *consumer) internalTopicSubscribeToPartitions() error {
partitions, err := c.client.TopicPartitions(c.topic)
if err != nil {
return err
}
	newNumPartitions := len(partitions)
	c.Lock()
	defer c.Unlock()
	oldConsumers := c.consumers
	oldNumPartitions := len(oldConsumers)
if oldConsumers != nil {
if oldNumPartitions == newNumPartitions {
c.log.Debug("Number of partitions in topic has not changed")
return nil
}
c.log.WithField("old_partitions", oldNumPartitions).
WithField("new_partitions", newNumPartitions).
Info("Changed number of partitions in topic")
}
c.consumers = make([]*partitionConsumer, newNumPartitions)
	// If for some reason (e.g. forced deletion of a subscription's partitions)
	// oldNumPartitions > newNumPartitions, rebuild the consumer cache from
	// scratch; indexing by the old layout would otherwise go out of bounds.
if oldConsumers != nil && oldNumPartitions < newNumPartitions {
// Copy over the existing consumer instances
for i := 0; i < oldNumPartitions; i++ {
c.consumers[i] = oldConsumers[i]
}
}
type ConsumerError struct {
err error
partition int
consumer *partitionConsumer
}
startPartition := oldNumPartitions
partitionsToAdd := newNumPartitions - oldNumPartitions
if partitionsToAdd < 0 {
partitionsToAdd = newNumPartitions
startPartition = 0
}
var wg sync.WaitGroup
ch := make(chan ConsumerError, partitionsToAdd)
wg.Add(partitionsToAdd)
	for partitionIdx := startPartition; partitionIdx < newNumPartitions; partitionIdx++ {
		partitionTopic := partitions[partitionIdx]
		// Pass the loop values as arguments so every goroutine operates on its
		// own copy (required for correct per-iteration capture before Go 1.22).
		go func(idx int, pt string) {
			defer wg.Done()
			opts := newPartitionConsumerOpts(pt, c.consumerName, idx, c.options)
			cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq, c.metrics)
			ch <- ConsumerError{
				err:       err,
				partition: idx,
				consumer:  cons,
			}
		}(partitionIdx, partitionTopic)
	}
go func() {
wg.Wait()
close(ch)
}()
for ce := range ch {
if ce.err != nil {
err = ce.err
} else {
c.consumers[ce.partition] = ce.consumer
}
}
	if err != nil {
		// Since there were some failures, clean up all the partitions that
		// succeeded in creating their consumer.
		for _, pc := range c.consumers {
			if pc != nil {
				pc.Close()
			}
		}
		return err
	}
if newNumPartitions < oldNumPartitions {
c.metrics.ConsumersPartitions.Set(float64(newNumPartitions))
} else {
c.metrics.ConsumersPartitions.Add(float64(partitionsToAdd))
}
return nil
}
func newPartitionConsumerOpts(topic, consumerName string, idx int, options ConsumerOptions) *partitionConsumerOpts {
var nackRedeliveryDelay time.Duration
if options.NackRedeliveryDelay == 0 {
nackRedeliveryDelay = defaultNackRedeliveryDelay
} else {
nackRedeliveryDelay = options.NackRedeliveryDelay
}
return &partitionConsumerOpts{
topic: topic,
consumerName: consumerName,
subscription: options.SubscriptionName,
subscriptionType: options.Type,
subscriptionInitPos: options.SubscriptionInitialPosition,
partitionIdx: idx,
receiverQueueSize: options.ReceiverQueueSize,
nackRedeliveryDelay: nackRedeliveryDelay,
nackBackoffPolicy: options.NackBackoffPolicy,
metadata: options.Properties,
subProperties: options.SubscriptionProperties,
replicateSubscriptionState: options.ReplicateSubscriptionState,
startMessageID: options.startMessageID,
startMessageIDInclusive: options.StartMessageIDInclusive,
subscriptionMode: options.SubscriptionMode,
readCompacted: options.ReadCompacted,
interceptors: options.Interceptors,
maxReconnectToBroker: options.MaxReconnectToBroker,
backOffPolicyFunc: options.BackOffPolicyFunc,
keySharedPolicy: options.KeySharedPolicy,
schema: options.Schema,
decryption: options.Decryption,
ackWithResponse: options.AckWithResponse,
maxPendingChunkedMessage: options.MaxPendingChunkedMessage,
expireTimeOfIncompleteChunk: options.ExpireTimeOfIncompleteChunk,
autoAckIncompleteChunk: options.AutoAckIncompleteChunk,
consumerEventListener: options.EventListener,
enableBatchIndexAck: options.EnableBatchIndexAcknowledgment,
ackGroupingOptions: options.AckGroupingOptions,
autoReceiverQueueSize: options.EnableAutoScaledReceiverQueueSize,
}
}
func (c *consumer) Subscription() string {
return c.options.SubscriptionName
}
func (c *consumer) Unsubscribe() error {
return c.unsubscribe(false)
}
func (c *consumer) UnsubscribeForce() error {
return c.unsubscribe(true)
}
func (c *consumer) unsubscribe(force bool) error {
c.Lock()
defer c.Unlock()
var errMsg string
for _, consumer := range c.consumers {
if err := consumer.unsubscribe(force); err != nil {
			errMsg += fmt.Sprintf("topic %s, subscription %s: %s; ", consumer.topic, c.Subscription(), err)
}
}
if errMsg != "" {
return errors.New(errMsg)
}
return nil
}
func (c *consumer) GetLastMessageIDs() ([]TopicMessageID, error) {
	ids := make([]TopicMessageID, 0, len(c.consumers))
	for _, pc := range c.consumers {
		id, err := pc.getLastMessageID()
		if err != nil {
			return nil, err
		}
		ids = append(ids, &topicMessageID{topic: pc.topic, track: id})
	}
	return ids, nil
}
func (c *consumer) Receive(ctx context.Context) (message Message, err error) {
for {
select {
case <-c.closeCh:
return nil, newError(ConsumerClosed, "consumer closed")
case cm, ok := <-c.messageCh:
if !ok {
return nil, newError(ConsumerClosed, "consumer closed")
}
return cm.Message, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
}
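
// Illustrative receive loop (a sketch; assumes a ctx and consumer from above):
//
//	for {
//		msg, err := consumer.Receive(ctx)
//		if err != nil {
//			break // consumer closed or ctx canceled
//		}
//		// ... process msg ...
//		if err := consumer.Ack(msg); err != nil {
//			log.Println(err)
//		}
//	}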
func (c *consumer) AckWithTxn(msg Message, txn Transaction) error {
msgID := msg.ID()
if err := c.checkMsgIDPartition(msgID); err != nil {
return err
}
return c.consumers[msgID.PartitionIdx()].AckIDWithTxn(msgID, txn)
}
// Chan returns the message channel to users.
func (c *consumer) Chan() <-chan ConsumerMessage {
return c.messageCh
}
// Ack acknowledges the consumption of a single message.
func (c *consumer) Ack(msg Message) error {
return c.AckID(msg.ID())
}
// AckID acknowledges the consumption of a single message, identified by its MessageID.
func (c *consumer) AckID(msgID MessageID) error {
if err := c.checkMsgIDPartition(msgID); err != nil {
return err
}
if c.options.AckWithResponse {
return c.consumers[msgID.PartitionIdx()].AckIDWithResponse(msgID)
}
return c.consumers[msgID.PartitionIdx()].AckID(msgID)
}
func (c *consumer) AckIDList(msgIDs []MessageID) error {
return ackIDListFromMultiTopics(c.log, msgIDs, func(msgID MessageID) (acker, error) {
if err := c.checkMsgIDPartition(msgID); err != nil {
return nil, err
}
return c.consumers[msgID.PartitionIdx()], nil
})
}
// AckCumulative acknowledges the reception of all messages in the stream up to
// (and including) the provided message.
func (c *consumer) AckCumulative(msg Message) error {
return c.AckIDCumulative(msg.ID())
}
// AckIDCumulative acknowledges the reception of all messages in the stream up
// to (and including) the message identified by the given MessageID. Cumulative
// acknowledgment applies only to Exclusive and Failover subscriptions.
func (c *consumer) AckIDCumulative(msgID MessageID) error {
if err := c.checkMsgIDPartition(msgID); err != nil {
return err
}
if c.options.AckWithResponse {
return c.consumers[msgID.PartitionIdx()].AckIDWithResponseCumulative(msgID)
}
return c.consumers[msgID.PartitionIdx()].AckIDCumulative(msgID)
}
// ReconsumeLater marks a message for redelivery after a custom delay.
func (c *consumer) ReconsumeLater(msg Message, delay time.Duration) {
c.ReconsumeLaterWithCustomProperties(msg, map[string]string{}, delay)
}
// ReconsumeLaterWithCustomProperties marks a message for redelivery after a
// custom delay, attaching the given custom properties.
func (c *consumer) ReconsumeLaterWithCustomProperties(msg Message, customProperties map[string]string,
delay time.Duration) {
if delay < 0 {
delay = 0
}
if !checkMessageIDType(msg.ID()) {
c.log.Warnf("invalid message id type %T", msg.ID())
return
}
msgID := c.messageID(msg.ID())
if msgID == nil {
return
}
props := make(map[string]string)
for k, v := range msg.Properties() {
props[k] = v
}
for k, v := range customProperties {
props[k] = v
}
reconsumeTimes := 1
if s, ok := props[SysPropertyReconsumeTimes]; ok {
reconsumeTimes, _ = strconv.Atoi(s)
reconsumeTimes++
} else {
props[SysPropertyRealTopic] = msg.Topic()
props[SysPropertyOriginMessageID] = msgID.messageID.String()
}
props[SysPropertyReconsumeTimes] = strconv.Itoa(reconsumeTimes)
	props[SysPropertyDelayTime] = fmt.Sprintf("%d", delay.Milliseconds()) // delay in milliseconds
consumerMsg := ConsumerMessage{
Consumer: c,
Message: &message{
payLoad: msg.Payload(),
properties: props,
msgID: msgID,
},
}
if uint32(reconsumeTimes) > c.dlq.policy.MaxDeliveries {
c.dlq.Chan() <- consumerMsg
} else {
c.rlq.Chan() <- RetryMessage{
consumerMsg: consumerMsg,
producerMsg: ProducerMessage{
Payload: msg.Payload(),
Key: msg.Key(),
OrderingKey: msg.OrderingKey(),
Properties: props,
DeliverAfter: delay,
},
}
}
}
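
// Illustrative retry flow (a sketch; assumes RetryEnable and the hypothetical
// topic/subscription names below):
//
//	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
//		Topic:            "persistent://public/default/orders",
//		SubscriptionName: "sub",
//		Type:             pulsar.Shared,
//		RetryEnable:      true,
//		DLQ:              &pulsar.DLQPolicy{MaxDeliveries: 3},
//	})
//	msg, err := consumer.Receive(ctx)
//	// Redeliver through the retry topic after ten seconds; after MaxDeliveries
//	// attempts the message is routed to the dead letter topic instead.
//	consumer.ReconsumeLater(msg, 10*time.Second)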
func (c *consumer) Nack(msg Message) {
if !checkMessageIDType(msg.ID()) {
c.log.Warnf("invalid message id type %T", msg.ID())
return
}
if c.options.EnableDefaultNackBackoffPolicy || c.options.NackBackoffPolicy != nil {
mid := c.messageID(msg.ID())
if mid == nil {
return
}
if mid.consumer != nil {
mid.NackByMsg(msg)
return
}
c.consumers[mid.partitionIdx].NackMsg(msg)
return
}
c.NackID(msg.ID())
}
func (c *consumer) NackID(msgID MessageID) {
if err := c.checkMsgIDPartition(msgID); err != nil {
return
}
c.consumers[msgID.PartitionIdx()].NackID(msgID)
}
func (c *consumer) Close() {
c.closeOnce.Do(func() {
c.stopDiscovery()
c.Lock()
defer c.Unlock()
var wg sync.WaitGroup
for i := range c.consumers {
wg.Add(1)
go func(pc *partitionConsumer) {
defer wg.Done()
pc.Close()
}(c.consumers[i])
}
wg.Wait()
close(c.closeCh)
c.client.handlers.Del(c)
c.dlq.close()
c.rlq.close()
c.metrics.ConsumersClosed.Inc()
c.metrics.ConsumersPartitions.Sub(float64(len(c.consumers)))
})
}
func (c *consumer) Seek(msgID MessageID) error {
c.Lock()
defer c.Unlock()
if len(c.consumers) > 1 {
		return newError(SeekFailed, "for a partitioned topic, seek must be performed on the individual partitions")
}
if err := c.checkMsgIDPartition(msgID); err != nil {
return err
}
consumer := c.consumers[msgID.PartitionIdx()]
consumer.pauseDispatchMessage()
// clear messageCh
for len(c.messageCh) > 0 {
<-c.messageCh
}
return consumer.Seek(msgID)
}
func (c *consumer) SeekByTime(time time.Time) error {
c.Lock()
defer c.Unlock()
var errs error
for _, cons := range c.consumers {
cons.pauseDispatchMessage()
}
// clear messageCh
for len(c.messageCh) > 0 {
<-c.messageCh
}
// run SeekByTime on every partition of topic
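	// Note: if several partitions fail to seek, only the last error is retained.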
for _, cons := range c.consumers {
if err := cons.SeekByTime(time); err != nil {
msg := fmt.Sprintf("unable to SeekByTime for topic=%s subscription=%s", c.topic, c.Subscription())
errs = pkgerrors.Wrap(newError(SeekFailed, err.Error()), msg)
}
}
return errs
}
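
// Illustrative seek (a sketch): Seek requires a single-partition consumer per
// the check above, while SeekByTime fans out to every partition.
//
//	if err := consumer.SeekByTime(time.Now().Add(-1 * time.Hour)); err != nil {
//		log.Println("seek failed:", err)
//	}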
func (c *consumer) checkMsgIDPartition(msgID MessageID) error {
	partition := msgID.PartitionIdx()
	if partition < 0 || int(partition) >= len(c.consumers) {
		err := fmt.Errorf("invalid partition index %d, expected a partition in the range [0, %d)",
			partition, len(c.consumers))
		c.log.Error(err)
		return err
	}
	return nil
}
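
// hasNext fans out to every partition consumer and returns true as soon as
// any partition reports an available message; the context cancels the
// remaining senders once an answer is found.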
func (c *consumer) hasNext() bool {
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // Make sure all paths cancel the context to avoid context leak
var wg sync.WaitGroup
wg.Add(len(c.consumers))
hasNext := make(chan bool)
	for _, pc := range c.consumers {
		// Pass pc as an argument so each goroutine checks its own partition
		// (required for correct per-iteration capture before Go 1.22).
		go func(pc *partitionConsumer) {
			defer wg.Done()
			if pc.hasNext() {
				select {
				case hasNext <- true:
				case <-ctx.Done():
				}
			}
		}(pc)
	}
go func() {
wg.Wait()
close(hasNext) // Close the channel after all goroutines have finished
}()
// Wait for either a 'true' result or for all goroutines to finish
for hn := range hasNext {
if hn {
return true
}
}
return false
}
func (c *consumer) setLastDequeuedMsg(msgID MessageID) error {
if err := c.checkMsgIDPartition(msgID); err != nil {
return err
}
c.consumers[msgID.PartitionIdx()].lastDequeuedMsg = toTrackingMessageID(msgID)
return nil
}
var r = &random{
R: rand.New(rand.NewSource(time.Now().UnixNano())),
}
type random struct {
sync.Mutex
R *rand.Rand
}
func generateRandomName() string {
r.Lock()
defer r.Unlock()
chars := "abcdefghijklmnopqrstuvwxyz"
bytes := make([]byte, 5)
for i := range bytes {
bytes[i] = chars[r.R.Intn(len(chars))]
}
return string(bytes)
}
func distinct(fqdnTopics []string) []string {
set := make(map[string]struct{})
uniques := make([]string, 0, len(fqdnTopics))
for _, topic := range fqdnTopics {
if _, ok := set[topic]; !ok {
set[topic] = struct{}{}
uniques = append(uniques, topic)
}
}
return uniques
}
func toProtoSubType(st SubscriptionType) pb.CommandSubscribe_SubType {
switch st {
case Exclusive:
return pb.CommandSubscribe_Exclusive
case Shared:
return pb.CommandSubscribe_Shared
case Failover:
return pb.CommandSubscribe_Failover
case KeyShared:
return pb.CommandSubscribe_Key_Shared
}
return pb.CommandSubscribe_Exclusive
}
func toProtoInitialPosition(p SubscriptionInitialPosition) pb.CommandSubscribe_InitialPosition {
switch p {
case SubscriptionPositionLatest:
return pb.CommandSubscribe_Latest
case SubscriptionPositionEarliest:
return pb.CommandSubscribe_Earliest
}
return pb.CommandSubscribe_Latest
}
func (c *consumer) messageID(msgID MessageID) *trackingMessageID {
if err := c.checkMsgIDPartition(msgID); err != nil {
return nil
}
return toTrackingMessageID(msgID)
}
func addMessageCryptoIfMissing(client *client, options *ConsumerOptions, topics interface{}) error {
	// If decryption is enabled, use the default MessageCrypto when none is provided.
if options.Decryption != nil && options.Decryption.MessageCrypto == nil {
messageCrypto, err := crypto.NewDefaultMessageCrypto("decrypt",
false,
client.log.SubLogger(log.Fields{"topic": topics}))
if err != nil {
return err
}
options.Decryption.MessageCrypto = messageCrypto
}
return nil
}
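
// Illustrative decryption setup (a sketch; the key file paths are assumptions):
//
//	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
//		Topic:            "persistent://public/default/encrypted-topic",
//		SubscriptionName: "sub",
//		Decryption: &pulsar.MessageDecryptionInfo{
//			KeyReader: crypto.NewFileKeyReader("public.pem", "private.pem"),
//		},
//	})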