def write()

in labgraph/loggers/hdf5/logger.py [0:0]


    def write(self, messages_by_logging_id: Mapping[str, Sequence[Message]]) -> None:
        """Append batches of messages to their datasets in the open file.

        Each logging id is split on ``PATH_DELIMITER``: every component but the
        last names the group (created on demand) and the last component names
        the dataset. A missing dataset is created with an unlimited first
        dimension; an existing one is grown by the size of the batch. The file
        is flushed once, after every batch has been written.

        If ``self.file`` is ``None`` nothing is written; a warning carrying the
        total number of dropped messages is logged instead.

        Args:
            messages_by_logging_id: Messages to write, keyed by logging id.
                Logging ids with no messages are skipped. The dataset dtype is
                derived from the first message of each batch, so every message
                in a batch is expected to share its fields.
        """
        with self.file_lock:
            if self.file is None:
                num_messages = sum(
                    len(messages) for messages in messages_by_logging_id.values()
                )
                # `warn` is a deprecated alias of `warning` on stdlib loggers
                # (assumes `logger` is a `logging.Logger` -- TODO confirm).
                logger.warning(f"dropping {num_messages} messages while stopping")
                return

            for logging_id, messages in messages_by_logging_id.items():
                if len(messages) == 0:
                    # Nothing to append, and no first message to derive the
                    # dataset dtype from.
                    continue

                path_parts = logging_id.split(PATH_DELIMITER)
                group_path = "/" + HDF5_PATH_DELIMITER.join(path_parts[:-1])
                dataset_name = path_parts[-1]
                group = self.file.require_group(group_path)

                dataset_dtype = np.dtype(
                    [
                        (field.name, *get_numpy_type_for_field_type(field.data_type))
                        for field in messages[0].__class__.__message_fields__.values()
                    ]
                )

                # `start` is the row index at which this batch begins.
                if dataset_name not in group:
                    start = 0
                    dataset = group.create_dataset(
                        dataset_name,
                        shape=(len(messages),),
                        maxshape=(None,),
                        dtype=dataset_dtype,
                    )
                else:
                    dataset = group[dataset_name]
                    start = len(dataset)
                    dataset.resize(start + len(messages), 0)

                for offset, message in enumerate(messages):
                    # Convert dynamic-length bytes fields into numpy arrays so h5py can
                    # read/write them
                    fields = list(message.__class__.__message_fields__.values())
                    row = list(message.astuple())
                    for j, (field, value) in enumerate(zip(fields, row)):
                        if not isinstance(field.data_type, DynamicType):
                            continue
                        if isinstance(field.data_type, SERIALIZABLE_DYNAMIC_TYPES):
                            value = field.data_type.preprocess(value)
                        if isinstance(value, bytes):
                            row[j] = np.array(bytearray(value))
                        elif isinstance(value, bytearray):
                            row[j] = np.array(value)

                    dataset[start + offset] = tuple(row)

            self.file.flush()