in tools/cloud/ros2-to-nodes.py [0:0]
def expand_ros2_message(self, message_type, fqn=None):
if message_type in self.PRIMITIVE_TYPES:
self.nodes.append(
{
"property": {
"fullyQualifiedName": fqn,
"dataType": self.PRIMITIVE_TYPES[message_type],
"dataEncoding": "TYPED",
}
}
)
return
# Limited length strings:
fixed_string_match = re.match(r"^((|w)string)<(\d+)>$", message_type)
if fixed_string_match:
member_type = fixed_string_match.group(1)
self.nodes.append(
{
"property": {
"fullyQualifiedName": fqn,
"dataType": self.PRIMITIVE_TYPES[member_type],
"dataEncoding": "TYPED",
}
}
)
return
# Unlimited or limited length lists:
list_match = re.match(r"^sequence<([\w/]+)(|, (\d+))>$", message_type)
if list_match:
member_type = list_match.group(1)
else:
# Fixed length lists:
list_match = re.match(r"^([\w/]+)\[(\d+)\]$", message_type)
if list_match:
member_type = list_match.group(1)
if list_match:
if member_type in self.PRIMITIVE_TYPES:
self.nodes.append(
{
"property": {
"fullyQualifiedName": fqn,
"dataType": f"{self.PRIMITIVE_TYPES[member_type]}_ARRAY",
"dataEncoding": "BINARY"
if self.PRIMITIVE_TYPES[member_type] == "UINT8"
else "TYPED",
}
}
)
return
if "/" in member_type:
self.expand_ros2_message(member_type)
type_fqn = f"Types.{self.sanitize(member_type)}"
self.nodes.append(
{
"property": {
"fullyQualifiedName": fqn,
"dataType": "STRUCT_ARRAY",
"structFullyQualifiedName": type_fqn,
}
}
)
return
raise Exception("Unknown message type " + member_type)
if "/" in message_type:
if message_type not in self.types:
self.types.append(message_type)
self.nodes.append(
{"struct": {"fullyQualifiedName": f"Types.{self.sanitize(message_type)}"}}
)
message_info = message_type.split("/")
module_name = message_info[0] + ".msg"
message_type_name = message_info[-1]
module = importlib.import_module(module_name)
message = getattr(module, message_type_name)()
fields = message.get_fields_and_field_types()
for field_name, field_type in fields.items():
self.expand_ros2_message(
field_type,
f"Types.{self.sanitize(message_type)}.{field_name}",
)
if fqn:
type_fqn = f"Types.{self.sanitize(message_type)}"
self.nodes.append(
{
"property": {
"fullyQualifiedName": fqn,
"dataType": "STRUCT",
"structFullyQualifiedName": type_fqn,
}
}
)
return
raise Exception("Unknown message type " + message_type)