in pulsar/__init__.py [0:0]
def create_producer(self, topic,
                    producer_name=None,
                    schema=schema.BytesSchema(),
                    initial_sequence_id=None,
                    send_timeout_millis=30000,
                    compression_type: CompressionType = CompressionType.NONE,
                    max_pending_messages=1000,
                    max_pending_messages_across_partitions=50000,
                    block_if_queue_full=False,
                    batching_enabled=False,
                    batching_max_messages=1000,
                    batching_max_allowed_size_in_bytes=128*1024,
                    batching_max_publish_delay_ms=10,
                    chunking_enabled=False,
                    message_routing_mode: PartitionsRoutingMode = PartitionsRoutingMode.RoundRobinDistribution,
                    lazy_start_partitioned_producers=False,
                    properties=None,
                    batching_type: BatchingType = BatchingType.Default,
                    encryption_key=None,
                    crypto_key_reader: Union[None, CryptoKeyReader] = None,
                    access_mode: ProducerAccessMode = ProducerAccessMode.Shared,
                    ):
    """
    Create a new producer on a given topic.

    Parameters
    ----------

    topic: str
        The topic name
    producer_name: str, optional
        Specify a name for the producer. If not assigned, the system will generate a globally unique name
        which can be accessed with `Producer.producer_name()`. When specifying a name, it is up to the user
        to ensure that, for a given topic, the producer name is unique across all Pulsar's clusters.
    schema: pulsar.schema.Schema, default=pulsar.schema.BytesSchema
        Define the schema of the data that will be published by this producer, e.g.,
        ``schema=JsonSchema(MyRecordClass)``.

        The schema will be used for two purposes:
            * Validate the data format against the topic defined schema
            * Perform serialization/deserialization between data and objects
    initial_sequence_id: int, optional
        Set the baseline for the sequence ids for messages published by the producer. First message will be
        using ``(initialSequenceId + 1)`` as its sequence id and subsequent messages will be assigned
        incremental sequence ids, if not otherwise specified. Any integer, including ``0``, is applied;
        only ``None`` leaves the baseline unset.
    send_timeout_millis: int, default=30000
        If a message is not acknowledged by the server before the ``send_timeout`` expires, an error will be reported.
    compression_type: CompressionType, default=CompressionType.NONE
        Set the compression type for the producer. By default, message payloads are not compressed.

        Supported compression types:

        * CompressionType.LZ4
        * CompressionType.ZLib
        * CompressionType.ZSTD
        * CompressionType.SNAPPY

        ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that release in order to
        be able to receive messages compressed with ZSTD.

        SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that release in order to
        be able to receive messages compressed with SNAPPY.
    batching_enabled: bool, default=False
        When automatic batching is enabled, multiple calls to `send` can result in a single batch to be sent to the
        broker, leading to better throughput, especially when publishing small messages.
        All messages in a batch will be published as a single batched message. The consumer will be delivered
        individual messages in the batch in the same order they were enqueued.
        Cannot be combined with ``chunking_enabled``.
    batching_max_messages: int, default=1000
        When you set this option to a value greater than 1, messages are queued until this threshold or
        `batching_max_allowed_size_in_bytes` is reached or batch interval has elapsed.
    batching_max_allowed_size_in_bytes: int, default=128*1024
        When you set this option to a value greater than 1, messages are queued until this threshold or
        `batching_max_messages` is reached or batch interval has elapsed.
    batching_max_publish_delay_ms: int, default=10
        The batch interval in milliseconds. Queued messages will be sent in batch after this interval even if
        neither the `batching_max_messages` nor the `batching_max_allowed_size_in_bytes` threshold is reached.
    max_pending_messages: int, default=1000
        Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
    max_pending_messages_across_partitions: int, default=50000
        Set the max size of the queue holding the messages pending to receive an acknowledgment across partitions
        from the broker.
    block_if_queue_full: bool, default=False
        Set whether `send_async` operations should block when the outgoing message queue is full.
    message_routing_mode: PartitionsRoutingMode, default=PartitionsRoutingMode.RoundRobinDistribution
        Set the message routing mode for the partitioned producer.

        Supported modes:

        * ``PartitionsRoutingMode.RoundRobinDistribution``
        * ``PartitionsRoutingMode.UseSinglePartition``
    lazy_start_partitioned_producers: bool, default=False
        This config affects producers of partitioned topics only. It controls whether producers register
        and connect immediately to the owner broker of each partition or start lazily on demand. The internal
        producer of one partition is always started eagerly, chosen by the routing policy, but the internal
        producers of any additional partitions are started on demand, upon receiving their first message.

        Using this mode can reduce the strain on brokers for topics with large numbers of partitions and when
        the SinglePartition routing policy is used without keyed messages. Because producer connection can be
        on demand, this can produce extra send latency for the first messages of a given partition.
    properties: dict, optional
        Sets the properties for the producer. The properties associated with a producer can be used to identify
        a producer on the broker side.
    batching_type: BatchingType, default=BatchingType.Default
        Sets the batching type for the producer.

        There are two batching types: DefaultBatching and KeyBasedBatching.

        DefaultBatching will batch single messages:
            (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
        ... into a single batch message:
            [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]

        KeyBasedBatching will batch incoming single messages:
            (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
        ... into one batch message per key:
            [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]
    chunking_enabled: bool, default=False
        If the message size is higher than the max publish-payload size allowed by the broker, chunking_enabled
        lets the producer split the message into multiple chunks and publish them to the broker separately and
        in order. This allows the client to successfully publish large messages to Pulsar.
        Cannot be combined with ``batching_enabled``.
    encryption_key: str, optional
        The key used for symmetric encryption, configured on the producer side
    crypto_key_reader: CryptoKeyReader, optional
        Key reader implementation supplying the public key used to encrypt messages on the producer side
        and the private key used to decrypt them on the consumer side
    access_mode: ProducerAccessMode, optional
        Set the type of access mode that the producer requires on the topic.

        Supported modes:

        * Shared: By default multiple producers can publish on a topic.
        * Exclusive: Require exclusive access for producer.
                     Fail immediately if there's already a producer connected.
        * WaitForExclusive: Producer creation is pending until it can acquire exclusive access.
        * ExclusiveWithFencing: Acquire exclusive access for the producer.
                                Any existing producer will be removed and invalidated immediately.

    Returns
    -------
    Producer
        A producer wrapping the native producer created by the underlying client, bound to ``schema``.

    Raises
    ------
    ValueError
        If an argument has the wrong type (via ``_check_type``), or if both ``batching_enabled`` and
        ``chunking_enabled`` are set.
    """
    _check_type(str, topic, 'topic')
    _check_type_or_none(str, producer_name, 'producer_name')
    _check_type(_schema.Schema, schema, 'schema')
    _check_type_or_none(int, initial_sequence_id, 'initial_sequence_id')
    _check_type(int, send_timeout_millis, 'send_timeout_millis')
    _check_type(CompressionType, compression_type, 'compression_type')
    _check_type(int, max_pending_messages, 'max_pending_messages')
    _check_type(int, max_pending_messages_across_partitions, 'max_pending_messages_across_partitions')
    _check_type(bool, block_if_queue_full, 'block_if_queue_full')
    _check_type(bool, batching_enabled, 'batching_enabled')
    _check_type(int, batching_max_messages, 'batching_max_messages')
    _check_type(int, batching_max_allowed_size_in_bytes, 'batching_max_allowed_size_in_bytes')
    _check_type(int, batching_max_publish_delay_ms, 'batching_max_publish_delay_ms')
    _check_type(bool, chunking_enabled, 'chunking_enabled')
    # Validate the routing mode like every other enum-typed option, so a bad value
    # is reported with the parameter name instead of failing inside the native binding.
    _check_type(PartitionsRoutingMode, message_routing_mode, 'message_routing_mode')
    _check_type_or_none(dict, properties, 'properties')
    _check_type(BatchingType, batching_type, 'batching_type')
    _check_type_or_none(str, encryption_key, 'encryption_key')
    _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
    _check_type(bool, lazy_start_partitioned_producers, 'lazy_start_partitioned_producers')
    _check_type(ProducerAccessMode, access_mode, 'access_mode')

    # Reject mutually exclusive options before building any native configuration.
    if batching_enabled and chunking_enabled:
        raise ValueError("Batching and chunking of messages can't be enabled together.")

    conf = _pulsar.ProducerConfiguration()
    conf.send_timeout_millis(send_timeout_millis)
    conf.compression_type(compression_type)
    conf.max_pending_messages(max_pending_messages)
    conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
    conf.block_if_queue_full(block_if_queue_full)
    conf.batching_enabled(batching_enabled)
    conf.batching_max_messages(batching_max_messages)
    conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
    conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
    conf.partitions_routing_mode(message_routing_mode)
    conf.batching_type(batching_type)
    conf.chunking_enabled(chunking_enabled)
    conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
    conf.access_mode(access_mode)
    if producer_name:
        conf.producer_name(producer_name)
    # Compare against None: 0 is a valid baseline (the first message then gets id 1),
    # and a plain truthiness test would silently ignore it.
    if initial_sequence_id is not None:
        conf.initial_sequence_id(initial_sequence_id)
    if properties:
        for k, v in properties.items():
            conf.property(k, v)
    conf.schema(schema.schema_info())
    if encryption_key:
        conf.encryption_key(encryption_key)
    if crypto_key_reader:
        conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)

    p = Producer()
    p._producer = self._client.create_producer(topic, conf)
    p._schema = schema
    p._client = self._client
    return p