in tools/cloud/vision_system_data_serializer.py [0:0]
def encode_message(self, cdr_encoder, message_definition, msg):
    """Recursively encode ``msg`` into ``cdr_encoder`` following ``message_definition``.

    ``message_definition`` is a dict that must contain exactly one of the
    following keys, checked in this order:

    * ``"primitiveMessageDefinition"`` -- ``msg`` is a single value. Its
      ``primitiveType`` is looked up in ``PRIMITIVE_TYPES`` (pushed with
      ``push_primitive``) or ``STRING_TYPES`` (pushed with ``push_string``
      together with the optional ``upperBound``).
    * ``"structuredMessageListDefinition"`` -- ``msg`` is a sized iterable.
      Its length is validated against ``capacity`` for bounded and fixed
      lists, a UINT32 length prefix is pushed for the two dynamic list types,
      and then the elements are encoded.
    * ``"structuredMessageDefinition"`` -- ``msg`` is a mapping. Every field
      definition is encoded in order, reading ``msg[fieldName]`` and recursing
      with that field's ``dataType``.

    Args:
        cdr_encoder: Encoder object receiving the data through
            ``push_primitive``, ``push_string`` and ``push_primitive_array``.
        message_definition: Definition dict describing the shape of ``msg``.
        msg: The value to encode (scalar, list or mapping, depending on the
            definition).

    Raises:
        ValueError: If the primitive type, the list type or the definition
            kind is unknown, or if a list's length violates its capacity.
            ``ValueError`` is a subclass of ``Exception``, so handlers written
            for the previously raised bare ``Exception`` keep working.
        KeyError: Propagated unchanged when a required key is missing from the
            definition or from ``msg``.
    """
    if "primitiveMessageDefinition" in message_definition:
        ros2_definition = message_definition["primitiveMessageDefinition"][
            "ros2PrimitiveMessageDefinition"
        ]
        primitive_type = ros2_definition["primitiveType"]
        if primitive_type in PRIMITIVE_TYPES:
            cdr_encoder.push_primitive(PRIMITIVE_TYPES[primitive_type], msg)
        elif primitive_type in STRING_TYPES:
            # A missing "upperBound" is forwarded as 0.
            # NOTE(review): presumably 0 means "unbounded" to push_string -- confirm.
            upper_bound = ros2_definition.get("upperBound", 0)
            cdr_encoder.push_string(STRING_TYPES[primitive_type], msg, upper_bound)
        else:
            raise ValueError(f"unknown primitive type: {primitive_type}")
    elif "structuredMessageListDefinition" in message_definition:
        list_definition = message_definition["structuredMessageListDefinition"]
        list_type = list_definition["listType"]
        if list_type not in (
            "FIXED_CAPACITY",
            "DYNAMIC_BOUNDED_CAPACITY",
            "DYNAMIC_UNBOUNDED_CAPACITY",
        ):
            # Without this guard an unrecognised list type would skip both the
            # capacity check and the length prefix and be written out as if it
            # were a fixed array.
            raise ValueError(f"unknown list type: {list_type}")

        if list_type != "DYNAMIC_UNBOUNDED_CAPACITY":
            capacity = list_definition["capacity"]
            if list_type == "DYNAMIC_BOUNDED_CAPACITY" and len(msg) > capacity:
                raise ValueError(f"length of list exceeds capacity {capacity}")
            if list_type == "FIXED_CAPACITY" and len(msg) != capacity:
                raise ValueError(f"length of list does not match capacity {capacity}")

        if list_type != "FIXED_CAPACITY":
            # Only the dynamic list types carry their element count on the
            # wire; fixed-capacity lists are written without a prefix.
            cdr_encoder.push_primitive(PRIMITIVE_TYPES["UINT32"], len(msg))

        member_type = self.get_member_type(message_definition)
        # Optimize primitive arrays: a non-dict member type that names a known
        # primitive is pushed in one call instead of element by element.
        if not isinstance(member_type, dict) and member_type in PRIMITIVE_TYPES:
            cdr_encoder.push_primitive_array(PRIMITIVE_TYPES[member_type], msg)
        else:
            for element in msg:
                self.encode_message(cdr_encoder, member_type, element)
    elif "structuredMessageDefinition" in message_definition:
        for field_definition in message_definition["structuredMessageDefinition"]:
            self.encode_message(
                cdr_encoder, field_definition["dataType"], msg[field_definition["fieldName"]]
            )
    else:
        raise ValueError(f"unknown message type: {message_definition}")