pulsar/consumer_zero_queue.go (227 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package pulsar
import (
"context"
"fmt"
"sync"
"time"
"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/log"
"github.com/pkg/errors"
)
type zeroQueueConsumer struct {
sync.Mutex
topic string
client *client
options ConsumerOptions
pc *partitionConsumer
consumerName string
disableForceTopicCreation bool
messageCh chan ConsumerMessage
dlq *dlqRouter
rlq *retryRouter
closeOnce sync.Once
closeCh chan struct{}
errorCh chan error
log log.Logger
metrics *internal.LeveledMetrics
}
func newZeroConsumer(client *client, options ConsumerOptions, topic string,
messageCh chan ConsumerMessage, dlq *dlqRouter,
rlq *retryRouter, disableForceTopicCreation bool) (*zeroQueueConsumer, error) {
zc := &zeroQueueConsumer{
topic: topic,
client: client,
options: options,
disableForceTopicCreation: disableForceTopicCreation,
messageCh: messageCh,
closeCh: make(chan struct{}),
errorCh: make(chan error),
dlq: dlq,
rlq: rlq,
log: client.log.SubLogger(log.Fields{"topic": topic}),
consumerName: options.Name,
metrics: client.metrics.GetLeveledMetrics(topic),
}
opts := newPartitionConsumerOpts(zc.topic, zc.consumerName, 0, zc.options)
conn, err := newPartitionConsumer(zc, zc.client, opts, zc.messageCh, zc.dlq, zc.metrics)
if err != nil {
return nil, err
}
zc.pc = conn
return zc, nil
}
func (z *zeroQueueConsumer) Subscription() string {
return z.options.SubscriptionName
}
func (z *zeroQueueConsumer) Unsubscribe() error {
return z.unsubscribe(false)
}
func (z *zeroQueueConsumer) UnsubscribeForce() error {
return z.unsubscribe(true)
}
func (z *zeroQueueConsumer) unsubscribe(force bool) error {
z.Lock()
defer z.Unlock()
if err := z.pc.unsubscribe(force); err != nil {
return errors.Errorf("topic %s, subscription %s: %v", z.topic, z.Subscription(), err)
}
return nil
}
func (z *zeroQueueConsumer) GetLastMessageIDs() ([]TopicMessageID, error) {
id, err := z.pc.getLastMessageID()
if err != nil {
return nil, err
}
tm := &topicMessageID{topic: z.pc.topic, track: id}
return []TopicMessageID{tm}, nil
}
func (z *zeroQueueConsumer) Receive(ctx context.Context) (Message, error) {
if state := z.pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
z.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
return nil, errors.New("consumer state is closed")
}
z.Lock()
defer z.Unlock()
z.pc.availablePermits.inc()
for {
select {
case <-z.closeCh:
return nil, newError(ConsumerClosed, "consumer closed")
case cm, ok := <-z.messageCh:
if !ok {
return nil, newError(ConsumerClosed, "consumer closed")
}
return cm.Message, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
}
func (z *zeroQueueConsumer) Chan() <-chan ConsumerMessage {
panic("zeroQueueConsumer cannot support Chan method")
}
func (z *zeroQueueConsumer) Ack(m Message) error {
return z.AckID(m.ID())
}
func (z *zeroQueueConsumer) checkMsgIDPartition(msgID MessageID) error {
partition := msgID.PartitionIdx()
if partition != 0 {
z.log.Errorf("invalid partition index %d expected a partition equal to 0",
partition)
return fmt.Errorf("invalid partition index %d expected a partition equal to 0",
partition)
}
return nil
}
func (z *zeroQueueConsumer) messageID(msgID MessageID) *trackingMessageID {
if err := z.checkMsgIDPartition(msgID); err != nil {
return nil
}
mid := toTrackingMessageID(msgID)
return mid
}
func (z *zeroQueueConsumer) AckID(msgID MessageID) error {
if err := z.checkMsgIDPartition(msgID); err != nil {
return err
}
if z.options.AckWithResponse {
return z.pc.AckIDWithResponse(msgID)
}
return z.pc.AckID(msgID)
}
func (z *zeroQueueConsumer) AckIDList(msgIDs []MessageID) error {
return z.pc.AckIDList(msgIDs)
}
func (z *zeroQueueConsumer) AckWithTxn(msg Message, txn Transaction) error {
msgID := msg.ID()
if err := z.checkMsgIDPartition(msgID); err != nil {
return err
}
return z.pc.AckIDWithTxn(msgID, txn)
}
func (z *zeroQueueConsumer) AckCumulative(msg Message) error {
return z.AckIDCumulative(msg.ID())
}
func (z *zeroQueueConsumer) AckIDCumulative(msgID MessageID) error {
if err := z.checkMsgIDPartition(msgID); err != nil {
return err
}
if z.options.AckWithResponse {
return z.pc.AckIDWithResponseCumulative(msgID)
}
return z.pc.AckIDCumulative(msgID)
}
func (z *zeroQueueConsumer) ReconsumeLater(_ Message, _ time.Duration) {
z.log.Warnf("zeroQueueConsumer not support ReconsumeLater yet.")
}
func (z *zeroQueueConsumer) ReconsumeLaterWithCustomProperties(_ Message, _ map[string]string, _ time.Duration) {
z.log.Warnf("zeroQueueConsumer not support ReconsumeLaterWithCustomProperties yet.")
}
func (z *zeroQueueConsumer) Nack(msg Message) {
if !checkMessageIDType(msg.ID()) {
z.log.Warnf("invalid message id type %T", msg.ID())
return
}
if z.options.EnableDefaultNackBackoffPolicy || z.options.NackBackoffPolicy != nil {
mid := z.messageID(msg.ID())
if mid == nil {
return
}
if mid.consumer != nil {
mid.NackByMsg(msg)
return
}
z.pc.NackMsg(msg)
return
}
z.NackID(msg.ID())
}
func (z *zeroQueueConsumer) NackID(msgID MessageID) {
if err := z.checkMsgIDPartition(msgID); err != nil {
return
}
z.pc.NackID(msgID)
}
func (z *zeroQueueConsumer) Close() {
z.closeOnce.Do(func() {
z.Lock()
defer z.Unlock()
z.pc.Close()
close(z.closeCh)
z.client.handlers.Del(z)
z.dlq.close()
z.rlq.close()
z.metrics.ConsumersClosed.Inc()
z.metrics.ConsumersPartitions.Sub(float64(1))
})
}
func (z *zeroQueueConsumer) Seek(msgID MessageID) error {
z.Lock()
defer z.Unlock()
if err := z.checkMsgIDPartition(msgID); err != nil {
return err
}
if err := z.pc.Seek(msgID); err != nil {
return err
}
// clear messageCh
for len(z.messageCh) > 0 {
<-z.messageCh
}
return nil
}
func (z *zeroQueueConsumer) SeekByTime(time time.Time) error {
z.Lock()
defer z.Unlock()
var errs error
if err := z.pc.SeekByTime(time); err != nil {
msg := fmt.Sprintf("unable to SeekByTime for topic=%s subscription=%s", z.topic, z.Subscription())
errs = errors.Wrap(newError(SeekFailed, err.Error()), msg)
}
// clear messageCh
for len(z.messageCh) > 0 {
<-z.messageCh
}
return errs
}
func (z *zeroQueueConsumer) Name() string {
return z.consumerName
}