in plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkRecordProcessor.java [102:203]
/**
 * Writes the records of one incoming FlowFile to a PLC.
 *
 * <p>A single FlowFile is taken from the session; if there is none the method returns
 * immediately. The FlowFile content is parsed with the configured record reader and, for
 * every record, a write request is built, executed with the configured timeout and its
 * response evaluated. When the schema cache holds no tags for the current address map, the
 * addresses are parsed through the connection and stored in the cache.
 *
 * <p>On success the FlowFile receives the row count, the execution time and its own UUID as
 * attributes, is transferred to {@code REL_SUCCESS} and a provenance event is reported.
 * On failure the exception message is stored in the {@code EXCEPTION} attribute, the
 * FlowFile is transferred to {@code REL_FAILURE}, the session is committed and the
 * exception is rethrown.
 *
 * @param context the process context used to resolve the processor properties
 * @param session the session that provides the FlowFile and receives the transfers
 * @throws ProcessException if reading the records or writing to the PLC fails
 */
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    // Must stay (effectively) final: it is captured by the read callback below.
    final FlowFile fileToProcess = session.get();
    // Abort if there's nothing to do.
    if (fileToProcess == null) {
        return;
    }

    final ComponentLog logger = getLogger();
    // Total count accumulated over all records of this FlowFile.
    final AtomicLong nrOfRows = new AtomicLong(0L);
    final StopWatch executeTime = new StopWatch(true);

    try {
        session.read(fileToProcess, in -> {
            Record record;
            try (RecordReader recordReader = context.getProperty(PLC_RECORD_READER_FACTORY)
                    .asControllerService(RecordReaderFactory.class)
                    .createRecordReader(fileToProcess, in, logger)) {

                while ((record = recordReader.nextRecord()) != null) {
                    // Per-record counter handed to getWriteRequest
                    // (presumably incremented once per written field - confirm against getWriteRequest).
                    final AtomicLong nrOfRowsHere = new AtomicLong(0);
                    final Map<String, String> addressMap = getPlcAddressMap(context, fileToProcess);
                    // Looked up per record on purpose: the cache may have been filled by a previous record.
                    final Map<String, PlcTag> tags = getSchemaCache().retrieveTags(addressMap);
                    // Materialize the record values once; they are used for the request and the response check.
                    final Map<String, Object> recordValues = record.toMap();

                    try (PlcConnection connection = getConnectionManager().getConnection(getConnectionString(context, fileToProcess))) {
                        final PlcWriteRequest writeRequest =
                            getWriteRequest(logger, addressMap, tags, recordValues, connection, nrOfRowsHere);
                        final PlcWriteResponse plcWriteResponse =
                            writeRequest.execute().get(getTimeout(context, fileToProcess), TimeUnit.MILLISECONDS);

                        // Response check if values were written
                        evaluateWriteResponse(logger, recordValues, plcWriteResponse);

                        // Store all configured tags in cache if not present
                        if (tags == null) {
                            if (debugEnabled) {
                                logger.debug("Adding PlcTypes resolution into cache with key: " + addressMap);
                            }
                            // Parse plc addresses; only addresses the connection can parse are kept.
                            final Map<String, PlcTag> validAddressesPlcTags = new LinkedHashMap<>();
                            for (Map.Entry<String, String> entry : addressMap.entrySet()) {
                                final Optional<PlcTag> newTag = connection.parseTagAddress(entry.getValue());
                                newTag.ifPresent(parsed -> validAddressesPlcTags.put(entry.getKey(), parsed));
                            }
                            getSchemaCache().addSchema(
                                addressMap,
                                validAddressesPlcTags.keySet(),
                                new ArrayList<>(validAddressesPlcTags.values()),
                                null
                            );
                        }
                    } catch (TimeoutException e) {
                        logger.error("Timeout writting the data to the PLC", e);
                        // Drop the cached connection so that the next attempt does not reuse it.
                        getConnectionManager().removeCachedConnection(getConnectionString(context, fileToProcess));
                        throw new ProcessException(e);
                    } catch (PlcConnectionException e) {
                        logger.error("Error getting the PLC connection", e);
                        throw new ProcessException("Got an a PlcConnectionException while trying to get a connection", e);
                    } catch (Exception e) {
                        logger.error("Exception writting the data to the PLC", e);
                        throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
                    }

                    nrOfRows.getAndAdd(nrOfRowsHere.get());
                }
            } catch (Exception e) {
                // Normalize everything to ProcessException so the outer handler can route the FlowFile.
                throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
            }
        });
    } catch (ProcessException e) {
        logger.error("Exception writing the data to the PLC", e);
        // FlowFiles are immutable: transfer the version returned by the session, not the stale reference.
        final FlowFile failedFlowFile = session.putAttribute(fileToProcess, EXCEPTION, e.getLocalizedMessage());
        session.transfer(failedFlowFile, REL_FAILURE);
        session.commitAsync();
        throw e;
    }

    final long executionTimeElapsed = executeTime.getElapsed(TimeUnit.MILLISECONDS);

    final Map<String, String> attributesToAdd = new HashMap<>();
    attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
    attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
    attributesToAdd.put(INPUT_FLOWFILE_UUID, fileToProcess.getAttribute(CoreAttributes.UUID.key()));

    // Keep working with the updated FlowFile returned by the session from here on.
    final FlowFile writtenFlowFile = session.putAllAttributes(fileToProcess, attributesToAdd);
    session.transfer(writtenFlowFile, REL_SUCCESS);

    logger.info("Writing {} fields from {} records; transferring to 'success'", nrOfRows.get(), writtenFlowFile);

    if (context.hasIncomingConnection()) {
        session.getProvenanceReporter().fetch(writtenFlowFile, "Writted " + nrOfRows.get() + " rows", executionTimeElapsed);
    } else {
        session.getProvenanceReporter().receive(writtenFlowFile, "Writted " + nrOfRows.get() + " rows", executionTimeElapsed);
    }
}