def read_frame()

in agora/contoso_motors/src/opc-simulator/app/frame_record_replay.py [0:0]


    def read_frame(self, device: Device) -> UpdateList:
        # pylint: disable=R0912
        if not self.replay:
            return []

        dev_var_names = self.var_names[device.device_type.name]

        num_vars_in_frame = self.read_int(2)

        update_list = []
        for _ in range(num_vars_in_frame):
            var_idx = self.read_int(2, signed=False)
            var_type = ua.VariantType(self.read_int(1, False))
            var_good = self.read_int(1, False)
            var_name = dev_var_names[var_idx]

            if var_type == ua.VariantType.Boolean:
                var_val = bool(self.read_int(1))
            elif var_type == ua.VariantType.SByte:
                var_val = self.read_int(1, True)
            elif var_type == ua.VariantType.Byte:
                var_val = self.read_int(1, False)
            elif var_type == ua.VariantType.Int16:
                var_val = self.read_int(2, True)
            elif var_type == ua.VariantType.UInt16:
                var_val = self.read_int(2, False)
            elif var_type == ua.VariantType.Int32:
                var_val = self.read_int(4, True)
            elif var_type == ua.VariantType.UInt32:
                var_val = self.read_int(4, False)
            elif var_type == ua.VariantType.Int64:
                var_val = self.read_int(8, True)
            elif var_type == ua.VariantType.UInt64:
                var_val = self.read_int(8, False)
            elif var_type == ua.VariantType.Float:
                var_val = struct.unpack(">f", self.replay_fp.read(4))[0]
            elif var_type == ua.VariantType.Double:
                var_val = struct.unpack(">d", self.replay_fp.read(8))[0]
            elif var_type == ua.VariantType.String:
                str_len = self.read_int(4, False)
                var_val = self.replay_fp.read(str_len).decode("utf-8")
            elif var_type == ua.VariantType.DateTime:
                str_len = self.read_int(1, False)
                var_val = datetime.datetime.fromisoformat(
                    self.replay_fp.read(str_len).decode("utf-8")
                )
            update_list.append((var_name, var_val, var_good))

        if self.replay_fp.tell() == self.replay_data_file_size:
            if not self.replay_in_a_loop:
                self.replay_completed = True
            else:  # reset to the beginning of the file to replay in an infinite loop
                self.record_fp.seek(0)

        self.num_frames_read += 1
        return update_list