tools/cloud/vision_system_data_serializer.py (257 lines of code) (raw):
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import struct
# Vision-system-data primitive type name -> Python ``struct`` format character.
# The four 8-bit names UINT8, BYTE, CHAR and BOOL all share the unsigned-byte
# code "B".
PRIMITIVE_TYPES = dict(
    # Unsigned integers
    UINT8="B",
    BYTE="B",
    CHAR="B",
    BOOL="B",
    UINT16="H",
    UINT32="I",
    UINT64="Q",
    # Signed integers
    INT8="b",
    INT16="h",
    INT32="i",
    INT64="q",
    # Floating point
    FLOAT32="f",
    FLOAT64="d",
)
# Vision-system-data string type name -> name of the Python codec used to
# encode/decode the text.
STRING_TYPES = dict(STRING="utf-8", WSTRING="utf-16")
class CdrDecoder:
    """Sequential reader that pops values off the front of a CDR byte buffer.

    ``buffer`` always holds the bytes that have not been consumed yet: every
    ``pop_*`` call removes what it read (plus any alignment padding) from the
    front, and ``current_position`` counts the bytes consumed so far.

    Multi-byte values are read as little-endian regardless of the host CPU,
    because the only encapsulation kind accepted by ``read_encapsulation`` is
    1, which the DDSI-RTPS specification assigns to little-endian CDR.
    """

    # Byte-order/size prefixes understood by the ``struct`` module.
    _BYTE_ORDER_PREFIXES = ("@", "=", "<", ">", "!")

    def __init__(self, buffer):
        """Start decoding at the first byte of ``buffer`` (a bytes-like object)."""
        self.buffer = buffer
        self.current_position = 0
        # Offset that alignment padding is measured from.
        self.align_position = 0
        # Size in bytes of the previously read value; 0 before the first read.
        self.last_data_size = 0

    @classmethod
    def _little_endian_format(cls, struct_type, count=None):
        """Build the ``struct`` format for ``count`` values of ``struct_type``.

        A bare format character gets the little-endian, standard-size prefix
        ``<``. A format that already carries an explicit byte-order prefix is
        respected as given.
        """
        if isinstance(struct_type, bytes):
            struct_type = struct_type.decode("ascii")
        if struct_type[:1] in cls._BYTE_ORDER_PREFIXES:
            prefix, code = struct_type[0], struct_type[1:]
        else:
            prefix, code = "<", struct_type
        repeat = "" if count is None else str(count)
        return prefix + repeat + code

    def reset_alignment(self):
        """Measure subsequent alignment padding from the current position."""
        self.align_position = self.current_position

    def read_encapsulation(self):
        """Consume and validate the 4-byte encapsulation header.

        Raises:
            Exception: if the first byte is not 0, the encapsulation kind is
                not 1, the 16-bit options field is not 0, or the buffer is
                too short.
        """
        dummy = self.pop_primitive(PRIMITIVE_TYPES["UINT8"])
        if dummy != 0:
            raise Exception(f"unexpected non-zero initial byte: {dummy}")
        encapsulation_kind = self.pop_primitive(PRIMITIVE_TYPES["UINT8"])
        if encapsulation_kind != 1:
            raise Exception(f"unexpected encapsulation kind: {encapsulation_kind}")
        options = self.pop_primitive(PRIMITIVE_TYPES["UINT16"])
        if options != 0:
            raise Exception(f"unexpected options: {options}")
        # Alignment of the payload is measured from the end of the header.
        self.reset_alignment()

    def get_alignment(self, data_size):
        """Return how many padding bytes precede a value of ``data_size`` bytes.

        No padding is used when the value is not larger than the previously
        read one. Otherwise the offset from ``align_position`` is padded up to
        a multiple of ``data_size``; the final mask turns a full-size pad into
        0 when the offset is already aligned (for power-of-two sizes).
        """
        if data_size <= self.last_data_size:
            return 0
        offset = self.current_position - self.align_position
        return (data_size - (offset % data_size)) & (data_size - 1)

    def realign(self, alignment):
        """Skip ``alignment`` padding bytes at the front of the buffer."""
        self.current_position += alignment
        self.buffer = self.buffer[alignment:]

    def pop_primitive(self, struct_type):
        """Read one value described by the ``struct`` format ``struct_type``.

        Raises:
            Exception: ("out of data") if the padding plus the value do not
                fit in the remaining buffer.
        """
        fmt = self._little_endian_format(struct_type)
        size = struct.calcsize(fmt)
        alignment = self.get_alignment(size)
        if len(self.buffer) < size + alignment:
            raise Exception("out of data")
        self.last_data_size = size
        self.realign(alignment)
        value = struct.unpack_from(fmt, self.buffer)[0]
        self.buffer = self.buffer[size:]
        self.current_position += size
        return value

    def pop_primitive_array(self, struct_type, length):
        """Read ``length`` consecutive values of ``struct_type`` as a list.

        Padding is applied once, before the first element.

        Raises:
            Exception: ("out of data") if the padding plus all elements do
                not fit in the remaining buffer.
        """
        element_size = struct.calcsize(self._little_endian_format(struct_type))
        alignment = self.get_alignment(element_size)
        total_size = element_size * length
        if len(self.buffer) < total_size + alignment:
            raise Exception("out of data")
        self.last_data_size = element_size
        self.realign(alignment)
        fmt = self._little_endian_format(struct_type, length)
        values = list(struct.unpack_from(fmt, self.buffer))
        self.buffer = self.buffer[total_size:]
        self.current_position += total_size
        return values

    def pop_string(self, encoding):
        """Read a length-prefixed string and return it without trailing NULs.

        The 32-bit length counts code units. ``"utf-8"`` uses one byte per
        unit; any other encoding is read as one 32-bit word per unit.

        Raises:
            Exception: ("out of data") if the buffer is shorter than the
                announced length.
        """
        length = self.pop_primitive(PRIMITIVE_TYPES["UINT32"])
        code_unit_size = 1 if encoding == "utf-8" else 4
        byte_count = length * code_unit_size
        if len(self.buffer) < byte_count:
            raise Exception("out of data")
        raw_bytes = self.buffer[:byte_count]
        self.buffer = self.buffer[byte_count:]
        self.last_data_size = code_unit_size
        self.current_position += byte_count
        # utf-16 mode stores the 16-bit utf-16 'code units' in 32-bit words:
        if encoding == "utf-16":
            # Keep the two low-order bytes of every little-endian word.
            code_units = bytearray(2 * length)
            code_units[0::2] = raw_bytes[0::4]
            code_units[1::2] = raw_bytes[1::4]
            raw_bytes = code_units
            # The plain "utf-16" codec honours a byte-order mark but falls
            # back to the host byte order without one; the words are
            # little-endian, so say so explicitly in that case.
            if not raw_bytes.startswith((b"\xff\xfe", b"\xfe\xff")):
                encoding = "utf-16-le"
        # utf-8 mode has a null-terminator:
        return raw_bytes.decode(encoding).rstrip("\0")
