in agora/contoso_motors/src/opc-simulator/app/frame_record_replay.py [0:0]
def read_frame(self, device: Device) -> UpdateList:
# pylint: disable=R0912
if not self.replay:
return []
dev_var_names = self.var_names[device.device_type.name]
num_vars_in_frame = self.read_int(2)
update_list = []
for _ in range(num_vars_in_frame):
var_idx = self.read_int(2, signed=False)
var_type = ua.VariantType(self.read_int(1, False))
var_good = self.read_int(1, False)
var_name = dev_var_names[var_idx]
if var_type == ua.VariantType.Boolean:
var_val = bool(self.read_int(1))
elif var_type == ua.VariantType.SByte:
var_val = self.read_int(1, True)
elif var_type == ua.VariantType.Byte:
var_val = self.read_int(1, False)
elif var_type == ua.VariantType.Int16:
var_val = self.read_int(2, True)
elif var_type == ua.VariantType.UInt16:
var_val = self.read_int(2, False)
elif var_type == ua.VariantType.Int32:
var_val = self.read_int(4, True)
elif var_type == ua.VariantType.UInt32:
var_val = self.read_int(4, False)
elif var_type == ua.VariantType.Int64:
var_val = self.read_int(8, True)
elif var_type == ua.VariantType.UInt64:
var_val = self.read_int(8, False)
elif var_type == ua.VariantType.Float:
var_val = struct.unpack(">f", self.replay_fp.read(4))[0]
elif var_type == ua.VariantType.Double:
var_val = struct.unpack(">d", self.replay_fp.read(8))[0]
elif var_type == ua.VariantType.String:
str_len = self.read_int(4, False)
var_val = self.replay_fp.read(str_len).decode("utf-8")
elif var_type == ua.VariantType.DateTime:
str_len = self.read_int(1, False)
var_val = datetime.datetime.fromisoformat(
self.replay_fp.read(str_len).decode("utf-8")
)
update_list.append((var_name, var_val, var_good))
if self.replay_fp.tell() == self.replay_data_file_size:
if not self.replay_in_a_loop:
self.replay_completed = True
else: # reset to the beginning of the file to replay in an infinite loop
self.record_fp.seek(0)
self.num_frames_read += 1
return update_list