in plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xListenRecordProcessor.java [199:238]
/**
 * Converts one subscription event, if one is available, into a FlowFile and
 * transfers it to {@code REL_SUCCESS}.
 *
 * <p>When {@code getMessage(context)} returns {@code null} the method returns
 * immediately without creating a FlowFile. Otherwise the "Messages Received"
 * counter is incremented, the event is written into a new FlowFile through a
 * {@code RecordPlc4xWriter}, the FlowFile is passed through
 * {@code completeResultFlowFile} and then transferred.
 *
 * @param context process context used to obtain the event, the record writer
 *                factory and the timestamp field
 * @param session session in which the FlowFile is created, written and transferred
 * @throws ProcessException if writing, completing or transferring the FlowFile fails;
 *                          the original exception is kept as the cause
 */
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    final DefaultPlcSubscriptionEvent event = (DefaultPlcSubscriptionEvent) getMessage(context);
    if (event == null) {
        // No event available for this trigger; nothing to emit.
        return;
    }
    session.adjustCounter("Messages Received", 1L, false);

    // Filled in from inside the write callback, hence a mutable holder.
    final AtomicLong nrOfRows = new AtomicLong(0L);
    FlowFile resultSetFF = session.create();
    final Plc4xWriter plc4xWriter = new RecordPlc4xWriter(
            context.getProperty(PLC_RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class),
            Collections.emptyMap());
    try {
        // FlowFile objects are immutable: keep the reference returned by write()
        // so the following steps operate on the current version, not the stale one.
        resultSetFF = session.write(resultSetFF, out -> {
            try {
                nrOfRows.set(plc4xWriter.writePlcReadResponse(event, out, getLogger(), recordSchema, getTimestampField(context)));
            } catch (Exception e) {
                getLogger().error("Exception reading the data from PLC", e);
                // Pass ProcessExceptions through untouched; wrap anything else.
                throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
            }
            // Only reached when the write above succeeded and no schema is set on this processor.
            if (recordSchema == null) {
                addTagsToCache(event, plc4xWriter);
            }
        });
        resultSetFF = completeResultFlowFile(session, nrOfRows, resultSetFF, plc4xWriter);
        session.transfer(resultSetFF, REL_SUCCESS);
        // NOTE(review): executeTime is declared outside this block; presumably this
        // restarts a timer after a successful transfer — confirm against the class.
        executeTime.start();
    } catch (Exception e) {
        getLogger().error("Got an error while trying to get a subscription event", e);
        throw new ProcessException("Got an error while trying to get a subscription event", e);
    }
}