in uamqp/message.py [0:0]
def _populate_message_attributes(self, c_message):
if self.properties:
c_message.properties = self.properties.get_properties_obj()
if self.application_properties:
if not isinstance(self.application_properties, dict):
raise TypeError("Application properties must be a dictionary.")
amqp_props = utils.data_factory(
self.application_properties, encoding=self._encoding
)
c_message.application_properties = amqp_props
if self.annotations:
if not isinstance(self.annotations, dict):
raise TypeError("Message annotations must be a dictionary.")
ann_props = c_uamqp.create_message_annotations(
utils.data_factory(self.annotations, encoding=self._encoding)
)
c_message.message_annotations = ann_props
if self.delivery_annotations:
if not isinstance(self.delivery_annotations, dict):
raise TypeError("Delivery annotations must be a dictionary.")
delivery_ann_props = c_uamqp.create_delivery_annotations(
utils.data_factory(self.delivery_annotations, encoding=self._encoding)
)
c_message.delivery_annotations = delivery_ann_props
if self.header:
c_message.header = self.header.get_header_obj()
if self.footer:
if not isinstance(self.footer, dict):
raise TypeError("Footer must be a dictionary.")
footer = c_uamqp.create_footer(
utils.data_factory(self.footer, encoding=self._encoding)
)
c_message.footer = footer