in uamqp/message.py [0:0]
def _multi_message_generator(self):
    """Generate multiple ~uamqp.message.Message objects from a single data
    stream that in total may exceed the maximum individual message size.

    Data will be continuously added to a single message until that message
    reaches a max allowable size, at which point it will be yielded and
    a new message will be started. The encoded item that did not fit is
    carried over and becomes the first body entry of the next message.

    The data source (``self._body_gen``) is converted to an iterator exactly
    once, so a re-iterable source (e.g. a list) is consumed a single time
    rather than being restarted from its first element for every new batch.

    :rtype: generator[~uamqp.message.Message]
    """
    # One shared iterator for the whole run: iter() on an iterator/generator
    # returns the object itself, while for a list/tuple it prevents each new
    # batch from starting over at index 0 (which would duplicate data and
    # never terminate).
    body_iter = iter(self._body_gen)
    unappended_message_bytes = None
    while True:
        new_message = self._create_batch_message()
        message_size = new_message.get_message_encoded_size() + self.size_offset
        body_size = 0
        if unappended_message_bytes:
            # Carry over the item that overflowed the previous batch.
            new_message._body.append(  # pylint: disable=protected-access
                unappended_message_bytes
            )
            body_size += len(unappended_message_bytes)
        for data in body_iter:
            try:
                # try to get the internal uamqp Message
                internal_uamqp_message = data.message
            except AttributeError:
                # no internal message, data could be uamqp Message or raw data
                internal_uamqp_message = data
            try:
                # uamqp Message: inherit the batch-level application
                # properties only when the item has none of its own.
                if (
                    not internal_uamqp_message.application_properties
                    and self.application_properties
                ):
                    internal_uamqp_message.application_properties = (
                        self.application_properties
                    )
                message_bytes = internal_uamqp_message.encode_message()
            except AttributeError:  # raw data
                wrap_message = Message(
                    body=internal_uamqp_message,
                    application_properties=self.application_properties,
                )
                message_bytes = wrap_message.encode_message()
            body_size += len(message_bytes)
            if (body_size + message_size) > self.max_message_length:
                # This item does not fit: emit the batch built so far and
                # keep the encoded item for the next batch.
                new_message.on_send_complete = self.on_send_complete
                unappended_message_bytes = message_bytes
                yield new_message
                _logger.debug("Sent partial message.")
                break
            new_message._body.append(  # pylint: disable=protected-access
                message_bytes
            )
        else:
            # Source exhausted without overflowing: emit the final batch.
            new_message.on_send_complete = self.on_send_complete
            yield new_message
            _logger.debug("Sent all batched data.")
            return