in uamqp/client.py [0:0]
def receive_message_batch(self, max_batch_size=None, on_message_received=None, timeout=0):
"""Receive a batch of messages. Messages returned in the batch have already been
accepted - if you wish to add logic to accept or reject messages based on custom
criteria, pass in a callback. This method will return as soon as some messages are
available rather than waiting to achieve a specific batch size, and therefore the
number of messages returned per call will vary up to the maximum allowed.
If the receive client is configured with `auto_complete=True` then the messages received
in the batch returned by this function will already be settled. Alternatively, if
`auto_complete=False`, then each message will need to be explicitly settled before
it expires and is released.
:param max_batch_size: The maximum number of messages that can be returned in
one call. This value cannot be larger than the prefetch value, and if not specified,
the prefetch value will be used.
:type max_batch_size: int
:param on_message_received: A callback to process messages as they arrive from the
service. It takes a single argument, a ~uamqp.message.Message object.
:type on_message_received: callable[~uamqp.message.Message]
:param timeout: I timeout in milliseconds for which to wait to receive any messages.
If no messages are received in this time, an empty list will be returned. If set to
0, the client will continue to wait until at least one message is received. The
default is 0.
:type timeout: float
"""
self._message_received_callback = on_message_received
max_batch_size = max_batch_size or self._prefetch
if max_batch_size > self._prefetch:
raise ValueError(
'Maximum batch size cannot be greater than the '
'connection link credit: {}'.format(self._prefetch))
timeout = self._counter.get_current_ms() + timeout if timeout else 0
expired = False
self.open()
receiving = True
batch = []
while not self._received_messages.empty() and len(batch) < max_batch_size:
batch.append(self._received_messages.get())
self._received_messages.task_done()
if len(batch) >= max_batch_size:
return batch
self._timeout_reached = False
self._last_activity_timestamp = None
while receiving and not expired and len(batch) < max_batch_size and not self._timeout_reached:
while receiving and self._received_messages.qsize() < max_batch_size and not self._timeout_reached:
if timeout and self._counter.get_current_ms() > timeout:
expired = True
break
before = self._received_messages.qsize()
receiving = self.do_work()
received = self._received_messages.qsize() - before
if self._received_messages.qsize() > 0 and received == 0:
# No new messages arrived, but we have some - so return what we have.
expired = True
break
while not self._received_messages.empty() and len(batch) < max_batch_size:
batch.append(self._received_messages.get())
self._received_messages.task_done()
return batch