def _receive_jms_bytesmessage()

in shims/qpid-proton-python/src/jms_messages_test/Receiver.py [0:0]


    def _receive_jms_bytesmessage(self, message):
        """Receives a JMS bytes message

        Decodes the binary body of ``message`` according to
        ``self.current_subtype`` and returns the value as a string:

        * boolean: 'False' or 'True' for a single 0x00 or 0x01 byte
        * byte, int, long, short: ``hex()`` of the signed integer value
        * bytes: base64 encoding of the whole body
        * char: base64 encoding of the second byte of a 2-byte body
        * double, float: zero-padded hex of the raw 64-bit / 32-bit pattern
        * string: the UTF-8 decoded text following a 2-byte length prefix

        Raises InteropTestError for an invalid boolean encoding, a char body
        that is not 2 bytes long, a malformed or length-mismatched string, or
        an unknown subtype. The two leading checks are plain asserts and raise
        AssertionError (they are skipped when Python runs with -O).
        """
        assert self.jms_msg_type == 'JMS_BYTESMESSAGE_TYPE'
        assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == proton.byte(3)
        if self.current_subtype == 'boolean':
            if message.body == b'\x00':
                return 'False'
            if message.body == b'\x01':
                return 'True'
            raise InteropTestError('_receive_jms_bytesmessage: Invalid encoding for subtype boolean: %s' %
                                   str(message.body))
        if self.current_subtype == 'byte':
            return hex(struct.unpack('b', message.body)[0])
        if self.current_subtype == 'bytes':
            return base64.b64encode(message.body).decode('utf-8')
        if self.current_subtype == 'char':
            if len(message.body) == 2: # format 'a' or '\xNN'
                return base64.b64encode(bytes([message.body[1]])).decode('utf-8') # strip leading '\x00' char
            raise InteropTestError('Unexpected string length for type char: %d' % len(message.body))
        if self.current_subtype == 'double':
            # Unpacked as an unsigned 64-bit integer so the raw bit pattern is reported
            return '0x%016x' % struct.unpack('!Q', message.body)[0]
        if self.current_subtype == 'float':
            # Unpacked as an unsigned 32-bit integer so the raw bit pattern is reported
            return '0x%08x' % struct.unpack('!L', message.body)[0]
        if self.current_subtype == 'int':
            return hex(struct.unpack('!i', message.body)[0])
        if self.current_subtype == 'long':
            return hex(struct.unpack('!q', message.body)[0])
        if self.current_subtype == 'short':
            return hex(struct.unpack('!h', message.body)[0])
        if self.current_subtype == 'string':
            # NOTE: first 2 bytes are string length, must be present
            if len(message.body) >= 2:
                str_len = struct.unpack('!H', message.body[:2])[0]
                str_bytes = message.body[2:]
                str_body = str_bytes.decode('utf-8')
                # The length prefix counts encoded bytes, not decoded characters,
                # so it must be checked against the raw payload; comparing with
                # len(str_body) rejects every valid string containing multi-byte
                # (non-ASCII) characters. The length reported below is therefore
                # the encoded byte length.
                if len(str_bytes) != str_len:
                    raise InteropTestError('String length mismatch: size=%d, but len(\'%s\')=%d' %
                                           (str_len, str_body, len(str_bytes)))
                return str_body
            raise InteropTestError('Malformed string binary: len(\'%s\')=%d' %
                                   (repr(message.body), len(message.body)))
        raise InteropTestError('JMS message type %s: Unknown or unsupported subtype \'%s\'' %
                               (self.jms_msg_type, self.current_subtype))