in tools/cloud/vision_system_data_serializer.py [0:0]
def decode_message(self, cdr_decoder, message_definition):
if "primitiveMessageDefinition" in message_definition:
primitive_type = message_definition["primitiveMessageDefinition"][
"ros2PrimitiveMessageDefinition"
]["primitiveType"]
if primitive_type in PRIMITIVE_TYPES:
return cdr_decoder.pop_primitive(PRIMITIVE_TYPES[primitive_type])
elif primitive_type in STRING_TYPES:
return cdr_decoder.pop_string(STRING_TYPES[primitive_type])
else:
raise Exception(f"unknown primitive type: {primitive_type}")
elif "structuredMessageListDefinition" in message_definition:
list_type = message_definition["structuredMessageListDefinition"]["listType"]
if list_type in ["DYNAMIC_UNBOUNDED_CAPACITY", "DYNAMIC_BOUNDED_CAPACITY"]:
list_length = cdr_decoder.pop_primitive(PRIMITIVE_TYPES["UINT32"])
else:
list_length = message_definition["structuredMessageListDefinition"]["capacity"]
member_type = self.get_member_type(message_definition)
# Optimize primitive arrays:
if not isinstance(member_type, dict) and member_type in PRIMITIVE_TYPES:
return cdr_decoder.pop_primitive_array(PRIMITIVE_TYPES[member_type], list_length)
else:
result = []
for _ in range(list_length):
result.append(self.decode_message(cdr_decoder, member_type))
return result
elif "structuredMessageDefinition" in message_definition:
result = {}
for field_definition in message_definition["structuredMessageDefinition"]:
result[field_definition["fieldName"]] = self.decode_message(
cdr_decoder, field_definition["dataType"]
)
return result
else:
raise Exception(f"unknown message type: {message_definition}")