pulsar/consumer_zero_queue.go (227 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package pulsar import ( "context" "fmt" "sync" "time" "github.com/apache/pulsar-client-go/pulsar/internal" "github.com/apache/pulsar-client-go/pulsar/log" "github.com/pkg/errors" ) type zeroQueueConsumer struct { sync.Mutex topic string client *client options ConsumerOptions pc *partitionConsumer consumerName string disableForceTopicCreation bool messageCh chan ConsumerMessage dlq *dlqRouter rlq *retryRouter closeOnce sync.Once closeCh chan struct{} errorCh chan error log log.Logger metrics *internal.LeveledMetrics } func newZeroConsumer(client *client, options ConsumerOptions, topic string, messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter, disableForceTopicCreation bool) (*zeroQueueConsumer, error) { zc := &zeroQueueConsumer{ topic: topic, client: client, options: options, disableForceTopicCreation: disableForceTopicCreation, messageCh: messageCh, closeCh: make(chan struct{}), errorCh: make(chan error), dlq: dlq, rlq: rlq, log: client.log.SubLogger(log.Fields{"topic": topic}), consumerName: options.Name, metrics: client.metrics.GetLeveledMetrics(topic), } opts := newPartitionConsumerOpts(zc.topic, zc.consumerName, 0, zc.options) conn, err := newPartitionConsumer(zc, zc.client, opts, zc.messageCh, zc.dlq, zc.metrics) if err != nil { return nil, err } zc.pc = conn return zc, nil } func (z *zeroQueueConsumer) Subscription() string { return z.options.SubscriptionName } func (z *zeroQueueConsumer) Unsubscribe() error { return z.unsubscribe(false) } func (z *zeroQueueConsumer) UnsubscribeForce() error { return z.unsubscribe(true) } func (z *zeroQueueConsumer) unsubscribe(force bool) error { z.Lock() defer z.Unlock() if err := z.pc.unsubscribe(force); err != nil { return errors.Errorf("topic %s, subscription %s: %v", z.topic, z.Subscription(), err) } return nil } func (z *zeroQueueConsumer) GetLastMessageIDs() ([]TopicMessageID, error) { id, err := z.pc.getLastMessageID() if err != nil { return nil, err } tm := &topicMessageID{topic: z.pc.topic, track: id} return []TopicMessageID{tm}, nil } func (z *zeroQueueConsumer) Receive(ctx context.Context) (Message, error) { if state := z.pc.getConsumerState(); state == consumerClosed || state == consumerClosing { z.log.WithField("state", state).Error("Failed to ack by closing or closed consumer") return nil, errors.New("consumer state is closed") } z.Lock() defer z.Unlock() z.pc.availablePermits.inc() for { select { case <-z.closeCh: return nil, newError(ConsumerClosed, "consumer closed") case cm, ok := <-z.messageCh: if !ok { return nil, newError(ConsumerClosed, "consumer closed") } return cm.Message, nil case <-ctx.Done(): return nil, ctx.Err() } } } func (z *zeroQueueConsumer) Chan() <-chan ConsumerMessage { panic("zeroQueueConsumer cannot support Chan method") } func (z *zeroQueueConsumer) Ack(m Message) error { return z.AckID(m.ID()) } func (z *zeroQueueConsumer) checkMsgIDPartition(msgID MessageID) error { partition := msgID.PartitionIdx() if partition != 0 { z.log.Errorf("invalid partition index %d expected a partition equal to 0", partition) return fmt.Errorf("invalid partition index %d expected a partition equal to 0", partition) } return nil } func (z *zeroQueueConsumer) messageID(msgID MessageID) *trackingMessageID { if err := z.checkMsgIDPartition(msgID); err != nil { return nil } mid := toTrackingMessageID(msgID) return mid } func (z *zeroQueueConsumer) AckID(msgID MessageID) error { if err := z.checkMsgIDPartition(msgID); err != nil { return err } if z.options.AckWithResponse { return z.pc.AckIDWithResponse(msgID) } return z.pc.AckID(msgID) } func (z *zeroQueueConsumer) AckIDList(msgIDs []MessageID) error { return z.pc.AckIDList(msgIDs) } func (z *zeroQueueConsumer) AckWithTxn(msg Message, txn Transaction) error { msgID := msg.ID() if err := z.checkMsgIDPartition(msgID); err != nil { return err } return z.pc.AckIDWithTxn(msgID, txn) } func (z *zeroQueueConsumer) AckCumulative(msg Message) error { return z.AckIDCumulative(msg.ID()) } func (z *zeroQueueConsumer) AckIDCumulative(msgID MessageID) error { if err := z.checkMsgIDPartition(msgID); err != nil { return err } if z.options.AckWithResponse { return z.pc.AckIDWithResponseCumulative(msgID) } return z.pc.AckIDCumulative(msgID) } func (z *zeroQueueConsumer) ReconsumeLater(_ Message, _ time.Duration) { z.log.Warnf("zeroQueueConsumer not support ReconsumeLater yet.") } func (z *zeroQueueConsumer) ReconsumeLaterWithCustomProperties(_ Message, _ map[string]string, _ time.Duration) { z.log.Warnf("zeroQueueConsumer not support ReconsumeLaterWithCustomProperties yet.") } func (z *zeroQueueConsumer) Nack(msg Message) { if !checkMessageIDType(msg.ID()) { z.log.Warnf("invalid message id type %T", msg.ID()) return } if z.options.EnableDefaultNackBackoffPolicy || z.options.NackBackoffPolicy != nil { mid := z.messageID(msg.ID()) if mid == nil { return } if mid.consumer != nil { mid.NackByMsg(msg) return } z.pc.NackMsg(msg) return } z.NackID(msg.ID()) } func (z *zeroQueueConsumer) NackID(msgID MessageID) { if err := z.checkMsgIDPartition(msgID); err != nil { return } z.pc.NackID(msgID) } func (z *zeroQueueConsumer) Close() { z.closeOnce.Do(func() { z.Lock() defer z.Unlock() z.pc.Close() close(z.closeCh) z.client.handlers.Del(z) z.dlq.close() z.rlq.close() z.metrics.ConsumersClosed.Inc() z.metrics.ConsumersPartitions.Sub(float64(1)) }) } func (z *zeroQueueConsumer) Seek(msgID MessageID) error { z.Lock() defer z.Unlock() if err := z.checkMsgIDPartition(msgID); err != nil { return err } if err := z.pc.Seek(msgID); err != nil { return err } // clear messageCh for len(z.messageCh) > 0 { <-z.messageCh } return nil } func (z *zeroQueueConsumer) SeekByTime(time time.Time) error { z.Lock() defer z.Unlock() var errs error if err := z.pc.SeekByTime(time); err != nil { msg := fmt.Sprintf("unable to SeekByTime for topic=%s subscription=%s", z.topic, z.Subscription()) errs = errors.Wrap(newError(SeekFailed, err.Error()), msg) } // clear messageCh for len(z.messageCh) > 0 { <-z.messageCh } return errs } func (z *zeroQueueConsumer) Name() string { return z.consumerName }