in labgraph/messages/message.py [0:0]
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """
    Create a message, either by wrapping an existing Cthulhu sample or by
    serializing the given field values into a newly allocated sample.

    Args:
        args: Field values given positionally. They are matched to the keys of
            `__message_fields__` in iteration order.
        kwargs: Field values given by field name. Alternatively, pass
            `__sample__` (optionally together with
            `__original_message_type__`) to wrap an existing sample without
            serializing anything.

    Raises:
        TypeError: If too many positional arguments are given, a field
            receives a value both positionally and by keyword, a keyword does
            not name a field, a required field is missing, or a value fails
            its field's data type check.
    """
    # Attributes are set through `super().__setattr__` throughout to bypass the
    # frozen check due to `frozen=True`
    super().__setattr__("__original_message__", None)
    super().__setattr__("__original_message_type__", None)

    if "__sample__" in kwargs and len(kwargs) <= 2:
        # Option to create a message directly from Cthulhu sample.
        # NOTE: on this path positional arguments, and any second keyword other
        # than `__original_message_type__`, are ignored rather than rejected.
        if "__original_message_type__" in kwargs:
            super().__setattr__(
                "__original_message_type__", kwargs["__original_message_type__"]
            )
        super().__setattr__("__sample__", kwargs["__sample__"])
        return

    # Otherwise, build up a Cthulhu sample using the provided arguments
    cls = type(self)
    fields = cls.__message_fields__
    # Materialize the field names once instead of once per positional argument
    field_names = list(fields.keys())

    # Reject surplus positional arguments before assigning any of them
    if len(args) > len(field_names):
        raise TypeError(
            f"__init__() takes {len(field_names)} positional "
            f"arguments but {len(args)} were given"
        )

    # Build a dictionary of values, starting with the positional arguments
    values: Dict[str, Any] = dict(zip(field_names, args))

    # Add to the dictionary from the keyword arguments
    for key, value in kwargs.items():
        if key in values:
            # Only positional arguments can already occupy a key here
            raise TypeError(
                f"__init__() for {cls.__name__} got multiple values for argument "
                f"'{key}'"
            )
        if key not in fields:
            raise TypeError(
                f"__init__() for {cls.__name__} got an unexpected keyword argument "
                f"'{key}'"
            )
        values[key] = value

    # Ensure we have all required values, and fill in default values. This runs
    # to completion before validation so that a missing argument is reported
    # ahead of an invalid one.
    for field in fields.values():
        if field.name in values:
            continue
        if field.required:
            raise TypeError(f"__init__() missing argument: '{field.name}'")
        values[field.name] = field.get_default_value()

    # Validate all the values
    for field in fields.values():
        value = values[field.name]
        if not field.data_type.isinstance(value):
            raise TypeError(
                f"__init__() for {cls.__name__} got invalid value for argument "
                f"'{field.name}': {value} (expected a "
                f"{field.data_type.description})"
            )

    # Allocate shared memory for a Cthulhu sample
    sample = StreamSample()
    if cls.__message_size__ > 0:
        sample.parameters = memoryPool().getBufferFromPool("", cls.__message_size__)

        # Preprocess the fixed-length field values (those whose data type has a
        # size) and put them in the correct sequence for serialization
        fixed_values = [
            field.data_type.preprocess(values[field.name])
            for field in fields.values()
            if field.data_type.size is not None
        ]

        # Serialize the fixed-length field values
        fixed_bytes = struct.pack(cls.__format_string__, *fixed_values)

        # Write the serialized fixed-length field values to shared memory
        memoryview(sample.parameters)[
            : cls.__message_size__
        ] = fixed_bytes  # type: ignore

    if cls.__num_dynamic_fields__ > 0:
        # Preprocess the dynamic-length field values (data type size is None)
        dynamic_values = [
            field.data_type.preprocess(values[field.name])
            for field in fields.values()
            if field.data_type.size is None
        ]

        # Allocate shared memory for the dynamic-length field values, one
        # buffer per value, sized to that value
        dynamic_buffers = [
            memoryPool().getBufferFromPool("", len(value))
            for value in dynamic_values
        ]

        # Write the serialized dynamic-length field values to shared memory
        for value, buffer in zip(dynamic_values, dynamic_buffers):
            memoryview(buffer)[: len(value)] = value

        # Set the sample's dynamic parameters
        if dynamic_buffers:
            sample.dynamicParameters = dynamic_buffers

    super().__setattr__("__sample__", sample)