in shims/qpid-proton-python/src/jms_messages_test/Receiver.py [0:0]
def _receive_jms_bytesmessage(self, message):
    """Decode the body of a JMS bytes message into its test-value string.

    The binary body is interpreted according to ``self.current_subtype``:

    * ``boolean`` -- a single 0x00 / 0x01 byte gives ``'False'`` / ``'True'``.
    * ``byte``, ``short``, ``int``, ``long`` -- a signed integer (network
      byte order for the multi-byte types) rendered with ``hex()``.
    * ``float``, ``double`` -- the raw 32/64-bit pattern rendered as a
      zero-padded hex string (``0x%08x`` / ``0x%016x``).
    * ``bytes`` -- the whole body, base64-encoded.
    * ``char`` -- a 2-byte body; the second byte is returned base64-encoded.
    * ``string`` -- a 2-byte big-endian length prefix followed by the UTF-8
      encoded text; the decoded text is returned.

    Args:
        message: Received message; ``message.body`` holds the binary payload
            and ``message.annotations`` the JMS type annotation.

    Returns:
        The string representation of the received value.

    Raises:
        AssertionError: If ``self.jms_msg_type`` is not
            ``'JMS_BYTESMESSAGE_TYPE'`` or the JMS type annotation is not
            ``proton.byte(3)``.
        InteropTestError: If the body is not a valid encoding for the current
            subtype, or the subtype is unknown.
        struct.error: If a fixed-width numeric body does not have the exact
            size required by its format.
    """
    assert self.jms_msg_type == 'JMS_BYTESMESSAGE_TYPE'
    assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == proton.byte(3)

    subtype = self.current_subtype
    body = message.body

    if subtype == 'boolean':
        if body == b'\x00':
            return 'False'
        if body == b'\x01':
            return 'True'
        raise InteropTestError('_receive_jms_bytesmessage: Invalid encoding for subtype boolean: %s' %
                               str(body))

    # Signed integer subtypes: unpack and render with hex() (keeps the sign).
    signed_int_formats = {'byte': 'b', 'int': '!i', 'long': '!q', 'short': '!h'}
    if subtype in signed_int_formats:
        return hex(struct.unpack(signed_int_formats[subtype], body)[0])

    # Floating point subtypes are reported as their raw bit pattern, so the
    # body is unpacked as an unsigned integer of the same width.
    if subtype == 'double':
        return '0x%016x' % struct.unpack('!Q', body)[0]
    if subtype == 'float':
        return '0x%08x' % struct.unpack('!L', body)[0]

    if subtype == 'bytes':
        return base64.b64encode(body).decode('utf-8')

    if subtype == 'char':
        if len(body) == 2:  # format 'a' or '\xNN'
            # Strip the leading '\x00' byte and encode only the second byte.
            return base64.b64encode(bytes([body[1]])).decode('utf-8')
        raise InteropTestError('Unexpected string length for type char: %d' % len(body))

    if subtype == 'string':
        # NOTE: first 2 bytes are string length, must be present
        if len(body) < 2:
            raise InteropTestError('Malformed string binary: len(\'%s\')=%d' %
                                   (repr(body), len(body)))
        str_len = struct.unpack('!H', body[:2])[0]
        encoded = body[2:]
        str_body = encoded.decode('utf-8')
        # The length prefix counts encoded bytes, not decoded characters, so
        # it must be checked against the encoded payload; comparing against
        # len(str_body) would reject any valid string with multi-byte chars.
        if len(encoded) != str_len:
            raise InteropTestError('String length mismatch: size=%d, but len(\'%s\')=%d' %
                                   (str_len, str_body, len(encoded)))
        return str_body

    raise InteropTestError('JMS message type %s: Unknown or unsupported subtype \'%s\'' %
                           (self.jms_msg_type, subtype))