class CdrEncoder:
    """Sequential writer that appends CDR-encoded values to ``buffer``.

    ``buffer`` is a ``bytearray`` holding everything written so far and
    ``current_position`` is the number of bytes written.

    Multi-byte values are written as little-endian regardless of the host
    CPU, because ``write_encapsulation`` announces encapsulation kind 1,
    which the DDSI-RTPS specification assigns to little-endian CDR.
    """

    # Byte-order/size prefixes understood by the ``struct`` module.
    _BYTE_ORDER_PREFIXES = ("@", "=", "<", ">", "!")

    def __init__(self):
        """Create an encoder with an empty output buffer."""
        self.buffer = bytearray()
        self.current_position = 0
        # Offset that alignment padding is measured from.
        self.align_position = 0
        # Size in bytes of the previously written value; 0 before the first.
        self.last_data_size = 0

    @classmethod
    def _little_endian_format(cls, struct_type, count=None):
        """Build the ``struct`` format for ``count`` values of ``struct_type``.

        A bare format character gets the little-endian, standard-size prefix
        ``<``. A format that already carries an explicit byte-order prefix is
        respected as given.
        """
        if isinstance(struct_type, bytes):
            struct_type = struct_type.decode("ascii")
        if struct_type[:1] in cls._BYTE_ORDER_PREFIXES:
            prefix, code = struct_type[0], struct_type[1:]
        else:
            prefix, code = "<", struct_type
        repeat = "" if count is None else str(count)
        return prefix + repeat + code

    def reset_alignment(self):
        """Measure subsequent alignment padding from the current position."""
        self.align_position = self.current_position

    def write_encapsulation(self):
        """Write the 4-byte encapsulation header and restart alignment after it."""
        self.push_primitive(PRIMITIVE_TYPES["UINT8"], 0)  # Dummy byte
        self.push_primitive(PRIMITIVE_TYPES["UINT8"], 1)  # Encapsulation kind
        self.push_primitive(PRIMITIVE_TYPES["UINT16"], 0)  # Options
        self.reset_alignment()

    def get_alignment(self, data_size):
        """Return how many padding bytes precede a value of ``data_size`` bytes.

        No padding is used when the value is not larger than the previously
        written one. Otherwise the offset from ``align_position`` is padded up
        to a multiple of ``data_size``; the final mask turns a full-size pad
        into 0 when the offset is already aligned (for power-of-two sizes).
        """
        if data_size <= self.last_data_size:
            return 0
        offset = self.current_position - self.align_position
        return (data_size - (offset % data_size)) & (data_size - 1)

    def realign(self, alignment):
        """Append ``alignment`` zero bytes of padding."""
        self.current_position += alignment
        self.buffer += b"\0" * alignment

    def push_primitive(self, struct_type, val):
        """Append ``val`` packed with the ``struct`` format ``struct_type``."""
        raw_bytes = struct.pack(self._little_endian_format(struct_type), val)
        alignment = self.get_alignment(len(raw_bytes))
        self.last_data_size = len(raw_bytes)
        self.realign(alignment)
        self.buffer += raw_bytes
        self.current_position += len(raw_bytes)

    def push_primitive_array(self, struct_type, val):
        """Append every item of ``val`` packed as ``struct_type``.

        Padding is applied once, before the first element.
        """
        element_size = struct.calcsize(self._little_endian_format(struct_type))
        # Pack before touching the buffer so a value that cannot be packed
        # leaves the encoder unchanged.
        raw_bytes = struct.pack(self._little_endian_format(struct_type, len(val)), *val)
        alignment = self.get_alignment(element_size)
        self.last_data_size = element_size
        self.realign(alignment)
        self.buffer += raw_bytes
        self.current_position += len(raw_bytes)

    def push_string(self, encoding, val, upper_bound):
        """Append ``val`` as a length-prefixed string.

        The 32-bit length prefix counts code units: bytes including the added
        null-terminator for ``"utf-8"``, 16-bit units (including the
        byte-order mark) for ``"utf-16"``.

        Args:
            encoding: codec name, e.g. ``"utf-8"`` or ``"utf-16"``.
            val: the text to write.
            upper_bound: maximum ``len(val)``; 0 (or less) disables the check.

        Raises:
            Exception: if ``val`` is longer than a positive ``upper_bound``.
        """
        if upper_bound > 0 and len(val) > upper_bound:
            raise Exception(f'length of string "{val}" exceeds upper bound {upper_bound}')
        if encoding == "utf-8":
            code_unit_size = 1
            # utf-8 mode has a null-terminator:
            val += "\0"
        else:
            code_unit_size = 2
        if encoding == "utf-16":
            # Spell out the little-endian byte-order mark instead of relying
            # on the host order picked by the plain "utf-16" codec; on
            # little-endian hosts the bytes are the same as that codec's.
            raw_bytes = b"\xff\xfe" + val.encode("utf-16-le")
        else:
            raw_bytes = val.encode(encoding)
        self.push_primitive(PRIMITIVE_TYPES["UINT32"], len(raw_bytes) // code_unit_size)
        # utf-16 mode stores the 16-bit utf-16 'code units' in 32-bit words:
        if encoding == "utf-16":
            code_unit_size = 4
            # Each unit fills the low half of a zero-initialised 4-byte word.
            words = bytearray(2 * len(raw_bytes))
            words[0::4] = raw_bytes[0::2]
            words[1::4] = raw_bytes[1::2]
            raw_bytes = words
        self.buffer += raw_bytes
        self.last_data_size = code_unit_size
        self.current_position += len(raw_bytes)
class VisionSystemDataSerializer:
    """Convert messages to and from CDR bytes as described by a decoder manifest.

    The manifest is an iterable of signal definitions (dicts). Each one is
    looked up by its ``"fullyQualifiedName"`` and provides the message layout
    under ``["messageSignal"]["structuredMessage"]``. A layout node is one of:

    * ``"primitiveMessageDefinition"`` - a number or a string,
    * ``"structuredMessageListDefinition"`` - a list of one member type,
    * ``"structuredMessageDefinition"`` - an ordered list of named fields.
    """

    # List types whose element count is written in front of the elements.
    _DYNAMIC_LIST_TYPES = ("DYNAMIC_UNBOUNDED_CAPACITY", "DYNAMIC_BOUNDED_CAPACITY")

    def __init__(self, decoder_manifest):
        """Remember the manifest that later lookups search."""
        self.decoder_manifest = decoder_manifest

    def get_signal_definition(self, fully_qualified_name):
        """Return the first manifest entry with the given fully qualified name.

        Raises:
            Exception: if no entry has that name.
        """
        found = next(
            (
                entry
                for entry in self.decoder_manifest
                if entry["fullyQualifiedName"] == fully_qualified_name
            ),
            None,
        )
        if found is None:
            raise Exception(f"unknown message: {fully_qualified_name}")
        return found

    def serialize(self, fully_qualified_name, msg):
        """Encode ``msg`` as the named message and return the resulting bytes.

        The output starts with the encapsulation header.
        """
        layout = self.get_signal_definition(fully_qualified_name)["messageSignal"][
            "structuredMessage"
        ]
        writer = CdrEncoder()
        writer.write_encapsulation()
        self.encode_message(writer, layout, msg)
        return bytes(writer.buffer)

    def get_member_type(self, message_definition):
        """Return the element type of a list definition.

        The result is the primitive type *name* when the members are one of
        ``PRIMITIVE_TYPES``; in every other case (strings, lists, structures)
        it is the member's definition itself.
        """
        member_type = message_definition["structuredMessageListDefinition"]["memberType"]
        if "primitiveMessageDefinition" in member_type:
            type_name = member_type["primitiveMessageDefinition"][
                "ros2PrimitiveMessageDefinition"
            ]["primitiveType"]
            if type_name in PRIMITIVE_TYPES:
                return type_name
        return member_type

    def encode_message(self, cdr_encoder, message_definition, msg):
        """Write ``msg`` to ``cdr_encoder`` following ``message_definition``.

        Raises:
            Exception: for an unknown definition kind or primitive type, or a
                list whose length violates its declared capacity.
        """
        if "primitiveMessageDefinition" in message_definition:
            self._encode_primitive(
                cdr_encoder, message_definition["primitiveMessageDefinition"], msg
            )
            return
        if "structuredMessageListDefinition" in message_definition:
            self._encode_list(cdr_encoder, message_definition, msg)
            return
        if "structuredMessageDefinition" not in message_definition:
            raise Exception(f"unknown message type: {message_definition}")
        # Structure: fields are written in the order the definition lists them.
        for field in message_definition["structuredMessageDefinition"]:
            self.encode_message(cdr_encoder, field["dataType"], msg[field["fieldName"]])

    def _encode_primitive(self, cdr_encoder, primitive_definition, msg):
        """Write a single number or string."""
        ros2_definition = primitive_definition["ros2PrimitiveMessageDefinition"]
        type_name = ros2_definition["primitiveType"]
        if type_name in PRIMITIVE_TYPES:
            cdr_encoder.push_primitive(PRIMITIVE_TYPES[type_name], msg)
            return
        if type_name not in STRING_TYPES:
            raise Exception(f"unknown primitive type: {type_name}")
        # An absent "upperBound" is passed on as 0.
        upper_bound = ros2_definition.get("upperBound", 0)
        cdr_encoder.push_string(STRING_TYPES[type_name], msg, upper_bound)

    def _encode_list(self, cdr_encoder, message_definition, msg):
        """Check the capacity of a list, then write its length and elements."""
        list_definition = message_definition["structuredMessageListDefinition"]
        list_type = list_definition["listType"]
        if list_type == "DYNAMIC_BOUNDED_CAPACITY":
            capacity = list_definition["capacity"]
            if len(msg) > capacity:
                raise Exception(f"length of list exceeds capacity {capacity}")
        elif list_type == "FIXED_CAPACITY":
            capacity = list_definition["capacity"]
            if len(msg) != capacity:
                raise Exception(f"length of list does not match capacity {capacity}")
        # Only dynamic lists carry their element count on the wire.
        if list_type in self._DYNAMIC_LIST_TYPES:
            cdr_encoder.push_primitive(PRIMITIVE_TYPES["UINT32"], len(msg))
        member_type = self.get_member_type(message_definition)
        # Optimize primitive arrays:
        if not isinstance(member_type, dict) and member_type in PRIMITIVE_TYPES:
            cdr_encoder.push_primitive_array(PRIMITIVE_TYPES[member_type], msg)
            return
        for item in msg:
            self.encode_message(cdr_encoder, member_type, item)

    def deserialize(self, fully_qualified_name, cdr_data):
        """Decode ``cdr_data`` (header included) as the named message."""
        layout = self.get_signal_definition(fully_qualified_name)["messageSignal"][
            "structuredMessage"
        ]
        reader = CdrDecoder(cdr_data)
        reader.read_encapsulation()
        return self.decode_message(reader, layout)

    def decode_message(self, cdr_decoder, message_definition):
        """Read one value laid out as ``message_definition`` from ``cdr_decoder``.

        Returns:
            A number or string for primitives, a list for list definitions
            and a dict keyed by field name for structures.

        Raises:
            Exception: for an unknown definition kind or primitive type.
        """
        if "primitiveMessageDefinition" in message_definition:
            return self._decode_primitive(
                cdr_decoder, message_definition["primitiveMessageDefinition"]
            )
        if "structuredMessageListDefinition" in message_definition:
            return self._decode_list(cdr_decoder, message_definition)
        if "structuredMessageDefinition" not in message_definition:
            raise Exception(f"unknown message type: {message_definition}")
        decoded = {}
        for field in message_definition["structuredMessageDefinition"]:
            value = self.decode_message(cdr_decoder, field["dataType"])
            decoded[field["fieldName"]] = value
        return decoded

    def _decode_primitive(self, cdr_decoder, primitive_definition):
        """Read a single number or string."""
        type_name = primitive_definition["ros2PrimitiveMessageDefinition"]["primitiveType"]
        if type_name in PRIMITIVE_TYPES:
            return cdr_decoder.pop_primitive(PRIMITIVE_TYPES[type_name])
        if type_name in STRING_TYPES:
            return cdr_decoder.pop_string(STRING_TYPES[type_name])
        raise Exception(f"unknown primitive type: {type_name}")

    def _decode_list(self, cdr_decoder, message_definition):
        """Read a list; the length comes from the data or from the capacity."""
        list_definition = message_definition["structuredMessageListDefinition"]
        if list_definition["listType"] in self._DYNAMIC_LIST_TYPES:
            count = cdr_decoder.pop_primitive(PRIMITIVE_TYPES["UINT32"])
        else:
            count = list_definition["capacity"]
        member_type = self.get_member_type(message_definition)
        # Optimize primitive arrays:
        if not isinstance(member_type, dict) and member_type in PRIMITIVE_TYPES:
            return cdr_decoder.pop_primitive_array(PRIMITIVE_TYPES[member_type], count)
        return [self.decode_message(cdr_decoder, member_type) for _ in range(count)]