in tools/cloud/ros2-to-decoders.py [0:0]
def expand_ros2_message(self, message_type):
    """Recursively expand a ROS 2 type string into a nested definition dict.

    The type string is matched, in order, against: a key of
    ``self.PRIMITIVE_TYPES``, a bounded string (``string<N>`` / ``wstring<N>``),
    a sequence (``sequence<T>`` / ``sequence<T, N>``), a fixed-size array
    (``T[N]``), and finally a message name containing ``/`` (e.g.
    ``package/Type``), whose fields are read from the imported message class
    and expanded recursively.

    Args:
        message_type: Type string, in the notation found in the values of a
            message's ``get_fields_and_field_types()``.

    Returns:
        A dict with exactly one of the keys ``"primitiveMessageDefinition"``,
        ``"structuredMessageListDefinition"`` or
        ``"structuredMessageDefinition"``.

    Raises:
        ValueError: If ``message_type`` matches none of the supported forms.
        KeyError: If a bounded string is given but ``string`` / ``wstring`` is
            not a key of ``self.PRIMITIVE_TYPES``.

    Errors from importing ``<package>.msg`` or looking up the message class
    are not caught here and propagate to the caller.
    """
    if message_type in self.PRIMITIVE_TYPES:
        return {
            "primitiveMessageDefinition": {
                "ros2PrimitiveMessageDefinition": {
                    "primitiveType": self.PRIMITIVE_TYPES[message_type]
                }
            }
        }

    # Limited length strings, e.g. "string<10>" or "wstring<10>":
    bounded_string_match = re.match(r"^(w?string)<(\d+)>$", message_type)
    if bounded_string_match:
        string_type, upper_bound = bounded_string_match.groups()
        return {
            "primitiveMessageDefinition": {
                "ros2PrimitiveMessageDefinition": {
                    "primitiveType": self.PRIMITIVE_TYPES[string_type],
                    # Convert to int so the bound is emitted as a number,
                    # the same way list capacities are emitted below.
                    "upperBound": int(upper_bound),
                }
            }
        }

    # Lists. The member type is either a plain / namespaced type name or a
    # bounded string, so that e.g. "sequence<string<10>, 5>" and
    # "string<10>[3]" are recognised as lists of bounded strings.
    list_spec = None  # (member_type, capacity, list_type) once recognised
    sequence_match = re.match(
        r"^sequence<([\w/]+|w?string<\d+>)(?:, (\d+))?>$", message_type
    )
    if sequence_match:
        # Unlimited or limited length lists:
        member_type, bound = sequence_match.groups()
        if bound is None:
            list_spec = (member_type, 0, "DYNAMIC_UNBOUNDED_CAPACITY")
        else:
            list_spec = (member_type, int(bound), "DYNAMIC_BOUNDED_CAPACITY")
    else:
        # Fixed length lists:
        array_match = re.match(
            r"^([\w/]+|w?string<\d+>)\[(\d+)\]$", message_type
        )
        if array_match:
            member_type, size = array_match.groups()
            list_spec = (member_type, int(size), "FIXED_CAPACITY")

    if list_spec is not None:
        member_type, capacity, list_type = list_spec
        return {
            "structuredMessageListDefinition": {
                "name": "listType",
                "memberType": self.expand_ros2_message(member_type),
                "capacity": capacity,
                "listType": list_type,
            }
        }

    # Nested messages: "<package>/.../<Type>" is loaded from "<package>.msg".
    if "/" in message_type:
        name_parts = message_type.split("/")
        module = importlib.import_module(name_parts[0] + ".msg")
        # An instance is created only to query its field name -> type mapping.
        message = getattr(module, name_parts[-1])()
        fields = message.get_fields_and_field_types()
        return {
            "structuredMessageDefinition": [
                {
                    "fieldName": field_name,
                    "dataType": self.expand_ros2_message(field_type),
                }
                for field_name, field_type in fields.items()
            ]
        }

    raise ValueError("Unknown message type " + message_type)