def create_reader()

in pulsar/__init__.py [0:0]


    def create_reader(self, topic, start_message_id,
                      schema=schema.BytesSchema(),
                      reader_listener=None,
                      receiver_queue_size=1000,
                      reader_name=None,
                      subscription_role_prefix=None,
                      is_read_compacted=False,
                      crypto_key_reader=None
                      ):
        """
        Create a reader on a topic, positioned at a given message id.

        Argument types are validated before the reader is created.

        Parameters
        ----------

        topic: str
            The name of the topic to read from.
        start_message_id:
            The position at which the reader starts. One of:

            * ``MessageId.earliest``:

            Start reading from the earliest message available in the topic.

            * ``MessageId.latest``:

            Start reading from the end of the topic, i.e. only receive messages published after the reader
            was created.

            * ``MessageId``:

            Position the reader on a specific message id. The first message returned is the one that follows
            the specified message id. A message id can be serialized into a string and later deserialized
            back into a `MessageId` object:

               .. code-block:: python

                   # Serialize to string
                   s = msg.message_id().serialize()

                   # Deserialize from string
                   msg_id = MessageId.deserialize(s)
        schema: pulsar.schema.Schema, default=pulsar.schema.BytesSchema
            The schema of the data received by this reader.
        reader_listener: optional
            A message listener for the reader. When a listener is set, messages are delivered to the
            application through it and calls to ``reader.read_next()`` are not allowed. The listener must
            accept ``(reader, message)``, for example:

            .. code-block:: python

                def my_listener(reader, message):
                    # process message
                    pass
        receiver_queue_size: int, default=1000
            The size of the reader receive queue, i.e. how many messages the reader may accumulate before
            the application calls `read_next()`. A higher value can increase reader throughput at the
            expense of higher memory utilization.
        reader_name: str, optional
            The reader name.
        subscription_role_prefix: str, optional
            The subscription role prefix.
        is_read_compacted: bool, default=False
            Whether to read the compacted version of the topic.
        crypto_key_reader: CryptoKeyReader, optional
            Key reader implementation used for message encryption: it supplies the public key used to
            encrypt messages on the producer side and the private key used to decrypt them on the
            consumer side.

        Returns
        -------
        Reader
            The newly created reader. It is also recorded in this client's internal list of consumers.
        """

        # Callers may hand over the high-level pulsar.MessageId wrapper; the
        # native client expects the underlying _pulsar.MessageId it carries.
        if isinstance(start_message_id, MessageId):
            start_message_id = start_message_id._msg_id

        # Argument validation, executed strictly in this order so the first
        # offending argument is the one reported. start_message_id is checked
        # after the unwrapping above.
        validations = (
            (_check_type, str, topic, 'topic'),
            (_check_type, _pulsar.MessageId, start_message_id, 'start_message_id'),
            (_check_type, _schema.Schema, schema, 'schema'),
            (_check_type, int, receiver_queue_size, 'receiver_queue_size'),
            (_check_type_or_none, str, reader_name, 'reader_name'),
            (_check_type_or_none, str, subscription_role_prefix, 'subscription_role_prefix'),
            (_check_type, bool, is_read_compacted, 'is_read_compacted'),
            (_check_type_or_none, CryptoKeyReader, crypto_key_reader, 'crypto_key_reader'),
        )
        for validate, expected_type, value, arg_name in validations:
            validate(expected_type, value, arg_name)

        # Translate the keyword arguments into the native reader configuration.
        # Optional settings are only applied when a truthy value was supplied.
        reader_conf = _pulsar.ReaderConfiguration()
        if reader_listener:
            # Wrap the user callback together with the schema before handing
            # it to the native configuration.
            wrapped_listener = _listener_wrapper(reader_listener, schema)
            reader_conf.reader_listener(wrapped_listener)
        reader_conf.receiver_queue_size(receiver_queue_size)
        if reader_name:
            reader_conf.reader_name(reader_name)
        if subscription_role_prefix:
            reader_conf.subscription_role_prefix(subscription_role_prefix)
        reader_conf.schema(schema.schema_info())
        reader_conf.read_compacted(is_read_compacted)
        if crypto_key_reader:
            reader_conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)

        # Build the Python-side wrapper around the native reader and register
        # it with this client.
        reader = Reader()
        reader._reader = self._client.create_reader(topic, start_message_id, reader_conf)
        reader._client = self
        reader._schema = schema
        reader._schema.attach_client(self._client)
        self._consumers.append(reader)
        return reader