monitorNodes()

in greengrass-opcua-adapter-nodejs/subscriber.js [218:320]


    monitorNodes() {
        const self = this;
        self._monitoredItemsConfig.forEach((monitoredNode) => {
            console.log('monitoring node id = ', monitoredNode.id);
            const monitoredItem = this._subscription.monitor(
                {
                    nodeId: monitoredNode.id,
                    attributeId: Opcua.AttributeIds.Value,
                },
                {
                    samplingInterval: 250,
                    queueSize: 10000,
                    discardOldest: true,
                },
                Opcua.read_service.TimestampsToReturn.Both
            );
            monitoredItem.on('initialized', () => {
                console.log('monitoredItem initialized');
            });
            monitoredItem.on('changed', (dataValue) => {
                const monitoredNodeName = monitoredNode.displayName;
                const serverName = self._serverConfig.name;
                const time = dataValue.sourceTimestamp;
                const nodeId = monitoredItem.itemToMonitor.nodeId.toString();
                const payload = {
                    id: nodeId,
                    displayName: monitoredNodeName,
                    timestamp: time,
                    value: dataValue.value,
                };
                const awsServerName = serverName.replace(/\#|\?|\+/g,'');
                const awsNodeName = monitoredNodeName.replace(/\#|\?|\+/g,'');
                const topic = `/opcua/${awsServerName}/node/${awsNodeName}`;
                const payloadStr = JSON.stringify(payload);

                // Keep the received data into dict if enabling the custom strategy.
                if (self._customConfig.customUploadDataStrategy.enableStrategy) {
                    if (self._customConfig.customUploadDataStrategy.enableAccumulativeData)
                    {
                        let dataExist = false;
                        for( const element of self._customConfig.customUploadDataStrategy.accumulativeWhiteList )
                        {
                            if (element === monitoredNodeName )
                            {
                                dataExist = true;
                                // Accumulate the data
                                if (!(monitoredNodeName in payloadDataMap))
                                {
                                    payloadDataMap[monitoredNodeName] = dataValue.value.value+":"+Date.now();
                                }
                                else
                                {
                                    // Check if it's array or not, if it's array, we just need to push it to array.
                                    if(Array.isArray(payloadDataMap[monitoredNodeName]))
                                    {
                                        payloadDataMap[monitoredNodeName].push(dataValue.value.value+":"+Date.now());
                                    }
                                    // It's not a array, create an array to accumulate the data.
                                    else
                                    {
                                        let existingData = [];
                                        // Push the previous data into array (1st).
                                        existingData.push(payloadDataMap[monitoredNodeName]);
                                        // Push the current received data into array.
                                        existingData.push(dataValue.value.value+":"+Date.now());
                                        payloadDataMap[monitoredNodeName] = existingData;
                                    }
                                }
                                console.dir(payloadDataMap);
                                break;
                            }
                        }

                        // Node name not in the accumulative white list, then just keep the latest data.
                        if (!dataExist)
                        {
                            payloadDataMap[monitoredNodeName] = dataValue.value.value;
                        }
                    }
                    else
                    {
                        payloadDataMap[monitoredNodeName] = dataValue.value.value;
                    }
                    console.dir(payloadDataMap);
                } else {
                    IoTDevice.publish(
                        {
                            topic: topic,
                            payload: payloadStr,
                        },
                        (err) => {
                            if (err) {
                               console.log(`Failed to publish ${payloadStr} on ${topic}. Got the following error: ${err}`);
                            }
                        });
                }
            });

            monitoredItem.on('err', (errorMessage) => {
                console.log(monitoredItem.itemToMonitor.nodeId.toString(), ' ERROR', errorMessage);
            });
        });
    }