def create_producer()

in pulsar/__init__.py [0:0]


    def create_producer(self, topic,
                        producer_name=None,
                        schema=schema.BytesSchema(),
                        initial_sequence_id=None,
                        send_timeout_millis=30000,
                        compression_type=CompressionType.NONE,
                        max_pending_messages=1000,
                        max_pending_messages_across_partitions=50000,
                        block_if_queue_full=False,
                        batching_enabled=False,
                        batching_max_messages=1000,
                        batching_max_allowed_size_in_bytes=128*1024,
                        batching_max_publish_delay_ms=10,
                        chunking_enabled=False,
                        message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution,
                        lazy_start_partitioned_producers=False,
                        properties=None,
                        batching_type=BatchingType.Default,
                        encryption_key=None,
                        crypto_key_reader=None
                        ):
        """
        Create a new producer on a given topic.

        Parameters
        ----------

        topic: str
            The topic name
        producer_name: str, optional
            Specify a name for the producer. If not assigned, the system will generate a globally unique name
            which can be accessed with `Producer.producer_name()`. When specifying a name, it is up to the user
            to ensure that, for a given topic, the producer name is unique across all Pulsar's clusters.
        schema: pulsar.schema.Schema, default=pulsar.schema.BytesSchema
            Define the schema of the data that will be published by this producer, e.g,
            ``schema=JsonSchema(MyRecordClass)``.

            The schema will be used for two purposes:
                * Validate the data format against the topic defined schema
                * Perform serialization/deserialization between data and objects
        initial_sequence_id: int, optional
            Set the baseline for the sequence ids for messages published by the producer. First message will be
            using ``(initial_sequence_id + 1)`` as its sequence id and subsequent messages will be assigned
            incremental sequence ids, if not otherwise specified. Any integer, including ``0``, is applied;
            only ``None`` leaves the underlying default untouched.
        send_timeout_millis: int, default=30000
            If a message is not acknowledged by the server before ``send_timeout_millis`` expires, an error
            will be reported.
        compression_type: CompressionType, default=CompressionType.NONE
            Set the compression type for the producer. By default, message payloads are not compressed.

            Supported compression types:

            * CompressionType.LZ4
            * CompressionType.ZLib
            * CompressionType.ZSTD
            * CompressionType.SNAPPY

            ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that release in order to
            be able to receive messages compressed with ZSTD.

            SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that release in order to
            be able to receive messages compressed with SNAPPY.
        max_pending_messages: int, default=1000
            Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
        max_pending_messages_across_partitions: int, default=50000
            Set the max size of the queue holding the messages pending to receive an acknowledgment across partitions
            from the broker.
        block_if_queue_full: bool, default=False
            Set whether `send_async` operations should block when the outgoing message queue is full.
        batching_enabled: bool, default=False
            Enable automatic batching, so that multiple messages may be sent to the broker as a single batch.
            Cannot be combined with ``chunking_enabled``.
        batching_max_messages: int, default=1000
            Maximum number of messages permitted in a batch.
        batching_max_allowed_size_in_bytes: int, default=128*1024
            Maximum total size, in bytes, permitted for a batch.
        batching_max_publish_delay_ms: int, default=10
            Maximum time, in milliseconds, that queued messages wait before the batch is published.
        message_routing_mode: PartitionsRoutingMode, default=PartitionsRoutingMode.RoundRobinDistribution
            Set the message routing mode for the partitioned producer.

            Supported modes:

            * ``PartitionsRoutingMode.RoundRobinDistribution``
            * ``PartitionsRoutingMode.UseSinglePartition``
        lazy_start_partitioned_producers: bool, default=False
            This config affects producers of partitioned topics only. It controls whether producers register
            and connect immediately to the owner broker of each partition or start lazily on demand. The internal
            producer of one partition is always started eagerly, chosen by the routing policy, but the internal
            producers of any additional partitions are started on demand, upon receiving their first message.

            Using this mode can reduce the strain on brokers for topics with large numbers of partitions and when
            the SinglePartition routing policy is used without keyed messages. Because producer connection can be
            on demand, this can produce extra send latency for the first messages of a given partition.
        properties: dict, optional
            Sets the properties for the producer. The properties associated with a producer can be used to identify
            a producer at broker side.
        batching_type: BatchingType, default=BatchingType.Default
            Sets the batching type for the producer.

            There are two batching types: DefaultBatching and KeyBasedBatching.

            DefaultBatching will batch single messages:
                (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
            ... into single batch message:
                [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]

            KeyBasedBatching will batch incoming single messages:
                (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
            ... into single batch message:
                [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]
        chunking_enabled: bool, default=False
            If message size is higher than allowed max publish-payload size by broker then chunking_enabled helps
            producer to split message into multiple chunks and publish them to broker separately and in order.
            So, it allows client to successfully publish large size of messages in pulsar.
            Cannot be combined with ``batching_enabled``.
        encryption_key: str, optional
            The key used for symmetric encryption, configured on the producer side
        crypto_key_reader: CryptoKeyReader, optional
            Symmetric encryption class implementation, configuring public key encryption messages for the producer
            and private key decryption messages for the consumer

        Returns
        -------

        Producer
            A producer bound to ``topic``, carrying the given ``schema`` and a reference to this client's
            underlying native client.

        Raises
        ------

        ValueError
            If both ``batching_enabled`` and ``chunking_enabled`` are true.

        Notes
        -----

        Argument types are validated up front through ``_check_type`` / ``_check_type_or_none``; those
        helpers are presumably responsible for raising on a mismatch (their behaviour is defined elsewhere).
        """
        _check_type(str, topic, 'topic')
        _check_type_or_none(str, producer_name, 'producer_name')
        _check_type(_schema.Schema, schema, 'schema')
        _check_type_or_none(int, initial_sequence_id, 'initial_sequence_id')
        _check_type(int, send_timeout_millis, 'send_timeout_millis')
        _check_type(CompressionType, compression_type, 'compression_type')
        _check_type(int, max_pending_messages, 'max_pending_messages')
        _check_type(int, max_pending_messages_across_partitions, 'max_pending_messages_across_partitions')
        _check_type(bool, block_if_queue_full, 'block_if_queue_full')
        _check_type(bool, batching_enabled, 'batching_enabled')
        _check_type(int, batching_max_messages, 'batching_max_messages')
        _check_type(int, batching_max_allowed_size_in_bytes, 'batching_max_allowed_size_in_bytes')
        _check_type(int, batching_max_publish_delay_ms, 'batching_max_publish_delay_ms')
        _check_type(bool, chunking_enabled, 'chunking_enabled')
        _check_type_or_none(dict, properties, 'properties')
        _check_type(BatchingType, batching_type, 'batching_type')
        _check_type_or_none(str, encryption_key, 'encryption_key')
        _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
        _check_type(bool, lazy_start_partitioned_producers, 'lazy_start_partitioned_producers')

        # Reject the incompatible combination before building any native configuration.
        if batching_enabled and chunking_enabled:
            raise ValueError("Batching and chunking of messages can't be enabled together.")

        conf = _pulsar.ProducerConfiguration()
        conf.send_timeout_millis(send_timeout_millis)
        conf.compression_type(compression_type)
        conf.max_pending_messages(max_pending_messages)
        conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
        conf.block_if_queue_full(block_if_queue_full)
        conf.batching_enabled(batching_enabled)
        conf.batching_max_messages(batching_max_messages)
        conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
        conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
        conf.partitions_routing_mode(message_routing_mode)
        conf.batching_type(batching_type)
        conf.chunking_enabled(chunking_enabled)
        conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
        if producer_name:
            conf.producer_name(producer_name)
        # Compare against None rather than relying on truthiness: 0 is a valid
        # baseline sequence id and must not be silently discarded.
        if initial_sequence_id is not None:
            conf.initial_sequence_id(initial_sequence_id)
        if properties:
            for k, v in properties.items():
                conf.property(k, v)

        conf.schema(schema.schema_info())
        if encryption_key:
            conf.encryption_key(encryption_key)
        if crypto_key_reader:
            # Hand the native layer the wrapped reader object, not the Python wrapper.
            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)

        p = Producer()
        p._producer = self._client.create_producer(topic, conf)
        p._schema = schema
        p._client = self._client
        return p