in uamqp/client.py [0:0]
def send_message(self, messages, close_on_done=False):
"""Send a single message or batched message.
:param messages: A message to send. This can either be a single instance
of `Message`, or multiple messages wrapped in an instance of `BatchMessage`.
:type message: ~uamqp.message.Message
:param close_on_done: Close the client once the message is sent. Default is `False`.
:type close_on_done: bool
:raises: ~uamqp.errors.MessageException if message fails to send after retry policy
is exhausted.
"""
batch = messages.gather()
pending_batch = []
for message in batch:
message.idle_time = self._counter.get_current_ms()
self._pending_messages.append(message)
pending_batch.append(message)
self.open()
running = True
try:
while running and any([m for m in pending_batch if m.state not in constants.DONE_STATES]):
running = self.do_work()
failed = [m for m in pending_batch if m.state == constants.MessageState.SendFailed]
if any(failed):
details = {"total_messages": len(pending_batch), "number_failed": len(failed)}
details['failed_messages'] = {}
exception = None
for failed_message in failed:
exception = failed_message._response # pylint: disable=protected-access
details['failed_messages'][failed_message] = exception
raise errors.ClientMessageError(exception, info=details)
finally:
if close_on_done or not running:
self.close()