in greengrass-opcua-adapter-nodejs/subscriber.js [218:320]
monitorNodes() {
const self = this;
self._monitoredItemsConfig.forEach((monitoredNode) => {
console.log('monitoring node id = ', monitoredNode.id);
const monitoredItem = this._subscription.monitor(
{
nodeId: monitoredNode.id,
attributeId: Opcua.AttributeIds.Value,
},
{
samplingInterval: 250,
queueSize: 10000,
discardOldest: true,
},
Opcua.read_service.TimestampsToReturn.Both
);
monitoredItem.on('initialized', () => {
console.log('monitoredItem initialized');
});
monitoredItem.on('changed', (dataValue) => {
const monitoredNodeName = monitoredNode.displayName;
const serverName = self._serverConfig.name;
const time = dataValue.sourceTimestamp;
const nodeId = monitoredItem.itemToMonitor.nodeId.toString();
const payload = {
id: nodeId,
displayName: monitoredNodeName,
timestamp: time,
value: dataValue.value,
};
const awsServerName = serverName.replace(/\#|\?|\+/g,'');
const awsNodeName = monitoredNodeName.replace(/\#|\?|\+/g,'');
const topic = `/opcua/${awsServerName}/node/${awsNodeName}`;
const payloadStr = JSON.stringify(payload);
// Keep the received data into dict if enabling the custom strategy.
if (self._customConfig.customUploadDataStrategy.enableStrategy) {
if (self._customConfig.customUploadDataStrategy.enableAccumulativeData)
{
let dataExist = false;
for( const element of self._customConfig.customUploadDataStrategy.accumulativeWhiteList )
{
if (element === monitoredNodeName )
{
dataExist = true;
// Accumulate the data
if (!(monitoredNodeName in payloadDataMap))
{
payloadDataMap[monitoredNodeName] = dataValue.value.value+":"+Date.now();
}
else
{
// Check if it's array or not, if it's array, we just need to push it to array.
if(Array.isArray(payloadDataMap[monitoredNodeName]))
{
payloadDataMap[monitoredNodeName].push(dataValue.value.value+":"+Date.now());
}
// It's not a array, create an array to accumulate the data.
else
{
let existingData = [];
// Push the previous data into array (1st).
existingData.push(payloadDataMap[monitoredNodeName]);
// Push the current received data into array.
existingData.push(dataValue.value.value+":"+Date.now());
payloadDataMap[monitoredNodeName] = existingData;
}
}
console.dir(payloadDataMap);
break;
}
}
// Node name not in the accumulative white list, then just keep the latest data.
if (!dataExist)
{
payloadDataMap[monitoredNodeName] = dataValue.value.value;
}
}
else
{
payloadDataMap[monitoredNodeName] = dataValue.value.value;
}
console.dir(payloadDataMap);
} else {
IoTDevice.publish(
{
topic: topic,
payload: payloadStr,
},
(err) => {
if (err) {
console.log(`Failed to publish ${payloadStr} on ${topic}. Got the following error: ${err}`);
}
});
}
});
monitoredItem.on('err', (errorMessage) => {
console.log(monitoredItem.itemToMonitor.nodeId.toString(), ' ERROR', errorMessage);
});
});
}