def encode_message()

in tools/cloud/vision_system_data_serializer.py [0:0]


    def encode_message(self, cdr_encoder, message_definition, msg):
        """Push ``msg`` into ``cdr_encoder`` by walking ``message_definition``.

        The definition is dispatched on the first of these keys it contains:

        * ``"primitiveMessageDefinition"``: ``msg`` is pushed with
          ``push_primitive`` when its ``primitiveType`` is a key of
          ``PRIMITIVE_TYPES``, or with ``push_string`` (together with the
          definition's ``upperBound``, defaulting to 0) when it is a key of
          ``STRING_TYPES``.
        * ``"structuredMessageListDefinition"``: ``msg`` is treated as a sized
          iterable. Its length is validated against ``capacity`` for the
          bounded and fixed list types, a ``UINT32`` element count is pushed
          for the two dynamic list types, and then the elements are pushed.
        * ``"structuredMessageDefinition"``: every field definition is encoded
          in order, recursing with the field's ``dataType`` and the value
          stored in ``msg`` under the field's ``fieldName``.

        Args:
            cdr_encoder: Object receiving the ``push_primitive``,
                ``push_string`` and ``push_primitive_array`` calls.
            message_definition: Definition node describing how to encode
                ``msg``.
            msg: Value to encode: a scalar/string, a sized iterable, or a
                mapping keyed by field name, depending on the definition.

        Raises:
            Exception: If the primitive type is in neither lookup table, if a
                list violates its declared capacity, or if the definition
                contains none of the three recognised keys.
        """
        if "primitiveMessageDefinition" in message_definition:
            ros2_definition = message_definition["primitiveMessageDefinition"][
                "ros2PrimitiveMessageDefinition"
            ]
            type_name = ros2_definition["primitiveType"]
            if type_name in PRIMITIVE_TYPES:
                cdr_encoder.push_primitive(PRIMITIVE_TYPES[type_name], msg)
                return
            if type_name not in STRING_TYPES:
                raise Exception(f"unknown primitive type: {type_name}")
            string_bound = ros2_definition.get("upperBound", 0)
            cdr_encoder.push_string(STRING_TYPES[type_name], msg, string_bound)
            return

        if "structuredMessageListDefinition" in message_definition:
            list_definition = message_definition["structuredMessageListDefinition"]
            kind = list_definition["listType"]

            # Only the bounded and fixed kinds declare a capacity to validate.
            if kind in ("DYNAMIC_BOUNDED_CAPACITY", "FIXED_CAPACITY"):
                limit = list_definition["capacity"]
                if kind == "DYNAMIC_BOUNDED_CAPACITY":
                    if len(msg) > limit:
                        raise Exception(f"length of list exceeds capacity {limit}")
                elif len(msg) != limit:
                    raise Exception(f"length of list does not match capacity {limit}")

            # Dynamic lists get an explicit element count pushed first; fixed ones do not.
            if kind in ("DYNAMIC_UNBOUNDED_CAPACITY", "DYNAMIC_BOUNDED_CAPACITY"):
                cdr_encoder.push_primitive(PRIMITIVE_TYPES["UINT32"], len(msg))

            element_type = self.get_member_type(message_definition)
            # The dict check comes first because a dict cannot be used as a lookup key.
            is_primitive_member = (
                not isinstance(element_type, dict) and element_type in PRIMITIVE_TYPES
            )
            if is_primitive_member:
                # Primitive members are handed to the encoder in a single call.
                cdr_encoder.push_primitive_array(PRIMITIVE_TYPES[element_type], msg)
            else:
                for item in msg:
                    self.encode_message(cdr_encoder, element_type, item)
            return

        if "structuredMessageDefinition" in message_definition:
            for field in message_definition["structuredMessageDefinition"]:
                field_type = field["dataType"]
                field_value = msg[field["fieldName"]]
                self.encode_message(cdr_encoder, field_type, field_value)
            return

        raise Exception(f"unknown message type: {message_definition}")