pulsar/consumer.go

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pulsar

import (
	"context"
	"fmt"
	"strings"
	"time"

	"github.com/apache/pulsar-client-go/pulsar/backoff"
)

// ConsumerMessage represents a pair of a Consumer and Message.
type ConsumerMessage struct {
	Consumer
	Message
}

// SubscriptionType of subscription supported by Pulsar
type SubscriptionType int

const (
	// Exclusive there can be only 1 consumer on the same topic with the same subscription name
	Exclusive SubscriptionType = iota

	// Shared subscription mode, multiple consumers will be able to use the same subscription name
	// and the messages will be dispatched according to
	// a round-robin rotation between the connected consumers
	Shared

	// Failover subscription mode, multiple consumers will be able to use the same subscription name
	// but only 1 consumer will receive the messages.
	// If that consumer disconnects, one of the other connected consumers will start receiving messages.
	Failover

	// KeyShared subscription mode, multiple consumers will be able to use the same
	// subscription and all messages with the same key will be dispatched to only one consumer
	KeyShared
)

// SubscriptionInitialPosition is the initial position at which the cursor starts consuming.
type SubscriptionInitialPosition int

const (
	// SubscriptionPositionLatest is the latest position, which means the start consuming position
	// will be the last message
	SubscriptionPositionLatest SubscriptionInitialPosition = iota

	// SubscriptionPositionEarliest is the earliest position, which means the start consuming position
	// will be the first message
	SubscriptionPositionEarliest
)

// DLQPolicy represents the configuration for the Dead Letter Queue consumer policy.
type DLQPolicy struct {
	// MaxDeliveries specifies the maximum number of times that a message will be delivered before being
	// sent to the dead letter queue.
	MaxDeliveries uint32

	// DeadLetterTopic specifies the name of the topic where the failing messages will be sent.
	DeadLetterTopic string

	// ProducerOptions is the producer options used to produce messages to the DLQ and RLQ topics
	ProducerOptions ProducerOptions

	// RetryLetterTopic specifies the name of the topic where the retry messages will be sent.
	RetryLetterTopic string

	// InitialSubscriptionName is the name of the initial subscription of the dead letter topic.
	// If this field is not set, the initial subscription for the dead letter topic will not be created.
	// If this field is set but the broker's `allowAutoSubscriptionCreation` is disabled, the DLQ producer
	// will fail to be created.
	InitialSubscriptionName string
}
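// A minimal sketch of wiring DLQPolicy into a subscription. The topic names,
// subscription name, and MaxDeliveries value below are illustrative, and "client"
// is assumed to be an existing Client:
//
//	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
//		Topic:            "orders",
//		SubscriptionName: "orders-sub",
//		Type:             pulsar.Shared,
//		RetryEnable:      true, // reconsumed messages go through the retry letter topic
//		DLQ: &pulsar.DLQPolicy{
//			MaxDeliveries:    3, // after 3 failed deliveries, route to the DLQ
//			DeadLetterTopic:  "orders-dlq",
//			RetryLetterTopic: "orders-retry",
//		},
//	})
//	if err != nil {
//		// handle the subscribe error
//	}
//	defer consumer.Close()
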
// AckGroupingOptions controls how to group ACK requests.
// If MaxSize is 0 or 1, any ACK request will be sent immediately.
// Otherwise, the ACK requests will be cached until one of the following conditions is met:
//  1. There are `MaxSize` pending ACK requests.
//  2. `MaxTime` is greater than 1 microsecond and ACK requests have been cached for `MaxTime`.
//
// In particular, for cumulative acknowledgment, only the latest ACK is cached and it will only be sent after `MaxTime`.
type AckGroupingOptions struct {
	// The maximum number of ACK requests to cache
	MaxSize uint32

	// The maximum time to cache ACK requests
	MaxTime time.Duration
}
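// A sketch of overriding the ACK grouping defaults through ConsumerOptions (the
// size and flush interval below are arbitrary values, not recommendations):
//
//	opts := pulsar.ConsumerOptions{
//		Topic:            "my-topic",
//		SubscriptionName: "my-sub",
//		AckGroupingOptions: &pulsar.AckGroupingOptions{
//			MaxSize: 500,                   // flush once 500 ACKs are pending
//			MaxTime: 50 * time.Millisecond, // or after 50ms, whichever comes first
//		},
//	}
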
// ConsumerOptions is used to configure and create instances of Consumer.
type ConsumerOptions struct {
	// Topic specifies the topic this consumer will subscribe on.
	// Either a topic, a list of topics or a topics pattern is required when subscribing
	Topic string

	// Topics specifies a list of topics this consumer will subscribe on.
	// Either a topic, a list of topics or a topics pattern is required when subscribing
	Topics []string

	// TopicsPattern specifies a regular expression to subscribe to multiple topics under the same namespace.
	// Either a topic, a list of topics or a topics pattern is required when subscribing
	TopicsPattern string

	// AutoDiscoveryPeriod specifies the interval in which to poll for new partitions or new topics
	// if using a TopicsPattern.
	AutoDiscoveryPeriod time.Duration

	// SubscriptionName specifies the subscription name for this consumer
	// This argument is required when subscribing
	SubscriptionName string

	// Properties represents a set of application-defined properties for the consumer.
	// Those properties will be visible in the topic stats
	Properties map[string]string

	// SubscriptionProperties specifies the subscription properties for this subscription.
	//
	// > Notice: SubscriptionProperties are immutable, and consumers under the same subscription will fail to create a
	// > subscription if they use different properties.
	SubscriptionProperties map[string]string

	// Type specifies the subscription type to be used when subscribing to a topic.
	// Default is `Exclusive`
	Type SubscriptionType

	// SubscriptionInitialPosition is the initial position at which the cursor will be set when subscribing
	// Default is `Latest`
	SubscriptionInitialPosition

	// EventListener will be called when the active consumer changes (in the failover subscription type)
	EventListener ConsumerEventListener

	// DLQ represents the configuration for the Dead Letter Queue consumer policy.
	// e.g. route the message to topic X after N failed attempts at processing it.
	// Default is nil, meaning there is no DLQ
	DLQ *DLQPolicy

	// KeySharedPolicy represents the configuration for the Key Shared consumer policy.
	KeySharedPolicy *KeySharedPolicy

	// RetryEnable determines whether to automatically retry sending messages to the default filled DLQPolicy topics.
	// Default is false
	RetryEnable bool

	// MessageChannel sets a `MessageChannel` for the consumer
	// When a message is received, it will be pushed to the channel for consumption
	MessageChannel chan ConsumerMessage

	// ReceiverQueueSize sets the size of the consumer receive queue.
	// The consumer receive queue controls how many messages can be accumulated by the `Consumer` before the
	// application calls `Consumer.receive()`. Using a higher value could potentially increase the consumer
	// throughput at the expense of bigger memory utilization.
	// Default value is `1000` messages and should be good for most use cases.
	ReceiverQueueSize int

	// EnableZeroQueueConsumer, if enabled, the ReceiverQueueSize will be 0.
	// Notice: only non-partitioned topics are supported.
	// Default is false.
	EnableZeroQueueConsumer bool

	// EnableAutoScaledReceiverQueueSize, if enabled, the consumer receive queue will be auto-scaled
	// by the consumer's actual throughput. The ReceiverQueueSize will be the maximum size to which the consumer
	// receive queue can be scaled.
	// Default is false.
	EnableAutoScaledReceiverQueueSize bool

	// NackRedeliveryDelay specifies the delay after which to redeliver the messages that failed to be
	// processed. Default is 1 min. (See `Consumer.Nack()`)
	NackRedeliveryDelay time.Duration

	// Name specifies the consumer name.
	Name string

	// ReadCompacted, if enabled, the consumer will read messages from the compacted topic rather than reading the
	// full message backlog of the topic. This means that, if the topic has been compacted, the consumer will only
	// see the latest value for each key in the topic, up until the point in the topic message backlog that has been
	// compacted. Beyond that point, the messages will be sent as normal.
	//
	// ReadCompacted can only be enabled on subscriptions to persistent topics which have a single active consumer
	// (i.e. failover or exclusive subscriptions). Attempting to enable it on subscriptions to non-persistent topics
	// or on a shared subscription will lead to the subscription call throwing a PulsarClientException.
	ReadCompacted bool

	// ReplicateSubscriptionState marks the subscription as replicated to keep it in sync across clusters
	ReplicateSubscriptionState bool

	// Interceptors is a chain of interceptors. These interceptors will be called at some points defined in the
	// ConsumerInterceptor interface.
	Interceptors ConsumerInterceptors

	// Schema represents the schema implementation.
	Schema Schema

	// MaxReconnectToBroker sets the maximum retry number of reconnectToBroker. (default: unlimited)
	MaxReconnectToBroker *uint

	// BackOffPolicyFunc parameterizes the following options in the reconnection logic to
	// allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage)
	BackOffPolicyFunc func() backoff.Policy

	// Decryption represents the encryption-related fields required by the consumer to decrypt a message.
	Decryption *MessageDecryptionInfo

	// EnableDefaultNackBackoffPolicy, if enabled, the default implementation of NackBackoffPolicy will be used
	// to calculate the delay time of nack backoff.
	// Default: false.
	EnableDefaultNackBackoffPolicy bool

	// NackBackoffPolicy is a redelivery backoff mechanism with which we can achieve redelivery with different
	// delays according to the number of times the message has been retried.
	//
	// > Notice: the NackBackoffPolicy will not work with `consumer.NackID(MessageID)`
	// > because we are not able to get the redeliveryCount from the message ID.
	NackBackoffPolicy NackBackoffPolicy

	// AckWithResponse is a return value added to the Ack Command, and its purpose is to confirm whether the Ack
	// Command was executed correctly on the Broker side. When set to true, the error information returned by the Ack
	// method contains the return value of the Ack Command processed by the Broker side; when set to false, the
	// error information of the Ack method only contains errors that may occur in the Go SDK's own processing.
	// Default: false
	AckWithResponse bool

	// MaxPendingChunkedMessage sets the maximum pending chunked messages. (default: 100)
	MaxPendingChunkedMessage int

	// ExpireTimeOfIncompleteChunk sets the expiry time for discarding incomplete chunked messages. (default: 60 seconds)
	ExpireTimeOfIncompleteChunk time.Duration

	// AutoAckIncompleteChunk sets whether the consumer auto-acknowledges an incomplete chunked message when it should
	// be removed (e.g. the chunked message pending queue is full). (default: false)
	AutoAckIncompleteChunk bool

	// EnableBatchIndexAcknowledgment enables or disables batch index acknowledgment. To enable this feature, ensure
	// batch index acknowledgment is enabled on the broker side. (default: false)
	EnableBatchIndexAcknowledgment bool

	// AckGroupingOptions controls how to group ACK requests. The default value is nil, which means:
	//   MaxSize: 1000
	//   MaxTime: 100*time.Millisecond
	// NOTE: This option does not work if AckWithResponse is true
	// because there are only synchronous APIs for acknowledgment
	AckGroupingOptions *AckGroupingOptions

	// SubscriptionMode specifies the subscription mode to be used when subscribing to a topic.
	// Default is `Durable`
	SubscriptionMode SubscriptionMode

	// StartMessageIDInclusive, if true, the consumer will start at the `StartMessageID`, included.
	// Note: This configuration also affects the seek operation.
	// Default is `false` and the consumer will start from the "next" message
	StartMessageIDInclusive bool

	// startMessageID specifies the message ID to start from. Currently, it's only used for the reader internally.
	startMessageID *trackingMessageID
}
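// A sketch of a typical subscription using a few of the options above. The broker
// URL, topic, and subscription names are placeholders:
//
//	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
//	if err != nil {
//		// handle the client creation error
//	}
//	defer client.Close()
//
//	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
//		Topic:                       "my-topic",
//		SubscriptionName:            "my-sub",
//		Type:                        pulsar.KeyShared,
//		SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
//		ReceiverQueueSize:           1000,
//		NackRedeliveryDelay:         30 * time.Second,
//	})
//	if err != nil {
//		// handle the subscribe error
//	}
//	defer consumer.Close()
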
// AckError is returned when `AckIDList` fails and `AckWithResponse` is true.
// It only contains the valid message IDs that failed to be acknowledged in the `AckIDList` call.
// For those invalid message IDs, users should ignore them and not acknowledge them again.
type AckError map[MessageID]error

func (e AckError) Error() string {
	builder := strings.Builder{}
	errorMap := make(map[string][]MessageID)
	for id, err := range e {
		errorMap[err.Error()] = append(errorMap[err.Error()], id)
	}
	for err, msgIDs := range errorMap {
		builder.WriteString(fmt.Sprintf("error: %s, failed message IDs: %v\n", err, msgIDs))
	}
	return builder.String()
}
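// A sketch of inspecting an AckError returned by AckIDList when AckWithResponse is
// enabled. "consumer" and the "ids" slice of MessageID are assumed to exist:
//
//	if err := consumer.AckIDList(ids); err != nil {
//		if ackErr, ok := err.(pulsar.AckError); ok {
//			for id, cause := range ackErr {
//				fmt.Printf("failed to ack %v: %v\n", id, cause)
//			}
//		}
//	}
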
// Consumer is an interface that abstracts the behavior of Pulsar's consumer
type Consumer interface {
	// Subscription returns the subscription name for this consumer
	Subscription() string

	// Unsubscribe the consumer
	//
	// Unsubscribing will cause the subscription to be deleted,
	// and all the retained data can potentially be deleted based on the message retention and TTL policy.
	//
	// This operation will fail when performed on a shared subscription
	// where more than one consumer is currently connected.
	Unsubscribe() error

	// UnsubscribeForce the consumer, forcefully unsubscribing by disconnecting the connected consumers.
	//
	// Unsubscribing will cause the subscription to be deleted,
	// and all the retained data can potentially be deleted based on the message retention and TTL policy.
	//
	// This operation will fail when performed on a shared subscription
	// where more than one consumer is currently connected.
	UnsubscribeForce() error

	// GetLastMessageIDs gets the last message ID of each topic the consumer subscribed to.
	//
	// Returns the list of MessageID instances of all the topics that the consumer subscribed to
	GetLastMessageIDs() ([]TopicMessageID, error)

	// Receive a single message.
	// This call blocks until a message is available.
	Receive(context.Context) (Message, error)

	// Chan returns a channel to consume messages from
	Chan() <-chan ConsumerMessage

	// Ack the consumption of a single message
	Ack(Message) error

	// AckID the consumption of a single message, identified by its MessageID
	// When `EnableBatchIndexAcknowledgment` is false, if a message ID represents a message in a batch,
	// it will not actually be acknowledged by the broker until all messages in that batch are acknowledged via
	// `AckID` or `AckIDList`.
	AckID(MessageID) error

	// AckIDList the consumption of a list of messages, identified by their MessageIDs
	//
	// This method should be used when `AckWithResponse` is true. Otherwise, it is equivalent to calling
	// `AckID` on each message ID in the list.
	//
	// When `AckWithResponse` is true, the returned error could be an `AckError` which contains the failed message IDs
	// and the corresponding errors.
	AckIDList([]MessageID) error

	// AckWithTxn the consumption of a single message with a transaction
	AckWithTxn(Message, Transaction) error

	// AckCumulative the reception of all the messages in the stream up to (and including)
	// the provided message.
	AckCumulative(msg Message) error

	// AckIDCumulative the reception of all the messages in the stream up to (and including)
	// the provided message, identified by its MessageID
	AckIDCumulative(msgID MessageID) error

	// ReconsumeLater marks a message for redelivery after a custom delay
	ReconsumeLater(msg Message, delay time.Duration)

	// ReconsumeLaterWithCustomProperties marks a message for redelivery after a custom delay, with custom properties
	ReconsumeLaterWithCustomProperties(msg Message, customProperties map[string]string, delay time.Duration)

	// Nack acknowledges the failure to process a single message.
	//
	// When a message is "negatively acked" it will be marked for redelivery after
	// some fixed delay. The delay is configurable when constructing the consumer
	// with ConsumerOptions.NackRedeliveryDelay .
	//
	// This call is not blocking.
	Nack(Message)

	// NackID acknowledges the failure to process a single message, identified by its MessageID.
	//
	// When a message is "negatively acked" it will be marked for redelivery after
	// some fixed delay. The delay is configurable when constructing the consumer
	// with ConsumerOptions.NackRedeliveryDelay .
	//
	// This call is not blocking.
	NackID(MessageID)

	// Close the consumer and stop the broker from pushing more messages
	Close()

	// Seek resets the subscription associated with this consumer to a specific message ID.
	// The message ID can either be a specific message or represent the first or last messages in the topic.
	//
	// Note: this operation can only be done on non-partitioned topics. For partitioned topics,
	// the seek can instead be performed on the individual partitions.
	Seek(MessageID) error

	// SeekByTime resets the subscription associated with this consumer to a specific message publish time.
	//
	// @param time
	//        the message publish time to which to reposition the subscription
	//
	SeekByTime(time time.Time) error

	// Name returns the name of the consumer.
	Name() string
}
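// A minimal receive loop against the Consumer interface above. "ctx" and "consumer"
// are assumed to exist, "process" is a hypothetical application handler, and error
// handling is abbreviated:
//
//	for {
//		msg, err := consumer.Receive(ctx)
//		if err != nil {
//			break // e.g. the context was canceled or the consumer was closed
//		}
//		if err := process(msg); err != nil {
//			consumer.Nack(msg) // redeliver after NackRedeliveryDelay
//			continue
//		}
//		if err := consumer.Ack(msg); err != nil {
//			// the ack failed; the message may be redelivered
//		}
//	}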