in pulsar/__init__.py [0:0]
def send_async(self, content, callback,
               properties=None,
               partition_key=None,
               sequence_id=None,
               replication_clusters=None,
               disable_replication=False,
               event_timestamp=None,
               deliver_at=None,
               deliver_after=None,
               ):
    """
    Publish a message without blocking the caller.

    The message is assembled by ``self._build_msg`` from the arguments and
    then handed, together with ``callback``, to the underlying producer's
    ``send_async``. Nothing is returned; the outcome is reported through
    ``callback``, which is invoked once the message has been acknowledged
    by the broker.

    When the producer queue is full, by default the message will be
    rejected and the callback invoked with an error code.

    Examples
    --------

    .. code-block:: python

        import pulsar

        client = pulsar.Client('pulsar://localhost:6650')
        producer = client.create_producer(
                        'my-topic',
                        block_if_queue_full=True,
                        batching_enabled=True,
                        batching_max_publish_delay_ms=10)

        def callback(res, msg_id):
            print('Message published res=%s' % res)

        for i in range(10):
            producer.send_async(('Hello-%d' % i).encode('utf-8'), callback)

        client.close()

    Parameters
    ----------

    content
        A `bytes` object with the message payload.
    callback
        A callable invoked once the message has been acknowledged by the
        broker. The example above uses the signature
        ``callback(res, msg_id)``.
    properties: optional
        A dict of application-defined string properties.
    partition_key: optional
        Sets the partition key for the message routing. A hash of this key
        is used to determine the message's topic partition.
    sequence_id: optional
        Specify a custom sequence id for the message being published.
    replication_clusters: optional
        Override namespace replication clusters. Note that it is the
        caller's responsibility to provide valid cluster names and that all
        clusters have been previously configured as topics. Given an empty
        list, the message will replicate per the namespace configuration.
    disable_replication: optional
        Do not replicate this message.
    event_timestamp: optional
        Timestamp in millis of the event creation.
    deliver_at: optional
        Specify the message should not be delivered earlier than the
        specified timestamp.
    deliver_after: optional
        Specify a delay in timedelta for the delivery of the messages.
    """
    # Arguments are forwarded positionally, in the order _build_msg is
    # called with throughout this method's original implementation.
    message = self._build_msg(
        content,
        properties,
        partition_key,
        sequence_id,
        replication_clusters,
        disable_replication,
        event_timestamp,
        deliver_at,
        deliver_after,
    )
    self._producer.send_async(message, callback)