in uamqp/message.py [0:0]
def _set_body_by_body_type(self, body, body_type):
    """Create ``self._body`` for ``body_type`` and populate it from ``body``.

    The body wrapper is constructed around ``self._message`` and assigned to
    ``self._body`` *before* ``body`` is validated, so when a ``TypeError`` is
    raised ``self._body`` already refers to the newly constructed wrapper.

    :param body: The payload to store. Accepted shapes depend on ``body_type``:

        * ``MessageBodyType.Data`` -- a ``str`` or ``bytes`` value, or a list
          whose items are all ``str`` or ``bytes``; each item is appended
          separately.
        * ``MessageBodyType.Sequence`` -- a list. If every item is itself a
          list, each item is appended separately; otherwise the list is
          appended as a single entry.
        * ``MessageBodyType.Value`` -- passed unchanged to ``ValueBody.set``.
    :param body_type: One of the ``constants.MessageBodyType`` members above.
    :raises TypeError: If ``body`` has the wrong shape for ``Data`` or
        ``Sequence``.
    :raises ValueError: If ``body_type`` is none of the three handled members.
    """
    if body_type == constants.MessageBodyType.Data:
        self._body = DataBody(self._message)
        if isinstance(body, (str, bytes)):
            # A lone str/bytes value is treated as a one-item payload.
            chunks = [body]
        elif isinstance(body, list) and all(
                isinstance(item, (str, bytes)) for item in body):
            chunks = body
        else:
            raise TypeError(
                "For MessageBodyType.Data, the body"
                " must be str or bytes or list of str or bytes.")
        for chunk in chunks:
            self._body.append(chunk)
        return

    if body_type == constants.MessageBodyType.Sequence:
        self._body = SequenceBody(self._message)
        if not isinstance(body, list):
            raise TypeError(
                "For MessageBodyType.Sequence, the body"
                " must be list or list of lists.")
        # A list made up solely of lists is appended item by item; any other
        # list is appended whole. An empty list satisfies all() and therefore
        # appends nothing.
        if all(isinstance(item, list) for item in body):
            sequences = body
        else:
            sequences = [body]
        for sequence in sequences:
            self._body.append(sequence)
        return

    if body_type == constants.MessageBodyType.Value:
        self._body = ValueBody(self._message)
        self._body.set(body)
        return

    raise ValueError("Unsupported MessageBodyType: {}".format(body_type))