in shims/qpid-proton-python/src/amqp_types_test/Receiver.py [0:0]
def decode_amqp_type(amqp_type, amqp_value):
    """Render a received AMQP value as a string according to its type name.

    amqp_type is the AMQP simple type name (for example 'ubyte', 'float',
    'binary'); amqp_value is the value to render.

    Returns the string rendering for supported simple types. For the complex
    types ('array', 'list', 'map') and for unrecognised type names a message
    is printed and None is returned.
    """
    # Types rendered with the plain str() conversion.
    if amqp_type in ('null', 'boolean', 'uuid'):
        return str(amqp_value)

    # Integer types rendered with the built-in hex().
    if amqp_type in ('ubyte', 'ushort', 'byte', 'short', 'int'):
        return hex(amqp_value)

    # Wider integer types go through the class's longhex helper (defined
    # outside this function).
    if amqp_type in ('uint', 'ulong', 'long', 'timestamp'):
        return AmqpTypesTestReceiver.longhex(amqp_value)

    # Floating point: pack in network byte order, then re-read the same bytes
    # as an unsigned integer so the raw bit pattern can be shown in hex.
    if amqp_type == 'float':
        (float_bits,) = struct.unpack('!L', struct.pack('!f', amqp_value))
        return '0x%08x' % float_bits
    if amqp_type == 'double':
        (double_bits,) = struct.unpack('!Q', struct.pack('!d', amqp_value))
        return '0x%016x' % double_bits

    # Decimal types: zero-padded hex of fixed width for 32/64, and a
    # per-element two-digit hex dump for 128.
    if amqp_type == 'decimal32':
        return '0x%08x' % amqp_value
    if amqp_type == 'decimal64':
        return '0x%016x' % amqp_value
    if amqp_type == 'decimal128':
        hex_digits = ''.join('%02x' % octet for octet in amqp_value)
        return '0x' + hex_digits.strip()

    if amqp_type == 'char':
        # Digits, ASCII letters, punctuation and space are returned as-is;
        # every other character is returned as the hex of its code point.
        literal_chars = string.digits + string.ascii_letters + string.punctuation + ' '
        code_point = ord(amqp_value)
        if code_point < 0x80 and amqp_value in literal_chars:
            return amqp_value
        return hex(code_point)

    if amqp_type == 'binary':
        encoded = base64.b64encode(amqp_value)
        return encoded.decode('utf-8')

    # Textual types are returned unchanged.
    if amqp_type in ('string', 'symbol'):
        return amqp_value

    if amqp_type in ['array', 'list', 'map']:
        print('receive: Complex AMQP type "%s" unsupported, see amqp_complex_types_test' % amqp_type)
    else:
        print('receive: Unknown AMQP type "%s"' % amqp_type)
    return None