in pulsar/__init__.py [0:0]
def create_reader(self, topic, start_message_id,
schema=schema.BytesSchema(),
reader_listener=None,
receiver_queue_size=1000,
reader_name=None,
subscription_role_prefix=None,
is_read_compacted=False,
crypto_key_reader=None
):
"""
Create a reader on a particular topic
Parameters
----------
topic:
The name of the topic.
start_message_id:
The initial reader positioning is done by specifying a message id. The options are:
* ``MessageId.earliest``:
Start reading from the earliest message available in the topic
* ``MessageId.latest``:
Start reading from the end topic, only getting messages published after the reader was created
* ``MessageId``:
When passing a particular message id, the reader will position itself on that specific position.
The first message to be read will be the message next to the specified messageId.
Message id can be serialized into a string and deserialized back into a `MessageId` object:
.. code-block:: python
# Serialize to string
s = msg.message_id().serialize()
# Deserialize from string
msg_id = MessageId.deserialize(s)
schema: pulsar.schema.Schema, default=pulsar.schema.BytesSchema
Define the schema of the data that will be received by this reader.
reader_listener: optional
Sets a message listener for the reader. When the listener is set, the application will receive messages
through it. Calls to ``reader.read_next()`` will not be allowed. The listener function needs to accept
(reader, message), for example:
.. code-block:: python
def my_listener(reader, message):
# process message
pass
receiver_queue_size: int, default=1000
Sets the size of the reader receive queue. The reader receive queue controls how many messages can be
accumulated by the reader before the application calls `read_next()`. Using a higher value could
potentially increase the reader throughput at the expense of higher memory utilization.
reader_name: str, optional
Sets the reader name.
subscription_role_prefix: str, optional
Sets the subscription role prefix.
is_read_compacted: bool, default=False
Selects whether to read the compacted version of the topic
crypto_key_reader: CryptoKeyReader, optional
Symmetric encryption class implementation, configuring public key encryption messages for the producer
and private key decryption messages for the consumer
"""
# If a pulsar.MessageId object is passed, access the _pulsar.MessageId object
if isinstance(start_message_id, MessageId):
start_message_id = start_message_id._msg_id
_check_type(str, topic, 'topic')
_check_type(_pulsar.MessageId, start_message_id, 'start_message_id')
_check_type(_schema.Schema, schema, 'schema')
_check_type(int, receiver_queue_size, 'receiver_queue_size')
_check_type_or_none(str, reader_name, 'reader_name')
_check_type_or_none(str, subscription_role_prefix, 'subscription_role_prefix')
_check_type(bool, is_read_compacted, 'is_read_compacted')
_check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
conf = _pulsar.ReaderConfiguration()
if reader_listener:
conf.reader_listener(_listener_wrapper(reader_listener, schema))
conf.receiver_queue_size(receiver_queue_size)
if reader_name:
conf.reader_name(reader_name)
if subscription_role_prefix:
conf.subscription_role_prefix(subscription_role_prefix)
conf.schema(schema.schema_info())
conf.read_compacted(is_read_compacted)
if crypto_key_reader:
conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
c = Reader()
c._reader = self._client.create_reader(topic, start_message_id, conf)
c._client = self
c._schema = schema
c._schema.attach_client(self._client)
self._consumers.append(c)
return c