in shims/qpid-proton-python/src/amqp_types_test/Receiver.py [0:0]
def get_amqp_type(amqp_value):
    """Return the AMQP type name that corresponds to the Python type of amqp_value.

    The value is tested against an ordered sequence of candidate types and the
    name paired with the first matching type is returned. If nothing matches,
    a diagnostic line is printed and None is returned.
    """
    # Guard clauses: None has no class to test, and bool must be recognised
    # before any integer check because bool is a subclass of int.
    if amqp_value is None:
        return "null"
    if isinstance(amqp_value, bool):
        return "boolean"

    # Ordered (Python type, AMQP type name) pairs; the first match wins, so the
    # order below is significant.
    ordered_type_names = (
        (proton.ubyte, "ubyte"),
        (proton.ushort, "ushort"),
        (proton.uint, "uint"),
        (proton.ulong, "ulong"),
        (proton.byte, "byte"),
        (proton.short, "short"),
        (proton.int32, "int"),
        (proton.float32, "float"),
        (proton.decimal32, "decimal32"),
        (proton.decimal64, "decimal64"),
        (proton.decimal128, "decimal128"),
        (proton.char, "char"),
        (proton.timestamp, "timestamp"),
        (uuid.UUID, "uuid"),
        (proton.symbol, "symbol"),
        (proton.Array, "array"),
        # Native types are listed last: isinstance() also matches parent
        # classes, so a native parent type placed earlier would be reported
        # instead of the more specific type above.
        (int, "long"),
        (bytes, "binary"),
        (str, "string"),
        (float, "double"),
        (list, "list"),
        (dict, "map"),
    )
    for python_type, amqp_type_name in ordered_type_names:
        if isinstance(amqp_value, python_type):
            return amqp_type_name

    # No candidate matched: report the unrecognised type and value.
    print('receive: Unmapped AMQP type: %s:%s' % (type(amqp_value), amqp_value))
    return None