in pulsar/__init__.py [0:0]
def subscribe(self, topic, subscription_name,
              consumer_type=ConsumerType.Exclusive,
              schema=schema.BytesSchema(),
              message_listener=None,
              receiver_queue_size=1000,
              max_total_receiver_queue_size_across_partitions=50000,
              consumer_name=None,
              unacked_messages_timeout_ms=None,
              broker_consumer_stats_cache_time_ms=30000,
              negative_ack_redelivery_delay_ms=60000,
              is_read_compacted=False,
              properties=None,
              pattern_auto_discovery_period=60,
              initial_position=InitialPosition.Latest,
              crypto_key_reader=None,
              replicate_subscription_state_enabled=False,
              max_pending_chunked_message=10,
              auto_ack_oldest_chunked_message_on_queue_full=False,
              start_message_id_inclusive=False,
              batch_receive_policy=None,
              key_shared_policy=None,
              batch_index_ack_enabled=False,
              ):
    """
    Subscribe to the given topic and subscription combination.

    Parameters
    ----------

    topic:
        The name of the topic, list of topics or regex pattern. This method will accept these forms:

        * ``topic='my-topic'``
        * ``topic=['topic-1', 'topic-2', 'topic-3']``
        * ``topic=re.compile('persistent://public/default/topic-*')``
    subscription_name: str
        The name of the subscription.
    consumer_type: ConsumerType, default=ConsumerType.Exclusive
        Select the subscription type to be used when subscribing to the topic.
    schema: pulsar.schema.Schema, default=pulsar.schema.BytesSchema
        Define the schema of the data that will be received by this consumer.
    message_listener: optional
        Sets a message listener for the consumer. When the listener is set, the application will
        receive messages through it. Calls to ``consumer.receive()`` will not be allowed.
        The listener function needs to accept (consumer, message), for example:

        .. code-block:: python

            def my_listener(consumer, message):
                # process message
                consumer.acknowledge(message)
    receiver_queue_size: int, default=1000
        Sets the size of the consumer receive queue. The consumer receive queue controls how many messages can be
        accumulated by the consumer before the application calls `receive()`. Using a higher value could potentially
        increase the consumer throughput at the expense of higher memory utilization. Setting the consumer queue
        size to zero decreases the throughput of the consumer by disabling pre-fetching of messages.
        This approach improves the message distribution on shared subscription by pushing messages only to those
        consumers that are ready to process them. Neither receive with timeout nor partitioned topics can be used
        if the consumer queue size is zero. The `receive()` function call should not be interrupted when the
        consumer queue size is zero. The default value is 1000 messages and should work well for most use cases.
    max_total_receiver_queue_size_across_partitions: int, default=50000
        Set the max total receiver queue size across partitions. This setting will be used to reduce the
        receiver queue size for individual partitions.
    consumer_name: str, optional
        Sets the consumer name.
    unacked_messages_timeout_ms: int, optional
        Sets the timeout in milliseconds for unacknowledged messages. The timeout needs to be greater than
        10 seconds. An exception is thrown if the given value is less than 10 seconds. If a successful
        acknowledgement is not sent within the timeout, all the unacknowledged messages are redelivered.
    broker_consumer_stats_cache_time_ms: int, default=30000
        Sets the time duration for which the broker-side consumer stats will be cached in the client.
    negative_ack_redelivery_delay_ms: int, default=60000
        The delay after which to redeliver the messages that failed to be processed
        (with the ``consumer.negative_acknowledge()``).
    is_read_compacted: bool, default=False
        Selects whether to read the compacted version of the topic.
    properties: dict, optional
        Sets the properties for the consumer. The properties associated with a consumer can be used to
        identify a consumer at broker side.
    pattern_auto_discovery_period: int, default=60
        Period, in seconds, at which the consumer auto-discovers topics matching the pattern.
    initial_position: InitialPosition, default=InitialPosition.Latest
        Set the initial position of a consumer when subscribing to the topic.
        It could be either: ``InitialPosition.Earliest`` or ``InitialPosition.Latest``.
    crypto_key_reader: CryptoKeyReader, optional
        Symmetric encryption class implementation, configuring public key encryption messages for the producer
        and private key decryption messages for the consumer.
    replicate_subscription_state_enabled: bool, default=False
        Set whether the subscription status should be replicated.
    max_pending_chunked_message: int, default=10
        Consumer buffers chunk messages into memory until it receives all the chunks of the original message.
        While consuming chunk-messages, chunks from same message might not be contiguous in the stream, and they
        might be mixed with other messages' chunks. So, consumer has to maintain multiple buffers to manage
        chunks coming from different messages. This mainly happens when multiple publishers are publishing
        messages on the topic concurrently or publisher failed to publish all chunks of the messages.

        If it's zero, the pending chunked messages will not be limited.
    auto_ack_oldest_chunked_message_on_queue_full: bool, default=False
        Buffering large number of outstanding uncompleted chunked messages can create memory pressure, and it
        can be guarded by providing the maxPendingChunkedMessage threshold. See setMaxPendingChunkedMessage.
        Once, consumer reaches this threshold, it drops the outstanding unchunked-messages by silently acking
        if autoAckOldestChunkedMessageOnQueueFull is true else it marks them for redelivery.
    start_message_id_inclusive: bool, default=False
        Set the consumer to include the given position of any reset operation like Consumer::seek.
    batch_receive_policy: class ConsumerBatchReceivePolicy
        Set the batch collection policy for batch receiving.
    key_shared_policy: class ConsumerKeySharedPolicy
        Set the key shared policy for use when the ConsumerType is KeyShared.
    batch_index_ack_enabled: bool, default=False
        Enable the batch index acknowledgement.
        It should be noted that this option can only work when the broker side also enables the batch index
        acknowledgement. See the `acknowledgmentAtBatchIndexLevelEnabled` config in `broker.conf`.

    Returns
    -------
    Consumer
        The new consumer. It is also appended to this client's list of consumers.

    Raises
    ------
    ValueError
        If ``topic`` is not a ``str``, a ``list`` or a compiled regex pattern.
    """
    # Argument validation is delegated to the module-level type-check helpers.
    _check_type(str, subscription_name, 'subscription_name')
    _check_type(ConsumerType, consumer_type, 'consumer_type')
    _check_type(_schema.Schema, schema, 'schema')
    _check_type(int, receiver_queue_size, 'receiver_queue_size')
    _check_type(int, max_total_receiver_queue_size_across_partitions,
                'max_total_receiver_queue_size_across_partitions')
    _check_type_or_none(str, consumer_name, 'consumer_name')
    _check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms')
    _check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms')
    _check_type(int, negative_ack_redelivery_delay_ms, 'negative_ack_redelivery_delay_ms')
    _check_type(int, pattern_auto_discovery_period, 'pattern_auto_discovery_period')
    _check_type(bool, is_read_compacted, 'is_read_compacted')
    _check_type_or_none(dict, properties, 'properties')
    _check_type(InitialPosition, initial_position, 'initial_position')
    _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
    _check_type(int, max_pending_chunked_message, 'max_pending_chunked_message')
    _check_type(bool, auto_ack_oldest_chunked_message_on_queue_full,
                'auto_ack_oldest_chunked_message_on_queue_full')
    _check_type(bool, start_message_id_inclusive, 'start_message_id_inclusive')
    _check_type_or_none(ConsumerBatchReceivePolicy, batch_receive_policy, 'batch_receive_policy')
    _check_type_or_none(ConsumerKeySharedPolicy, key_shared_policy, 'key_shared_policy')
    _check_type(bool, batch_index_ack_enabled, 'batch_index_ack_enabled')

    # Translate the keyword arguments into the native consumer configuration.
    conf = _pulsar.ConsumerConfiguration()
    conf.consumer_type(consumer_type)
    conf.read_compacted(is_read_compacted)
    if message_listener:
        # Wrap the user callback together with the schema before handing it
        # to the native layer.
        conf.message_listener(_listener_wrapper(message_listener, schema))
    conf.receiver_queue_size(receiver_queue_size)
    conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
    if consumer_name:
        conf.consumer_name(consumer_name)
    if unacked_messages_timeout_ms:
        # None (and 0) leave the native default untouched.
        conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)

    conf.negative_ack_redelivery_delay_ms(negative_ack_redelivery_delay_ms)
    conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
    # This value used to be validated but never forwarded, so a caller's
    # custom discovery period was silently ignored for regex subscriptions.
    conf.pattern_auto_discovery_period(pattern_auto_discovery_period)
    if properties:
        for k, v in properties.items():
            conf.property(k, v)
    conf.subscription_initial_position(initial_position)

    conf.schema(schema.schema_info())

    if crypto_key_reader:
        conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)

    conf.replicate_subscription_state_enabled(replicate_subscription_state_enabled)
    conf.max_pending_chunked_message(max_pending_chunked_message)
    conf.auto_ack_oldest_chunked_message_on_queue_full(auto_ack_oldest_chunked_message_on_queue_full)
    conf.start_message_id_inclusive(start_message_id_inclusive)
    if batch_receive_policy:
        conf.batch_receive_policy(batch_receive_policy.policy())
    if key_shared_policy:
        conf.key_shared_policy(key_shared_policy.policy())
    conf.batch_index_ack_enabled(batch_index_ack_enabled)

    # Pick the native subscribe call that matches the form of `topic`.
    c = Consumer()
    if isinstance(topic, str):
        # Single topic
        c._consumer = self._client.subscribe(topic, subscription_name, conf)
    elif isinstance(topic, list):
        # List of topics
        c._consumer = self._client.subscribe_topics(topic, subscription_name, conf)
    elif isinstance(topic, _retype):
        # Regex pattern
        c._consumer = self._client.subscribe_pattern(topic.pattern, subscription_name, conf)
    else:
        raise ValueError("Argument 'topic' is expected to be of a type between (str, list, re.pattern)")

    c._client = self
    c._schema = schema
    c._schema.attach_client(self._client)
    # Track the consumer on the client.
    self._consumers.append(c)
    return c