def subscribe()

in pulsar/__init__.py [0:0]


    def subscribe(self, topic, subscription_name,
                  consumer_type: ConsumerType = ConsumerType.Exclusive,
                  schema=schema.BytesSchema(),
                  message_listener=None,
                  receiver_queue_size=1000,
                  max_total_receiver_queue_size_across_partitions=50000,
                  consumer_name=None,
                  unacked_messages_timeout_ms=None,
                  broker_consumer_stats_cache_time_ms=30000,
                  negative_ack_redelivery_delay_ms=60000,
                  is_read_compacted=False,
                  properties=None,
                  pattern_auto_discovery_period=60,
                  initial_position: InitialPosition = InitialPosition.Latest,
                  crypto_key_reader: Union[None, CryptoKeyReader] = None,
                  replicate_subscription_state_enabled=False,
                  max_pending_chunked_message=10,
                  auto_ack_oldest_chunked_message_on_queue_full=False,
                  start_message_id_inclusive=False,
                  batch_receive_policy=None,
                  key_shared_policy=None,
                  batch_index_ack_enabled=False,
                  regex_subscription_mode: RegexSubscriptionMode = RegexSubscriptionMode.PersistentOnly,
                  dead_letter_policy: Union[None, ConsumerDeadLetterPolicy] = None,
                  crypto_failure_action: ConsumerCryptoFailureAction = ConsumerCryptoFailureAction.FAIL,
                  ):
        """
        Subscribe to the given topic and subscription combination.

        The arguments are validated first, then a consumer configuration is built from them and the
        subscription is created through the underlying client. The new consumer is recorded in this
        client's list of consumers before it is returned.

        Parameters
        ----------

        topic:
            The name of the topic, list of topics or regex pattern. This method will accept these forms:
            * ``topic='my-topic'``
            * ``topic=['topic-1', 'topic-2', 'topic-3']``
            * ``topic=re.compile('persistent://public/default/topic-*')``
        subscription_name: str
            The name of the subscription.
        consumer_type: ConsumerType, default=ConsumerType.Exclusive
            Select the subscription type to be used when subscribing to the topic.
        schema: pulsar.schema.Schema, default=pulsar.schema.BytesSchema
            Define the schema of the data that will be received by this consumer.
        message_listener: optional
            Sets a message listener for the consumer. When the listener is set, the application will
            receive messages through it. Calls to ``consumer.receive()`` will not be allowed.
            The listener function needs to accept (consumer, message), for example:

            .. code-block:: python

                def my_listener(consumer, message):
                    # process message
                    consumer.acknowledge(message)
        receiver_queue_size: int, default=1000
            Sets the size of the consumer receive queue. The consumer receive queue controls how many messages can be
            accumulated by the consumer before the application calls `receive()`. Using a higher value could potentially
            increase the consumer throughput at the expense of higher memory utilization. Setting the consumer queue
            size to zero decreases the throughput of the consumer by disabling pre-fetching of messages.

            This approach improves the message distribution on shared subscription by pushing messages only to those
            consumers that are ready to process them. Neither receive with timeout nor partitioned topics can be used
            if the consumer queue size is zero. The `receive()` function call should not be interrupted when the
            consumer queue size is zero. The default value is 1000 messages and should work well for most use cases.
        max_total_receiver_queue_size_across_partitions: int, default=50000
            Set the max total receiver queue size across partitions. This setting will be used to reduce the
            receiver queue size for individual partitions.
        consumer_name: str, optional
            Sets the consumer name.
        unacked_messages_timeout_ms: int, optional
            Sets the timeout in milliseconds for unacknowledged messages. The timeout needs to be greater than
            10 seconds. An exception is thrown if the given value is less than 10 seconds. If a successful
            acknowledgement is not sent within the timeout, all the unacknowledged messages are redelivered.
        broker_consumer_stats_cache_time_ms: int, default=30000
            Sets the time duration for which the broker-side consumer stats will be cached in the client.
        negative_ack_redelivery_delay_ms: int, default=60000
            The delay after which to redeliver the messages that failed to be processed
            (with the ``consumer.negative_acknowledge()``).
        is_read_compacted: bool, default=False
            Selects whether to read the compacted version of the topic.
        properties: dict, optional
            Sets the properties for the consumer. The properties associated with a consumer can be used to
            identify a consumer at the broker side.
        pattern_auto_discovery_period: int, default=60
            Period, in seconds, at which the consumer auto-discovers topics matching the pattern.
        initial_position: InitialPosition, default=InitialPosition.Latest
            Set the initial position of a consumer when subscribing to the topic.
            It could be either: ``InitialPosition.Earliest`` or ``InitialPosition.Latest``.
        crypto_key_reader: CryptoKeyReader, optional
            Symmetric encryption class implementation, configuring public key encryption messages for the producer
            and private key decryption messages for the consumer.
        replicate_subscription_state_enabled: bool, default=False
            Set whether the subscription status should be replicated.
        max_pending_chunked_message: int, default=10
            Consumer buffers chunk messages into memory until it receives all the chunks of the original message.
            While consuming chunk-messages, chunks from the same message might not be contiguous in the stream, and
            they might be mixed with other messages' chunks. So the consumer has to maintain multiple buffers to
            manage chunks coming from different messages. This mainly happens when multiple publishers are
            publishing messages on the topic concurrently or a publisher failed to publish all chunks of a message.

            If it's zero, the pending chunked messages will not be limited.
        auto_ack_oldest_chunked_message_on_queue_full: bool, default=False
            Buffering a large number of outstanding uncompleted chunked messages can create memory pressure, and it
            can be guarded by providing the ``max_pending_chunked_message`` threshold. Once the consumer reaches
            this threshold, it drops the outstanding uncompleted chunked messages by silently acknowledging them
            if this option is true; otherwise it marks them for redelivery.
        start_message_id_inclusive: bool, default=False
            Set the consumer to include the given position of any reset operation like Consumer::seek.
        batch_receive_policy: ConsumerBatchReceivePolicy, optional
            Set the batch collection policy for batch receiving.
        key_shared_policy: ConsumerKeySharedPolicy, optional
            Set the key shared policy for use when the ConsumerType is KeyShared.
        batch_index_ack_enabled: bool, default=False
            Enable the batch index acknowledgement.
            It should be noted that this option can only work when the broker side also enables the batch index
            acknowledgement. See the `acknowledgmentAtBatchIndexLevelEnabled` config in `broker.conf`.
        regex_subscription_mode: RegexSubscriptionMode, default=RegexSubscriptionMode.PersistentOnly
            Set the regex subscription mode for use when the topic is a regex pattern.

            Supported modes:

            * PersistentOnly: By default only subscribe to persistent topics.
            * NonPersistentOnly: Only subscribe to non-persistent topics.
            * AllTopics: Subscribe to both persistent and non-persistent topics.
        dead_letter_policy: ConsumerDeadLetterPolicy, optional
            Set dead letter policy for consumer.
            By default, some messages are redelivered many times, even to the extent that they can never be
            stopped. By using the dead letter mechanism, messages have a max redelivery count; when they
            exceed the maximum number of redeliveries, messages are sent to dead letter topics and acknowledged
            automatically.
        crypto_failure_action: ConsumerCryptoFailureAction, default=ConsumerCryptoFailureAction.FAIL
            Set the behavior when the decryption fails. The default is to fail the message.

            Supported actions:

            * ConsumerCryptoFailureAction.FAIL: Fail consume until crypto succeeds
            * ConsumerCryptoFailureAction.DISCARD:
              Message is silently acknowledged and not delivered to the application.
            * ConsumerCryptoFailureAction.CONSUME:
              Deliver the encrypted message to the application. It's the application's responsibility
              to decrypt the message. If message is also compressed, decompression will fail. If the
              message contains batch messages, client will not be able to retrieve individual messages
              in the batch.

        Returns
        -------
        Consumer
            The consumer bound to the new subscription.

        Raises
        ------
        ValueError
            If ``topic`` is not a ``str``, a ``list`` or a compiled regular expression.

        Notes
        -----
        Most arguments are validated with ``_check_type`` / ``_check_type_or_none`` before any configuration
        is built; the error raised on a type mismatch is whatever those helpers raise.
        """
        _check_type(str, subscription_name, 'subscription_name')
        _check_type(ConsumerType, consumer_type, 'consumer_type')
        _check_type(_schema.Schema, schema, 'schema')
        _check_type(int, receiver_queue_size, 'receiver_queue_size')
        _check_type(int, max_total_receiver_queue_size_across_partitions,
                    'max_total_receiver_queue_size_across_partitions')
        _check_type_or_none(str, consumer_name, 'consumer_name')
        _check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms')
        _check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms')
        _check_type(int, negative_ack_redelivery_delay_ms, 'negative_ack_redelivery_delay_ms')
        _check_type(int, pattern_auto_discovery_period, 'pattern_auto_discovery_period')
        _check_type(bool, is_read_compacted, 'is_read_compacted')
        _check_type_or_none(dict, properties, 'properties')
        _check_type(InitialPosition, initial_position, 'initial_position')
        _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
        # NOTE(review): replicate_subscription_state_enabled is the one bool option that is not
        # type-checked here; left as-is so that currently accepted values keep working.
        _check_type(int, max_pending_chunked_message, 'max_pending_chunked_message')
        _check_type(bool, auto_ack_oldest_chunked_message_on_queue_full, 'auto_ack_oldest_chunked_message_on_queue_full')
        _check_type(bool, start_message_id_inclusive, 'start_message_id_inclusive')
        _check_type_or_none(ConsumerBatchReceivePolicy, batch_receive_policy, 'batch_receive_policy')
        _check_type_or_none(ConsumerKeySharedPolicy, key_shared_policy, 'key_shared_policy')
        _check_type(bool, batch_index_ack_enabled, 'batch_index_ack_enabled')
        _check_type(RegexSubscriptionMode, regex_subscription_mode, 'regex_subscription_mode')
        # Validate the dead letter policy like the other optional policies, so a wrong type is
        # rejected up front instead of failing later on `.policy()` with a half-built configuration.
        _check_type_or_none(ConsumerDeadLetterPolicy, dead_letter_policy, 'dead_letter_policy')
        _check_type(ConsumerCryptoFailureAction, crypto_failure_action, 'crypto_failure_action')

        conf = _pulsar.ConsumerConfiguration()
        conf.consumer_type(consumer_type)
        conf.regex_subscription_mode(regex_subscription_mode)
        conf.read_compacted(is_read_compacted)
        if message_listener:
            # Wrap the user callback together with the schema before handing it to the configuration.
            conf.message_listener(_listener_wrapper(message_listener, schema))
        conf.receiver_queue_size(receiver_queue_size)
        conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
        # Optional settings are only applied when truthy, so None, '' and 0 leave the
        # configuration's own defaults untouched.
        if consumer_name:
            conf.consumer_name(consumer_name)
        if unacked_messages_timeout_ms:
            conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)

        conf.negative_ack_redelivery_delay_ms(negative_ack_redelivery_delay_ms)
        conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
        if properties:
            for k, v in properties.items():
                conf.property(k, v)
        conf.subscription_initial_position(initial_position)

        conf.schema(schema.schema_info())

        if crypto_key_reader:
            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)

        conf.replicate_subscription_state_enabled(replicate_subscription_state_enabled)
        conf.max_pending_chunked_message(max_pending_chunked_message)
        conf.auto_ack_oldest_chunked_message_on_queue_full(auto_ack_oldest_chunked_message_on_queue_full)
        conf.start_message_id_inclusive(start_message_id_inclusive)
        if batch_receive_policy:
            conf.batch_receive_policy(batch_receive_policy.policy())

        if key_shared_policy:
            conf.key_shared_policy(key_shared_policy.policy())
        conf.batch_index_ack_enabled(batch_index_ack_enabled)
        if dead_letter_policy:
            conf.dead_letter_policy(dead_letter_policy.policy())
        conf.crypto_failure_action(crypto_failure_action)

        # Dispatch on the form of `topic`: each form maps to a different call on the underlying client.
        c = Consumer()
        if isinstance(topic, str):
            # Single topic
            c._consumer = self._client.subscribe(topic, subscription_name, conf)
        elif isinstance(topic, list):
            # List of topics
            c._consumer = self._client.subscribe_topics(topic, subscription_name, conf)
        elif isinstance(topic, _retype):
            # Regex pattern: the pattern's source string is what gets passed on.
            c._consumer = self._client.subscribe_pattern(topic.pattern, subscription_name, conf)
        else:
            raise ValueError("Argument 'topic' is expected to be of a type between (str, list, re.pattern)")

        c._client = self
        c._schema = schema
        c._schema.attach_client(self._client)
        # Record the consumer on this client only after the subscription succeeded.
        self._consumers.append(c)
        return c