pulsar/consumer_multitopic.go (274 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package pulsar import ( "context" "errors" "fmt" "sync" "time" "github.com/apache/pulsar-client-go/pulsar/internal" pkgerrors "github.com/pkg/errors" "github.com/apache/pulsar-client-go/pulsar/log" ) type multiTopicConsumer struct { client *client options ConsumerOptions consumerName string messageCh chan ConsumerMessage consumers map[string]Consumer dlq *dlqRouter rlq *retryRouter closeOnce sync.Once closeCh chan struct{} log log.Logger } func newMultiTopicConsumer(client *client, options ConsumerOptions, topics []string, messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (Consumer, error) { mtc := &multiTopicConsumer{ client: client, options: options, messageCh: messageCh, consumers: make(map[string]Consumer, len(topics)), closeCh: make(chan struct{}), dlq: dlq, rlq: rlq, log: client.log.SubLogger(log.Fields{"topic": topics}), consumerName: options.Name, } var errs error for ce := range subscriber(client, topics, options, messageCh, dlq, rlq) { if ce.err != nil { errs = pkgerrors.Wrapf(ce.err, "unable to subscribe to topic=%s", ce.topic) } else { mtc.consumers[ce.topic] = ce.consumer } } if errs != nil { for _, c := range mtc.consumers { c.Close() } return nil, errs } return mtc, nil } func (c *multiTopicConsumer) Subscription() string { return c.options.SubscriptionName } func (c *multiTopicConsumer) Unsubscribe() error { var errs error for t, consumer := range c.consumers { if err := consumer.Unsubscribe(); err != nil { msg := fmt.Sprintf("unable to unsubscribe from topic=%s subscription=%s", t, c.Subscription()) errs = pkgerrors.Wrap(err, msg) } } return errs } func (c *multiTopicConsumer) UnsubscribeForce() error { var errs error for t, consumer := range c.consumers { if err := consumer.UnsubscribeForce(); err != nil { msg := fmt.Sprintf("unable to force unsubscribe from topic=%s subscription=%s", t, c.Subscription()) errs = pkgerrors.Wrap(err, msg) } } return errs } func (c *multiTopicConsumer) GetLastMessageIDs() ([]TopicMessageID, error) { ids := make([]TopicMessageID, 0) for _, c := range c.consumers { id, err := c.GetLastMessageIDs() if err != nil { return nil, err } ids = append(ids, id...) } return ids, nil } func (c *multiTopicConsumer) Receive(ctx context.Context) (message Message, err error) { for { select { case <-c.closeCh: return nil, newError(ConsumerClosed, "consumer closed") case cm, ok := <-c.messageCh: if !ok { return nil, newError(ConsumerClosed, "consumer closed") } return cm.Message, nil case <-ctx.Done(): return nil, ctx.Err() } } } // Chan return the message chan to users func (c *multiTopicConsumer) Chan() <-chan ConsumerMessage { return c.messageCh } // Ack the consumption of a single message func (c *multiTopicConsumer) Ack(msg Message) error { return c.AckID(msg.ID()) } // AckID the consumption of a single message, identified by its MessageID func (c *multiTopicConsumer) AckID(msgID MessageID) error { if !checkMessageIDType(msgID) { c.log.Warnf("invalid message id type %T", msgID) return errors.New("invalid message id type in multi_consumer") } mid := toTrackingMessageID(msgID) if mid.consumer == nil { c.log.Warnf("unable to ack messageID=%+v can not determine topic", msgID) return errors.New("unable to ack message because consumer is nil") } if c.options.AckWithResponse { return mid.consumer.AckIDWithResponse(msgID) } return mid.consumer.AckID(msgID) } func (c *multiTopicConsumer) AckIDList(msgIDs []MessageID) error { return ackIDListFromMultiTopics(c.log, msgIDs, func(msgID MessageID) (acker, error) { if !checkMessageIDType(msgID) { return nil, fmt.Errorf("invalid message id type %T", msgID) } if mid := toTrackingMessageID(msgID); mid != nil && mid.consumer != nil { return mid.consumer, nil } return nil, errors.New("consumer is nil") }) } func ackIDListFromMultiTopics(log log.Logger, msgIDs []MessageID, findConsumer func(MessageID) (acker, error)) error { consumerToMsgIDs := make(map[acker][]MessageID) for _, msgID := range msgIDs { if consumer, err := findConsumer(msgID); err == nil { consumerToMsgIDs[consumer] = append(consumerToMsgIDs[consumer], msgID) } else { log.Warnf("Can not find consumer for %v", msgID) } } subErrCh := make(chan error, len(consumerToMsgIDs)) for consumer, ids := range consumerToMsgIDs { go func() { subErrCh <- consumer.AckIDList(ids) }() } ackError := AckError{} for i := 0; i < len(consumerToMsgIDs); i++ { if topicAckError, ok := (<-subErrCh).(AckError); ok { for id, err := range topicAckError { ackError[id] = err } } } if len(ackError) == 0 { return nil } return ackError } // AckWithTxn the consumption of a single message with a transaction func (c *multiTopicConsumer) AckWithTxn(msg Message, txn Transaction) error { msgID := msg.ID() if !checkMessageIDType(msgID) { c.log.Warnf("invalid message id type %T", msgID) return errors.New("invalid message id type in multi_consumer") } mid := toTrackingMessageID(msgID) if mid.consumer == nil { c.log.Warnf("unable to ack messageID=%+v can not determine topic", msgID) return errors.New("unable to ack message because consumer is nil") } return mid.consumer.AckIDWithTxn(msgID, txn) } // AckCumulative the reception of all the messages in the stream up to (and including) // the provided message func (c *multiTopicConsumer) AckCumulative(msg Message) error { return c.AckIDCumulative(msg.ID()) } // AckIDCumulative the reception of all the messages in the stream up to (and including) // the provided message, identified by its MessageID func (c *multiTopicConsumer) AckIDCumulative(msgID MessageID) error { if !checkMessageIDType(msgID) { c.log.Warnf("invalid message id type %T", msgID) return errors.New("invalid message id type in multi_consumer") } mid := toTrackingMessageID(msgID) if mid.consumer == nil { c.log.Warnf("unable to ack messageID=%+v can not determine topic", msgID) return errors.New("unable to ack message because consumer is nil") } if c.options.AckWithResponse { return mid.consumer.AckIDWithResponseCumulative(msgID) } return mid.consumer.AckIDCumulative(msgID) } func (c *multiTopicConsumer) ReconsumeLater(msg Message, delay time.Duration) { c.ReconsumeLaterWithCustomProperties(msg, map[string]string{}, delay) } func (c *multiTopicConsumer) ReconsumeLaterWithCustomProperties(msg Message, customProperties map[string]string, delay time.Duration) { names, err := validateTopicNames(msg.Topic()) if err != nil { c.log.Errorf("validate msg topic %q failed: %v", msg.Topic(), err) return } if len(names) != 1 { c.log.Errorf("invalid msg topic %q names: %+v ", msg.Topic(), names) return } tn := names[0] fqdnTopic := internal.TopicNameWithoutPartitionPart(tn) consumer, ok := c.consumers[fqdnTopic] if !ok { // check to see if the topic with the partition part is in the consumers // this can happen when the consumer is configured to consume from a specific partition if consumer, ok = c.consumers[tn.Name]; !ok { c.log.Warnf("consumer of topic %s not exist unexpectedly", msg.Topic()) return } } consumer.ReconsumeLaterWithCustomProperties(msg, customProperties, delay) } func (c *multiTopicConsumer) Nack(msg Message) { if c.options.EnableDefaultNackBackoffPolicy || c.options.NackBackoffPolicy != nil { msgID := msg.ID() if !checkMessageIDType(msgID) { c.log.Warnf("invalid message id type %T", msgID) return } mid := toTrackingMessageID(msgID) if mid.consumer == nil { c.log.Warnf("unable to nack messageID=%+v can not determine topic", msgID) return } mid.NackByMsg(msg) return } c.NackID(msg.ID()) } func (c *multiTopicConsumer) NackID(msgID MessageID) { if !checkMessageIDType(msgID) { c.log.Warnf("invalid message id type %T", msgID) return } mid := toTrackingMessageID(msgID) if mid.consumer == nil { c.log.Warnf("unable to nack messageID=%+v can not determine topic", msgID) return } mid.consumer.NackID(msgID) } func (c *multiTopicConsumer) Close() { c.closeOnce.Do(func() { var wg sync.WaitGroup wg.Add(len(c.consumers)) for _, con := range c.consumers { go func(consumer Consumer) { defer wg.Done() consumer.Close() }(con) } wg.Wait() close(c.closeCh) c.client.handlers.Del(c) c.dlq.close() c.rlq.close() }) } func (c *multiTopicConsumer) Seek(_ MessageID) error { return newError(SeekFailed, "seek command not allowed for multi topic consumer") } func (c *multiTopicConsumer) SeekByTime(_ time.Time) error { return newError(SeekFailed, "seek command not allowed for multi topic consumer") } // Name returns the name of consumer. func (c *multiTopicConsumer) Name() string { return c.consumerName }