in labgraph/loggers/hdf5/logger.py [0:0]
def write(self, messages_by_logging_id: Mapping[str, Sequence[Message]]) -> None:
    with self.file_lock:
        if self.file is None:
            num_messages = sum(
                len(messages) for messages in messages_by_logging_id.values()
            )
            logger.warning(f"dropping {num_messages} messages while stopping")
            return
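        # Each logging id doubles as the HDF5 path: every component except the
        # last names the containing group, and the last names the dataset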
        for logging_id, messages in messages_by_logging_id.items():
            hdf5_path = logging_id
            group_path = "/" + HDF5_PATH_DELIMITER.join(
                hdf5_path.split(PATH_DELIMITER)[:-1]
            )
            group = self.file.require_group(group_path)
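            # Build a structured dtype with one named column per message field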
            dataset_dtype = np.dtype(
                [
                    (field.name, *get_numpy_type_for_field_type(field.data_type))
                    for field in messages[0].__class__.__message_fields__.values()
                ]
            )
            dataset_name = hdf5_path.split(PATH_DELIMITER)[-1]
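            # Create the dataset on first write; maxshape=(None,) leaves it
            # unbounded along axis 0 so later batches can be appended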
            if dataset_name not in group:
                dataset = group.create_dataset(
                    dataset_name,
                    shape=(len(messages),),
                    maxshape=(None,),
                    dtype=dataset_dtype,
                )
            else:
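                # Dataset already exists: grow it along axis 0 to make room
                # for this batch of messages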
                dataset = group[dataset_name]
                dataset.resize(len(dataset) + len(messages), axis=0)
            for i, message in enumerate(messages):
                # Convert dynamic-length bytes fields into numpy arrays so h5py
                # can read/write them
                message_fields = list(message.astuple())
                fields = list(message.__class__.__message_fields__.values())
                for j, value in enumerate(message_fields):
                    if not isinstance(fields[j].data_type, DynamicType):
                        continue
                    if isinstance(fields[j].data_type, SERIALIZABLE_DYNAMIC_TYPES):
                        value = fields[j].data_type.preprocess(value)
                    if isinstance(value, bytes):
                        message_fields[j] = np.array(bytearray(value))
                    elif isinstance(value, bytearray):
                        message_fields[j] = np.array(value)
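                # The new rows sit at the tail of the dataset, so index
                # backwards from the end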
                dataset[-len(messages) + i] = tuple(message_fields)
        self.file.flush()
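
For context, a minimal sketch of how a file produced by this logger could be read back with h5py. The file name "example.h5" and the logging id "example_group/example_stream" are hypothetical placeholders, and per the conversion above, dynamic bytes fields come back as numpy arrays rather than Python bytes.

# Read-back sketch (not part of the logger); names below are illustrative.
import h5py

with h5py.File("example.h5", "r") as f:
    dataset = f["/example_group/example_stream"]
    print(dataset.dtype.names)  # one named column per message field
    for row in dataset:  # each row is one logged message
        print(row)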