def expand_ros2_message()

in tools/cloud/ros2-to-decoders.py [0:0]


    def expand_ros2_message(self, message_type):
        """Recursively expand a ROS 2 type string into a nested definition dict.

        The following forms of ``message_type`` are recognised, in this order:

        * a key of ``self.PRIMITIVE_TYPES`` -> primitive definition;
        * ``string<N>`` / ``wstring<N>`` -> primitive definition carrying an
          integer ``upperBound`` of ``N``;
        * ``sequence<T>`` / ``sequence<T, N>`` -> list definition with
          unbounded (capacity 0) or bounded (capacity ``N``) dynamic capacity;
        * ``T[N]`` -> list definition with fixed capacity ``N``;
        * ``package/.../Name`` -> structured definition built from the fields
          of ``Name`` imported from the ``package.msg`` module.

        List member types ``T`` are expanded by a recursive call, so they may
        themselves be bounded strings (e.g. ``sequence<string<10>, 5>``).

        Args:
            message_type: The type string to expand.

        Returns:
            A dict describing the type.

        Raises:
            Exception: If ``message_type`` matches none of the forms above.
        """
        if message_type in self.PRIMITIVE_TYPES:
            return {
                "primitiveMessageDefinition": {
                    "ros2PrimitiveMessageDefinition": {
                        "primitiveType": self.PRIMITIVE_TYPES[message_type]
                    }
                }
            }

        # Limited length strings, e.g. "string<10>" or "wstring<10>".
        bounded_string_match = re.match(r"^(w?string)<(\d+)>$", message_type)
        if bounded_string_match:
            string_type, upper_bound = bounded_string_match.groups()
            return {
                "primitiveMessageDefinition": {
                    "ros2PrimitiveMessageDefinition": {
                        "primitiveType": self.PRIMITIVE_TYPES[string_type],
                        # Emit a number, consistent with the list "capacity".
                        "upperBound": int(upper_bound),
                    }
                }
            }

        # Lists are described by (member type, capacity, list type).
        list_info = None
        # Unlimited or limited length lists. The member pattern is lazy so the
        # optional ", N" bound is split off even when the member type itself
        # contains "<...>" (a bounded string).
        sequence_match = re.match(r"^sequence<(.+?)(?:, (\d+))?>$", message_type)
        if sequence_match:
            member_type, bound = sequence_match.groups()
            if bound is None:
                list_info = (member_type, 0, "DYNAMIC_UNBOUNDED_CAPACITY")
            else:
                list_info = (member_type, int(bound), "DYNAMIC_BOUNDED_CAPACITY")
        else:
            # Fixed length lists, e.g. "uint8[3]".
            array_match = re.match(r"^(.+)\[(\d+)\]$", message_type)
            if array_match:
                list_info = (
                    array_match.group(1),
                    int(array_match.group(2)),
                    "FIXED_CAPACITY",
                )
        if list_info is not None:
            member_type, capacity, list_type = list_info
            return {
                "structuredMessageListDefinition": {
                    "name": "listType",
                    "memberType": self.expand_ros2_message(member_type),
                    "capacity": capacity,
                    "listType": list_type,
                }
            }

        # Nested message types: first path component is the package, last is
        # the message class name; any components in between are ignored.
        if "/" in message_type:
            message_info = message_type.split("/")
            module = importlib.import_module(message_info[0] + ".msg")
            message = getattr(module, message_info[-1])()
            fields = message.get_fields_and_field_types()
            return {
                "structuredMessageDefinition": [
                    {
                        "fieldName": field_name,
                        "dataType": self.expand_ros2_message(field_type),
                    }
                    for field_name, field_type in fields.items()
                ]
            }
        raise Exception("Unknown message type " + message_type)