in shims/qpid-proton-python/src/amqp_types_test/Sender.py [0:0]
def encode_amqp_type(amqp_type, test_value):
"""Encode an AMQP type from a stringified test_value"""
if amqp_type == 'null':
return None
if amqp_type == 'boolean':
return test_value == 'True'
if amqp_type == 'ubyte':
return proton.ubyte(int(test_value, 16))
if amqp_type == 'ushort':
return proton.ushort(int(test_value, 16))
if amqp_type == 'uint':
return proton.uint(int(test_value, 16))
if amqp_type == 'ulong':
return proton.ulong(int(test_value, 16))
if amqp_type == 'byte':
return proton.byte(int(test_value, 16))
if amqp_type == 'short':
return proton.short(int(test_value, 16))
if amqp_type == 'int':
return proton.int32(int(test_value, 16))
if amqp_type == 'long':
return int(test_value, 16)
if amqp_type == 'float':
return proton.float32(struct.unpack('!f', bytes.fromhex(test_value[2:]))[0])
if amqp_type == 'double':
return struct.unpack('!d', bytes.fromhex(test_value[2:]))[0]
if amqp_type == 'decimal32':
return proton.decimal32(int(test_value[2:], 16))
if amqp_type == 'decimal64':
return proton.decimal64(int(test_value[2:], 16))
if amqp_type == 'decimal128':
return proton.decimal128(bytes.fromhex(test_value[2:]))
if amqp_type == 'char':
if len(test_value) == 1: # Format 'a'
return proton.char(test_value)
return proton.char(chr(int(test_value, 16)))
if amqp_type == 'timestamp':
return proton.timestamp(int(test_value, 16))
if amqp_type == 'uuid':
return uuid.UUID(test_value)
if amqp_type == 'binary':
return base64.b64decode(test_value)
if amqp_type == 'binarystr':
return str(test_value)
if amqp_type == 'string':
return str(test_value)
if amqp_type == 'symbol':
return proton.symbol(test_value)
if amqp_type in ['array', 'list', 'map']:
print('send: Complex AMQP type "%s" unsupported, see amqp_complex_types_test' % amqp_type)
return None
print('send: Unknown AMQP type "%s"' % amqp_type)
return None