in agora/contoso_motors/src/opc-simulator/app/plc_server_controller.py [0:0]
def __init__(self, config: Dict[str, Any]):
    """Connect to the PLC and extend ``config`` with a device description.

    Reads the ``"PLCSettings"`` section of ``config``, creates a ``PLC`` for
    the configured address, fetches its device properties and tags, and
    writes ``"DeviceTypes"`` and ``"Devices"`` entries into ``config`` (the
    mapping is modified in place) before handing it to the parent
    initializer.

    Args:
        config: Configuration mapping. ``config["PLCSettings"]["IPAddress"]``
            is required. ``"DeviceTypeName"`` (default ``"RockwellPLC"``),
            ``"DeviceInstanceName"`` (default ``"FluidDemoPLC"``) and
            ``"UseStringNodeIds"`` (default ``False``) are optional.

    Raises:
        KeyError: If the PLC settings contain no ``"IPAddress"`` entry.
    """

    def localized(text):
        # Shape used for the "LocalizedText" property values below.
        return {"Text": text, "Locale": "en_US"}

    settings = config.get("PLCSettings", {})
    address = settings["IPAddress"]
    type_name = settings.get("DeviceTypeName", "RockwellPLC")
    instance_name = settings.get("DeviceInstanceName", "FluidDemoPLC")

    logging.info("Connecting to PLC at %s ...", address)
    self.plc = PLC(address)
    # NOTE(review): the response is used without inspecting any status
    # field; presumably ``Value`` is unusable when the query fails --
    # confirm against the PLC library.
    identity = self.plc.GetDeviceProperties().Value

    logging.info("Retrieving PLC tags ...")
    tag_variables = self.query_plc_tags(settings)

    # One device type carrying every discovered tag as a variable.
    device_type = {
        "DeviceType": type_name,
        "Variables": tag_variables,
        "Properties": {},
    }

    # One device instance of that type, described by the PLC's own identity.
    # Both revision properties are filled from the same ``Revision`` field.
    device = {
        "Name": instance_name,
        "DeviceType": type_name,
        "UseStringNodeIds": settings.get("UseStringNodeIds", False),
        "Properties": [
            {
                "Name": "SerialNumber",
                "Type": "String",
                "Value": identity.SerialNumber,
            },
            {"Name": "HardwareRevision", "Value": identity.Revision},
            {"Name": "SoftwareRevision", "Value": identity.Revision},
            {
                "Name": "Manufacturer",
                "Type": "LocalizedText",
                "Value": localized(identity.Vendor),
            },
            {
                "Name": "Model",
                "Type": "LocalizedText",
                "Value": localized(identity.ProductName),
            },
        ],
    }

    config.update({"DeviceTypes": [device_type], "Devices": [device]})

    # These two attributes are assigned before the parent initializer runs
    # and ``cached_values`` after it; keep that order.
    # NOTE(review): presumably the parent initializer relies on the first
    # two already existing -- confirm against the base class.
    self.tags_being_written = set()
    self.subscription = None
    super().__init__(config)
    self.cached_values: Dict[str, Any] = {}